Skip to content

Commit

Permalink
use nodeId:- to ignore graph fields
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac committed Mar 17, 2023
1 parent 0f741cf commit d886f04
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# v0.NEXT
* High-level API nodes do not need to implement `Instancer` if you define a `nodeId` tag in the
config struct that defines them.
* An `InstaceID == '-'` or `nodeId:"-"` will ignore this field from the graph.
* Graph configs does not need to implement `ConnectedConfig` interface if their properties define the
`sendsTo` configuration.
* Graph builder returns error if nodes remain unconnected
Expand Down
3 changes: 3 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Graph API

* Graph node providers now can include an error in the signature
* Propagate context between providers (e.g. after the initialization of a first node, you can later
* Detect cycles (optional)
* Allow passing per-stage and per-instance options (e.b. buffer size for each concrete stage)
* Register: error if registering an existing configuration type. Suggest e.g using typedefs for same underlying type
Expand Down
5 changes: 3 additions & 2 deletions pkg/graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

const (
nodeIdTag = "nodeId"
sendsToTag = "sendsTo"
nodeIdTag = "nodeId"
sendsToTag = "sendsTo"
nodeIdIgnore = "-"
)

type codecKey struct {
Expand Down
64 changes: 64 additions & 0 deletions pkg/graph/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,67 @@ func TestCodecs(t *testing.T) {
assert.Equal(t, []string{"2", "4", "6"}, arr)

}

func TestIgnore(t *testing.T) {
b := NewBuilder()

type CounterCfg struct {
From int
To int
}
RegisterStart(b, func(cfg CounterCfg) node.StartFuncCtx[int] {
return func(_ context.Context, out chan<- int) {
for i := cfg.From; i <= cfg.To; i++ {
out <- i
}
}
})

type DoublerCfg struct{}
RegisterMiddle(b, func(_ DoublerCfg) node.MiddleFunc[int, int] {
return func(in <-chan int, out chan<- int) {
for n := range in {
out <- n * 2
}
}
})

type MapperCfg struct {
Dst map[int]struct{}
}
RegisterTerminal(b, func(cfg MapperCfg) node.TerminalFunc[int] {
return func(in <-chan int) {
for n := range in {
cfg.Dst[n] = struct{}{}
}
}
})

type config struct {
SomeExtraField int `nodeId:"-"` // this needs to be ignored
Start CounterCfg `nodeId:"n1" sendsTo:"n2"`
Middle DoublerCfg `nodeId:"n2" sendsTo:"n3"`
Term MapperCfg `nodeId:"n3"`
Connector
}
map1 := map[int]struct{}{}
g, err := b.Build(config{
Start: CounterCfg{From: 1, To: 5},
Middle: DoublerCfg{},
Term: MapperCfg{Dst: map1},
})
require.NoError(t, err)

done := make(chan struct{})
go func() {
g.Run(context.Background())
close(done)
}()
select {
case <-done:
case <-time.After(5 * time.Second):
require.Fail(t, "timeout while waiting for graph to complete")
}

assert.Equal(t, map[int]struct{}{2: {}, 4: {}, 6: {}, 8: {}, 10: {}}, map1)
}
13 changes: 8 additions & 5 deletions pkg/graph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,21 @@ func (b *Builder) applyField(fieldType reflect.StructField, fieldVal reflect.Val

if instancer, ok := fieldVal.Interface().(stage.Instancer); ok {
instanceID = instancer.ID()
} else if fieldVal.Type().ConvertibleTo(graphInstanceType) {
// if it does not implement the instancer interface, let's check if it can be converted
// to the convenience stage.Instance type
// TODO: if it implements it as a pointer but it is a value, try getting a pointer as we do later with Enabler
instanceID = fieldVal.Convert(graphInstanceType).Interface().(stage.Instance).ID()
//} else if fieldVal.Type().ConvertibleTo(graphInstanceType) {
// // if it does not implement the instancer interface, let's check if it can be converted
// // to the convenience stage.Instance type
// // TODO: if it implements it as a pointer but it is a value, try getting a pointer as we do later with Enabler
// instanceID = fieldVal.Convert(graphInstanceType).Interface().(stage.Instance).ID()
} else if instanceID, ok = fieldType.Tag.Lookup(nodeIdTag); !ok {
// Otherwise, let's check for the nodeId embedded tag in the struct if any
// But fail if it is not possible
return fmt.Errorf("field of type %s should provide an 'ID() InstanceID' method or be tagged"+
" with a `nodeId` tag in the configuration struct. Please provide a `nodeId` tag or e.g."+
" embed the stage.Instance field", fieldVal.Type())
}
if instanceID == nodeIdIgnore {
return nil
}

// checks if it has a sendsTo annotation and update the connections map accordingly
if dstNode, ok := fieldType.Tag.Lookup(sendsToTag); ok {
Expand Down

0 comments on commit d886f04

Please sign in to comment.