Skip to content

Commit

Permalink
documented public API
Browse files Browse the repository at this point in the history
  • Loading branch information
Mario Macias committed Apr 9, 2022
1 parent 2067cd2 commit 0216098
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 27 deletions.
10 changes: 10 additions & 0 deletions examples/tutorial/01-lowlevel-nodes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Tutorial 01: basic nodes in Low-Level API

In this tutorial, we will create a basic graph using the Low-Level API:

```mermaid
graph TD
S1(Start 1) -->|Hello 1, ...| M("Middle<br/><small>(Forwards as<br/>UPPERCASE)")
S2(Start 2) -->|Hi 1, ...| M
M -->|Forwarding as UPPERCASE| T("Terminal<br/><small>(Prints)</small>")
```
50 changes: 34 additions & 16 deletions pkg/graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type inOutTyper interface {
outTyper
}

// Builder helps building a graph and connect their nodes. It takes care of instantiating all
// Builder helps to build a graph and to connect their nodes. It takes care of instantiating all
// its stages given a name and a type, as well as connect them. If two connected stages have
// incompatible types, it will insert a codec in between to translate between the stage types
type Builder struct {
Expand All @@ -42,6 +42,8 @@ type Builder struct {
options []reflect.Value
}

// NewBuilder instantiates a Graph Builder with the default configuration, which can be overridden via the
// arguments.
func NewBuilder(options ...node.Option) *Builder {
optVals := make([]reflect.Value, 0, len(options))
for _, opt := range options {
Expand All @@ -60,6 +62,11 @@ func NewBuilder(options ...node.Option) *Builder {
}
}

// RegisterCodec registers a Codec into the graph builder. A Codec is a node.MiddleFunc function
// that allows converting data types and it's automatically inserted when a node with a given
// output type is connected to a node with a different input type. When nodes with different
// types are connected, a codec converting between both MUST have been registered previously.
// Otherwise the graph Build method will fail.
func RegisterCodec[I, O any](nb *Builder, middleFunc node.MiddleFunc[I, O]) {
var in I
var out O
Expand All @@ -70,27 +77,53 @@ func RegisterCodec[I, O any](nb *Builder, middleFunc node.MiddleFunc[I, O]) {
}
}

// RegisterStart registers a stage.StartProvider into the graph builder. When the Build
// method is invoked later, any configuration field associated with the StartProvider will
// result in the instantiation of a node.Start with the provider's returned function.
func RegisterStart[CFG, O any](nb *Builder, b stage.StartProvider[CFG, O]) {
nb.startProviders[typeOf[CFG]()] = [2]reflect.Value{
reflect.ValueOf(node.AsStart[O]),
reflect.ValueOf(b),
}
}

// RegisterMiddle registers a stage.MiddleProvider into the graph builder. When the Build
// method is invoked later, any configuration field associated with the MiddleProvider will
// result in the instantiation of a node.Middle with the provider's returned function.
func RegisterMiddle[CFG, I, O any](nb *Builder, b stage.MiddleProvider[CFG, I, O]) {
nb.middleProviders[typeOf[CFG]()] = [2]reflect.Value{
reflect.ValueOf(node.AsMiddle[I, O]),
reflect.ValueOf(b),
}
}

// RegisterTerminal registers a stage.TerminalProvider into the graph builder. When the Build
// method is invoked later, any configuration field associated with the TerminalProvider will
// result in the instantiation of a node.Terminal with the provider's returned function.
func RegisterTerminal[CFG, I any](nb *Builder, b stage.TerminalProvider[CFG, I]) {
nb.terminalProviders[typeOf[CFG]()] = [2]reflect.Value{
reflect.ValueOf(node.AsTerminal[I]),
reflect.ValueOf(b),
}
}

// Build creates a Graph where each node corresponds to a field in the passed Configuration struct.
// The nodes will be connected according to the ConnectedConfig "source" --> ["destination"...] map.
func (b *Builder) Build(cfg ConnectedConfig) (Graph, error) {
g := Graph{}
if err := b.applyConfig(cfg); err != nil {
return g, err
}

for _, i := range b.ingests {
g.start = append(g.start, i.(startNode))
}
for _, e := range b.exports {
g.terms = append(g.terms, e.(terminalNode))
}
return g, nil
}

func instantiate(nb *Builder, instanceID string, arg reflect.Value) error {
// TODO: check if instanceID is duplicate
if instanceID == "" {
Expand Down Expand Up @@ -168,21 +201,6 @@ func (b *Builder) connect(src, dst string) error {
return nil
}

func (b *Builder) Build(cfg ConnectedConfig) (Graph, error) {
g := Graph{}
if err := b.applyConfig(cfg); err != nil {
return g, err
}

for _, i := range b.ingests {
g.start = append(g.start, i.(initNode))
}
for _, e := range b.exports {
g.terms = append(g.terms, e.(terminalNode))
}
return g, nil
}

// returns a node.Midle[?, ?] as a value
func (b *Builder) newCodec(inType, outType reflect.Type) (reflect.Value, bool) {
codec, ok := b.codecs[codecKey{In: inType, Out: outType}]
Expand Down
21 changes: 15 additions & 6 deletions pkg/graph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,28 @@ import (
"github.com/sirupsen/logrus"
)

// Connector key: ID of the source node. Value: array of destination node IDs.
var connectorType = reflect.TypeOf(Connector{})
var graphInstanceType = reflect.TypeOf(stage.Instance(""))

// Connector is a convenience implementor of the ConnectedConfig interface, required
// to build any graph. It can be embedded into any configuration struct that is passed
// as argument into the builder.Build method.
//
// Key: instance ID of the source node. Value: array of destination node instance IDs.
type Connector map[string][]string

// Connections returns the map holded by the Connector
// Connections returns the connection map represented by the Connector
func (c Connector) Connections() map[string][]string {
return c
}

var connectorType = reflect.TypeOf(Connector{})

// ConnectedConfig describes the interface that any struct passed to the builder.Build
// method must fullfill. Consider embedding the Connector type into your struct for
// automatic implementation of the interface.
type ConnectedConfig interface {
// Connections returns a map representing the connection of the node graphs, where
// the key contains the instance ID of the source node, and the value contains an
// array of the destination nodes' instance IDs.
Connections() map[string][]string
}

Expand Down Expand Up @@ -73,8 +84,6 @@ func (b *Builder) applyConfigReflect(cfgValue reflect.Value) error {
return nil
}

var graphInstanceType = reflect.TypeOf(stage.Instance(""))

func (b *Builder) applyField(field reflect.Value) error {
instancer, ok := field.Interface().(stage.Instancer)
if !ok {
Expand Down
7 changes: 5 additions & 2 deletions pkg/graph/graph.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package graph

type initNode interface {
type startNode interface {
Start()
}

type terminalNode interface {
Done() <-chan struct{}
}

// Graph is set of Start Nodes that generate information that is forwarded to
// Middle or Terminal nodes, which process that information. It must be created
// from the Builder type.
type Graph struct {
start []initNode
start []startNode
terms []terminalNode
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/graph/stage/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ type Instancer interface {

var _ Instancer = (*Instance)(nil)

// A provider wraps an instantiation function that, given a configuration argument, returns a
// node with a processing function.

// StartProvider is a function that, given a configuration argument of a unique type,
// returns a function fulfilling the node.StartFunc type signature. Returned functions
// will run inside a Graph Start Node
type StartProvider[CFG, O any] func(CFG) node.StartFunc[O]

// MiddleProvider is a function that, given a configuration argument of a unique type,
// returns a function fulfilling the node.MiddleFunc type signature. Returned functions
// will run inside a Graph Middle Node
type MiddleProvider[CFG, I, O any] func(CFG) node.MiddleFunc[I, O]

// TerminalProvider is a function that, given a configuration argument of a unique type,
// returns a function fulfilling the node.TerminalFunc type signature. Returned functions
// will run inside a Graph Terminal Node
type TerminalProvider[CFG, I any] func(CFG) node.TerminalFunc[I]

0 comments on commit 0216098

Please sign in to comment.