Skip to content

Commit

Permalink
Disallow EventTimes in iterators (apache#22435)
Browse files Browse the repository at this point in the history
* Disallow EventTimes in iterators

* Remove other ET registration case

* Remove unused ET funcs
  • Loading branch information
jrmccluskey authored Jul 25, 2022
1 parent fa492f8 commit b2b466a
Show file tree
Hide file tree
Showing 5 changed files with 2,466 additions and 13,619 deletions.
8 changes: 5 additions & 3 deletions sdks/go/pkg/beam/core/funcx/sideinput.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
errIllegalParametersInIter = "All parameters in an iter must be universal type, container type, or concrete type"
errIllegalParametersInReIter = "Output of a reiter must be valid iter type"
errIllegalParametersInMultiMap = "Output of a multimap must be valid iter type"
errIllegalEventTimeInIter = "Iterators with timestamp values (<ET,V> and <ET, K, V>) are not valid, as side input time stamps are not preserved after windowing. See https://github.com/apache/beam/issues/22404 for more information."
)

// IsIter returns true iff the supplied type is a "single sweep functional iterator".
Expand Down Expand Up @@ -56,8 +57,10 @@ func IsMalformedIter(t reflect.Type) (bool, error) {
//
// func (*int) bool returns {int}
// func (*string, *int) bool returns {string, int}
// func (*typex.EventTime, *int) bool returns {typex.EventTime, int}
//
// EventTimes are not allowed in iterator types as per the Beam model
// (see https://github.com/apache/beam/issues/22404) for more
// information.
func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
types, ok, _ := unfoldIter(t)
return types, ok
Expand All @@ -78,8 +81,7 @@ func unfoldIter(t reflect.Type) ([]reflect.Type, bool, error) {
var ret []reflect.Type
skip := 0
if t.In(0).Kind() == reflect.Ptr && t.In(0).Elem() == typex.EventTimeType {
ret = append(ret, typex.EventTimeType)
skip = 1
return nil, false, errors.New(errIllegalEventTimeInIter)
}
if t.NumIn()-skip > 2 || t.NumIn() == skip {
return nil, false, nil
Expand Down
24 changes: 12 additions & 12 deletions sdks/go/pkg/beam/core/funcx/sideinput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func TestIsIter(t *testing.T) {
{func() bool { return false }, false}, // no value
{func(*int) int { return 0 }, false}, // no bool return
{func(int) bool { return false }, false}, // no ptr value
{func(*typex.EventTime) bool { return false }, false}, // no values
{func(*typex.EventTime) bool { return false }, false}, // EventTimes disallowed
{func(*int) bool { return false }, true},
{func(*typex.EventTime, *int) bool { return false }, true},
{func(*typex.EventTime, *int) bool { return false }, false}, // EventTimes disallowed
{func(*int, *string) bool { return false }, true},
{func(*typex.Y, *typex.Z) bool { return false }, true},
{func(*typex.EventTime, *int, *string) bool { return false }, true},
{func(*typex.EventTime, *int, *string) bool { return false }, false}, // EventTimes disallowed
{func(*int, *typex.Y, *typex.Z) bool { return false }, false}, // too many values
{func(*typex.EventTime, *int, *typex.Y, *typex.Z) bool { return false }, false}, // too many values
{func(*typex.EventTime, *int, *typex.Y, *typex.Z) bool { return false }, false}, // too many values, EventTimes disallowed
}

for _, test := range tests {
Expand All @@ -58,7 +58,7 @@ func TestIsReIter(t *testing.T) {
{func(*int) func(*int) bool { return nil }, false}, // takes parameters
{func(*int) (func(*int) bool, func(*int) bool) { return nil, nil }, false}, // too many iterators
{func() func(*int) bool { return nil }, true},
{func() func(*typex.EventTime, *int, *string) bool { return nil }, true},
{func() func(*typex.EventTime, *int, *string) bool { return nil }, false}, // EventTimes disallowed
}

for _, test := range tests {
Expand All @@ -75,13 +75,13 @@ func TestIsMultiMap(t *testing.T) {
Exp bool
}{
{func(int) func(*int) bool { return nil }, true},
{func() func(*int) bool { return nil }, false}, // Doesn't take an input (is a ReIter)
{func(*int) bool { return false }, false}, // Doesn't return an iterator (is an iterator)
{func(int) int { return 0 }, false}, // Doesn't return an iterator (returns a value)
{func(string) func(*int) int { return nil }, false}, // Returned iterator isn't a boolean return
{func(string) func(int) bool { return nil }, false}, // Returned iterator doesn't have a pointer receiver
{func(string) func(*typex.EventTime, *int) bool { return nil }, true},
{func(string) func(*typex.EventTime, *int) { return nil }, false}, // Returned iterator does not have a bool return
{func() func(*int) bool { return nil }, false}, // Doesn't take an input (is a ReIter)
{func(*int) bool { return false }, false}, // Doesn't return an iterator (is an iterator)
{func(int) int { return 0 }, false}, // Doesn't return an iterator (returns a value)
{func(string) func(*int) int { return nil }, false}, // Returned iterator isn't a boolean return
{func(string) func(int) bool { return nil }, false}, // Returned iterator doesn't have a pointer receiver
{func(string) func(*typex.EventTime, *int) bool { return nil }, false}, // EventTimes disallowed
{func(string) func(*typex.EventTime, *int) { return nil }, false}, // Returned iterator does not have a bool return, EventTimes disallowed
}
for _, test := range tests {
val := reflect.TypeOf(test.Fn)
Expand Down
2 changes: 0 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ func (v *iterValue) invoke(args []reflect.Value) []reflect.Value {
for i, t := range v.types {
var v reflect.Value
switch {
case t == typex.EventTimeType:
v = reflect.ValueOf(elm.Timestamp)
case isKey:
v = reflect.ValueOf(Convert(elm.Elm, t))
isKey = false
Expand Down
Loading

0 comments on commit b2b466a

Please sign in to comment.