Skip to content

Commit

Permalink
[BEAM-8493] Add standard double coder to Go SDK.
Browse files Browse the repository at this point in the history
For upcoming features (in this case, SDF), we need to support the
standard double coder from beam_runner_api.proto. This commit adds
relevant support.
  • Loading branch information
youngoli authored and lostluck committed Oct 28, 2019
1 parent 8972e5e commit 501df3a
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 2 deletions.
6 changes: 5 additions & 1 deletion sdks/go/pkg/beam/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,17 @@ func inferCoder(t FullType) (*coder.Coder, error) {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.Float32, reflectx.Float64:

case reflectx.Float32:
c, err := coderx.NewFloat(t.Type())
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil

case reflectx.Float64:
return &coder.Coder{Kind: coder.Double, T: t}, nil

case reflectx.String:
c, err := coderx.NewString()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/core/graph/coder/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ const (
Bytes Kind = "bytes" // Implicitly length-prefixed as part of the encoding
Bool Kind = "bool"
VarInt Kind = "varint"
Double Kind = "double"
WindowedValue Kind = "W"
KV Kind = "KV"

Expand Down Expand Up @@ -256,6 +257,11 @@ func NewVarInt() *Coder {
return &Coder{Kind: VarInt, T: typex.New(reflectx.Int64)}
}

// NewDouble returns a new double coder using the built-in scheme.
func NewDouble() *Coder {
return &Coder{Kind: Double, T: typex.New(reflectx.Float64)}
}

// IsW returns true iff the coder is for a WindowedValue.
func IsW(c *Coder) bool {
return c.Kind == WindowedValue
Expand Down
41 changes: 41 additions & 0 deletions sdks/go/pkg/beam/core/graph/coder/double.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package coder

import (
"encoding/binary"
"io"
"math"

"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
)

// EncodeDouble encodes a float64 in big endian format.
func EncodeDouble(value float64, w io.Writer) error {
var data [8]byte
binary.BigEndian.PutUint64(data[:], math.Float64bits(value))
_, err := ioutilx.WriteUnsafe(w, data[:])
return err
}

// DecodeDouble decodes a float64 in big endian format.
func DecodeDouble(r io.Reader) (float64, error) {
var data [8]byte
if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil {
return 0, err
}
return math.Float64frombits(binary.BigEndian.Uint64(data[:])), nil
}
24 changes: 24 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
case coder.VarInt:
return &varIntEncoder{}

case coder.Double:
return &doubleEncoder{}

case coder.Custom:
return &customEncoder{
t: c.Custom.Type,
Expand Down Expand Up @@ -102,6 +105,9 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder {
case coder.VarInt:
return &varIntDecoder{}

case coder.Double:
return &doubleDecoder{}

case coder.Custom:
return &customDecoder{
t: c.Custom.Type,
Expand Down Expand Up @@ -204,6 +210,24 @@ func (*varIntDecoder) Decode(r io.Reader) (*FullValue, error) {
return &FullValue{Elm: n}, nil
}

type doubleEncoder struct{}

func (*doubleEncoder) Encode(val *FullValue, w io.Writer) error {
// Encoding: beam double (big-endian 64-bit IEEE 754 double)
return coder.EncodeDouble(val.Elm.(float64), w)
}

type doubleDecoder struct{}

func (*doubleDecoder) Decode(r io.Reader) (*FullValue, error) {
// Encoding: beam double (big-endian 64-bit IEEE 754 double)
f, err := coder.DecodeDouble(r)
if err != nil {
return nil, err
}
return &FullValue{Elm: f}, nil
}

type customEncoder struct {
t reflect.Type
enc Encoder
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/coder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func TestCoders(t *testing.T) {
}, {
coder: coder.NewVarInt(),
val: &FullValue{Elm: int64(65)},
}, {
coder: coder.NewDouble(),
val: &FullValue{Elm: float64(12.9)},
}, {
coder: func() *coder.Coder {
c, _ := coderx.NewString()
Expand Down
7 changes: 7 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
urnBytesCoder = "beam:coder:bytes:v1"
urnBoolCoder = "beam:coder:bool:v1"
urnVarIntCoder = "beam:coder:varint:v1"
urnDoubleCoder = "beam:coder:double:v1"
urnLengthPrefixCoder = "beam:coder:length_prefix:v1"
urnKVCoder = "beam:coder:kv:v1"
urnIterableCoder = "beam:coder:iterable:v1"
Expand Down Expand Up @@ -162,6 +163,9 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
case urnVarIntCoder:
return coder.NewVarInt(), nil

case urnDoubleCoder:
return coder.NewDouble(), nil

case urnKVCoder:
if len(components) != 2 {
return nil, errors.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components))
Expand Down Expand Up @@ -377,6 +381,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
case coder.VarInt:
return b.internBuiltInCoder(urnVarIntCoder)

case coder.Double:
return b.internBuiltInCoder(urnDoubleCoder)

default:
panic(fmt.Sprintf("Failed to marshal custom coder %v, unexpected coder kind: %v", c, c.Kind))
}
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
"varint",
coder.NewVarInt(),
},
{
"double",
coder.NewDouble(),
},
{
"foo",
foo,
Expand Down
7 changes: 7 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
bytesType = "kind:bytes"
boolType = "kind:bool"
varIntType = "kind:varint"
doubleType = "kind:double"
streamType = "kind:stream"
pairType = "kind:pair"
lengthPrefixType = "kind:length_prefix"
Expand Down Expand Up @@ -154,6 +155,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
case coder.VarInt:
return &CoderRef{Type: varIntType}, nil

case coder.Double:
return &CoderRef{Type: doubleType}, nil

default:
return nil, errors.Errorf("bad coder kind: %v", c.Kind)
}
Expand Down Expand Up @@ -184,6 +188,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
case varIntType:
return coder.NewVarInt(), nil

case doubleType:
return coder.NewDouble(), nil

case pairType:
if len(c.Components) != 2 {
return nil, errors.Errorf("bad pair: %+v", c)
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestCreate(t *testing.T) {
}{
{[]interface{}{1, 2, 3}},
{[]interface{}{"1", "2", "3"}},
{[]interface{}{0.1, 0.2, 0.3}},
{[]interface{}{float32(0.1), float32(0.2), float32(0.3)}},
{[]interface{}{float64(0.1), float64(0.2), float64(0.3)}},
{[]interface{}{uint(1), uint(2), uint(3)}},
{[]interface{}{false, true, true, false, true}},
{[]interface{}{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}},
Expand Down

0 comments on commit 501df3a

Please sign in to comment.