Skip to content

Commit

Permalink
[apache#21391]Increase unit testing coverage in the exec package (apa…
Browse files Browse the repository at this point in the history
…che#24772)

* add more test in sdks/go/pkg/beam/core/runtime/exec/translate_test.go

* add test for translate.go unmarshalPort()

* add test on function TestNewUserStateAdapter

* add test for translate.go newBuilder(desc *fnpb.ProcessBundleDescriptor)

* add test for translate.go UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor)

* better naming and error message

* Update sdks/go/pkg/beam/core/runtime/exec/translate_test.go

Co-authored-by: Ritesh Ghorse <[email protected]>

* use errors.New() when format string is not used

Co-authored-by: v-zhenglinli <[email protected]>
Co-authored-by: Ritesh Ghorse <[email protected]>
  • Loading branch information
3 people authored Jan 5, 2023
1 parent a6e11d6 commit 1f34e8d
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 0 deletions.
196 changes: 196 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package exec

import (
"fmt"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -318,3 +320,197 @@ func makeWindowMappingFn(w *window.Fn) (*pipepb.FunctionSpec, error) {
}
return wFn, nil
}

func TestInputIdToIndex(t *testing.T) {
tests := []struct {
in string
want int
}{
{ // does not start with i
"90",
0,
},
{ // start with i
"i0",
0,
},
{
"i1",
1,
},
{
"i10",
10,
},
}

for _, test := range tests {
got, err := inputIdToIndex(test.in)
if !strings.HasPrefix(test.in, "i") {
if err == nil {
t.Errorf("should return err when string does not has a prefix of i, but didn't. inputIdToIndex(%v) = (%v, %v)", test.in, got, err)
}
} else {
if got != test.want {
t.Errorf("can not correctly convert inputId to index. inputIdToIndex(%v) = (%v, %v), want %v", test.in, got, err, test.want)
}
}
}
}

func TestIndexToInputId(t *testing.T) {
tests := []struct {
in int
want string
}{
{
1,
"i1",
},
{
1000,
"i1000",
},
}

for _, test := range tests {
got := indexToInputId(test.in)
if got != test.want {
t.Errorf("can not correctly convert index to inputId. indexToInputId(%v) = (%v), want %v", test.in, got, test.want)
}
}
}

func TestUnmarshalPort(t *testing.T) {
var port fnpb.RemoteGrpcPort

tests := []struct {
inputData []byte
outputPort Port
outputStr string
outputError error
}{
{
inputData: []byte{},
outputPort: Port{URL: port.GetApiServiceDescriptor().GetUrl()},
outputStr: fnpb.RemoteGrpcPort{}.CoderId,
outputError: nil,
},
}

for _, test := range tests {
port, str, err := unmarshalPort(test.inputData)
if err != nil && test.outputError == nil {
t.Errorf("there is an error where should not be. unmarshalPort(%v) = (%v, %v, %v), want (%v, %v, %v)", test.inputData, port, str, err, test.outputPort, test.outputStr, test.outputError)
} else if err != nil && err != test.outputError {
t.Errorf("got an unexpected error: %v, want: %v", err, test.outputError)
} else if port != test.outputPort {
t.Errorf("the output port is not right. unmarshalPort(%v) = (%v, %v, %v), want (%v, %v, %v)", test.inputData, port, str, err, test.outputPort, test.outputStr, test.outputError)
} else if str != test.outputStr {
t.Errorf("the output string is not right. unmarshalPort(%v) = (%v, %v, %v), want (%v, %v, %v)", test.inputData, port, str, err, test.outputPort, test.outputStr, test.outputError)
}
}
}

func TestUnmarshalPlan(t *testing.T) {
transform := pipepb.PTransform{
Spec: &pipepb.FunctionSpec{
Urn: urnDataSource,
},
Outputs: map[string]string{},
}
tests := []struct {
name string
inputDesc *fnpb.ProcessBundleDescriptor
outputPlan *Plan
outputError error
}{
{
name: "test_no_root_units",
inputDesc: &fnpb.ProcessBundleDescriptor{
Id: "",
Transforms: map[string]*pipepb.PTransform{},
},
outputPlan: nil,
outputError: errors.New("no root units"),
},
{
name: "test_zero_transform",
inputDesc: &fnpb.ProcessBundleDescriptor{
Id: "",
Transforms: map[string]*pipepb.PTransform{
"": {},
},
},
outputPlan: nil,
outputError: errors.New("no root units"),
},
{
name: "test_transform_outputs_length_not_one",
inputDesc: &fnpb.ProcessBundleDescriptor{
Id: "",
Transforms: map[string]*pipepb.PTransform{
"": &transform,
},
},
outputPlan: nil,
outputError: errors.Errorf("expected one output from DataSource, got %v", transform.GetOutputs()),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
plan, err := UnmarshalPlan(test.inputDesc)
if err != nil && test.outputError == nil {
t.Errorf("there is an error where should not be. UnmarshalPlan(%v) = (%v, %v), want (%v, %v)", test.inputDesc, plan, err, test.outputPlan, test.outputError)
} else if err != nil && !reflect.DeepEqual(err, test.outputError) {
t.Errorf("got an unexpected error: %v, want: %v", err, test.outputError)
} else if !reflect.DeepEqual(plan, test.outputPlan) {
t.Errorf("the output builder is not right. UnmarshalPlan(%v) = (%v, %v), want (%v, %v)", test.inputDesc, plan, err, test.outputPlan, test.outputError)
}
})
}
}

func TestNewBuilder(t *testing.T) {
descriptor := fnpb.ProcessBundleDescriptor{
Id: "",
Transforms: map[string]*pipepb.PTransform{},
}
tests := []struct {
name string
inputDesc *fnpb.ProcessBundleDescriptor
outputBuilder *builder
outputError error
}{
{
name: "test_1",
inputDesc: &descriptor,
outputBuilder: &builder{
desc: &descriptor,
coders: graphx.NewCoderUnmarshaller(descriptor.GetCoders()),
prev: make(map[string]int),
succ: make(map[string][]linkID),
windowing: make(map[string]*window.WindowingStrategy),
nodes: make(map[string]*PCollection),
links: make(map[linkID]Node),
units: nil,
idgen: &GenID{},
},
outputError: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b, err := newBuilder(test.inputDesc)
if err != nil && test.outputError == nil {
t.Errorf("There is an error where should not be. newBuilder(%v) = (%v, %v), want (%v, %v)", test.inputDesc, b, err, test.outputBuilder, test.outputError)
} else if err != nil && err != test.outputError {
t.Errorf("got an unexpected error: %v, want: %v", err, test.outputError)
} else if !reflect.DeepEqual(b, test.outputBuilder) {
t.Errorf("The output builder is not right. newBuilder(%v) = (%v, %v), want (%v, %v)", test.inputDesc, b, err, test.outputBuilder, test.outputError)
}
})
}
}
65 changes: 65 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/userstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,68 @@ func (t *testIoWriter) Write(b []byte) (int, error) {
t.b = b
return len(b), nil
}

func TestNewUserStateAdapter(t *testing.T) {
testCoder := &coder.Coder{
Kind: coder.WindowedValue,
T: nil,
Components: []*coder.Coder{
{
Kind: coder.KV,
Components: []*coder.Coder{
{
Kind: coder.Double,
},
{
Kind: coder.Bool,
},
},
},
},
Custom: nil,
Window: &coder.WindowCoder{
Kind: coder.GlobalWindow,
Payload: "",
},
ID: "",
}
tests := []struct {
name string
sid StreamID
c *coder.Coder
stateIDToCoder map[string]*coder.Coder
stateIDToKeyCoder map[string]*coder.Coder
stateIDToCombineFn map[string]*graph.CombineFn
adapter UserStateAdapter
}{
{
name: "",
sid: StreamID{
Port: Port{},
PtransformID: "",
},
c: testCoder,
stateIDToCoder: nil,
stateIDToKeyCoder: nil,
stateIDToCombineFn: nil,
adapter: &userStateAdapter{
sid: StreamID{},
wc: &globalWindowEncoder{},
kc: MakeElementEncoder(coder.SkipW(testCoder).Components[0]),
stateIDToCoder: nil,
stateIDToKeyCoder: nil,
stateIDToCombineFn: nil,
c: testCoder,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
adapter := NewUserStateAdapter(test.sid, test.c, test.stateIDToCoder, test.stateIDToKeyCoder, test.stateIDToCombineFn)
if !reflect.DeepEqual(adapter, test.adapter) {
t.Errorf("NewUserStateAdapter(%v, %v, %v, %v, %v)=%v, want %v", test.sid, test.c, test.stateIDToCoder, test.stateIDToKeyCoder, test.stateIDToCombineFn, adapter, test.adapter)
}
})
}
}

0 comments on commit 1f34e8d

Please sign in to comment.