connectors tutorial and improved readme
Mario Macias committed Apr 9, 2022
1 parent 1010673 commit 2067cd2
Showing 7 changed files with 137 additions and 66 deletions.
76 changes: 17 additions & 59 deletions README.md
@@ -34,68 +34,26 @@ There are three types of nodes:
another node, but can process it and send the results to outside the graph
(e.g. memory, storage, web...)

## Example pipeline for the node API
With the low-level API, you instantiate each node and connect it manually. It is simple and
efficient for graphs whose structure is known when the code is written.

The following pipeline has two Start nodes that send their data to two destination Middle
nodes (`odds` and `evens`). From there, the data follows each node's own branch until the
branches are eventually joined in the `printer` Terminal node.

Check the complete examples in the [examples/](./examples) folder.

```go
func main() {
	// Defining Start, middle and terminal nodes that wrap some functions
	start1 := node.AsStart(StartCounter)
	start2 := node.AsStart(StartRandoms)
	odds := node.AsMiddle(OddFilter)
	evens := node.AsMiddle(EvenFilter)
	oddsMsg := node.AsMiddle(Messager("odd number"))
	evensMsg := node.AsMiddle(Messager("even number"))
	printer := node.AsTerminal(Printer)

	// Connecting nodes like:
	//
	//   start1----\ /---start2
	//     |        X      |
	//    evens<---/ \-->odds
	//     |               |
	//   evensMsg      oddsMsg
	//          \       /
	//           printer

	start1.SendsTo(evens, odds)
	start2.SendsTo(evens, odds)
	odds.SendsTo(oddsMsg)
	evens.SendsTo(evensMsg)
	oddsMsg.SendsTo(printer)
	evensMsg.SendsTo(printer)

	// all the Start nodes must be started to
	// start forwarding data to the rest of the graph
	start1.Start()
	start2.Start()
## Graph high-level API

	// We can wait for terminal nodes to finish their execution
	// after the rest of the graph has finished
	<-printer.Done()
}
```
The High-Level API is aimed at graphs whose structure might be specified at runtime
(e.g. via a configuration file that specifies which stages run and how they are connected).

Output:
This API allows registering Node Generators and Codecs:

```
even number: 2
odd number: 847
odd number: 59
odd number: 81
odd number: 81
even number: 0
odd number: 3
odd number: 1
odd number: 887
even number: 4
```
* A **Node Generator** is a function that, given a unique configuration type, returns a function
that can go inside a Start, Middle or Terminal Node (as explained in the previous section).
* A **Codec** is a middle function (that is, it is wrapped into a Middle node and receives a
readable input channel and a writable output channel) whose input and output belong to different types.
A codec transforms each input value into its equivalent in the output type. For example, it
could convert JSON strings to Go maps. Codecs allow wiring nodes with different output/input
types, and are automatically instantiated when needed.
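
To make the two definitions above more concrete, here is a minimal sketch that uses only the standard library. `EmitterConfig`, `Emitter`, `JSONToMapCodec` and `decodeAll` are hypothetical names invented for this illustration. They are not part of this library, and the functions are wired by hand with plain channels instead of being registered in a graph builder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmitterConfig is a hypothetical configuration type used only in this sketch.
type EmitterConfig struct {
	Documents []string
}

// Emitter has the shape of a Node Generator: given its configuration, it
// returns a function that could go inside a Start node.
func Emitter(cfg EmitterConfig) func(out chan<- string) {
	return func(out chan<- string) {
		for _, doc := range cfg.Documents {
			out <- doc
		}
	}
}

// JSONToMapCodec has the shape of a Codec: a middle function whose input
// type (string) differs from its output type (map[string]any).
func JSONToMapCodec(in <-chan string, out chan<- map[string]any) {
	for doc := range in {
		var m map[string]any
		if err := json.Unmarshal([]byte(doc), &m); err != nil {
			continue // this sketch silently skips documents that cannot be decoded
		}
		out <- m
	}
}

// decodeAll is a helper for this sketch only. It connects both functions
// with plain channels and collects everything the codec emits.
func decodeAll(cfg EmitterConfig) []map[string]any {
	docs := make(chan string)
	maps := make(chan map[string]any)
	go func() {
		defer close(docs)
		Emitter(cfg)(docs)
	}()
	go func() {
		defer close(maps)
		JSONToMapCodec(docs, maps)
	}()
	var decoded []map[string]any
	for m := range maps {
		decoded = append(decoded, m)
	}
	return decoded
}

func main() {
	cfg := EmitterConfig{Documents: []string{`{"node":"odds"}`, `not json`, `{"count":3}`}}
	for _, m := range decodeAll(cfg) {
		fmt.Println(m)
	}
}
```

In this sketch the caller closes the channels by hand. With the high-level API, the graph builder would be expected to take care of creating and connecting the channels.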

## Graph high-level API
Given a configuration that contains all the Node configuration types as fields, plus a connection map,
a graph builder instantiates all the nodes (and any codecs that are needed) and wires them together.
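
As a rough illustration of what wiring nodes from a connection map involves, the following toy sketch connects named functions according to a `map[string][]string`. It is an assumption-laden simplification and not how this library is implemented: `startFunc`, `terminalFunc` and `wire` are hypothetical names, only `int` values are supported, there are no Middle nodes or codecs, and every value is forwarded to all the listed destinations.

```go
package main

import (
	"fmt"
	"sync"
)

// startFunc and terminalFunc are simplified, hypothetical stand-ins for the
// functions that would run inside Start and Terminal nodes.
type startFunc func(out chan<- int)
type terminalFunc func(in <-chan int)

// wire runs each connected start function and forwards every value it emits
// to all the terminals that the connection map lists for it. It returns once
// every terminal has drained its input.
func wire(starts map[string]startFunc, terminals map[string]terminalFunc,
	connections map[string][]string) error {
	// Validate the connection map before starting any goroutine.
	for src, dsts := range connections {
		if _, ok := starts[src]; !ok {
			return fmt.Errorf("unknown source node %q", src)
		}
		for _, dst := range dsts {
			if _, ok := terminals[dst]; !ok {
				return fmt.Errorf("unknown destination node %q", dst)
			}
		}
	}
	// One input channel per terminal, plus a counter of the senders feeding it.
	inputs := map[string]chan int{}
	senders := map[string]*sync.WaitGroup{}
	for name := range terminals {
		inputs[name] = make(chan int)
		senders[name] = &sync.WaitGroup{}
	}
	for _, dsts := range connections {
		for _, dst := range dsts {
			senders[dst].Add(1)
		}
	}
	// Each connected start node gets one goroutine that produces values and
	// another one that forwards them to its destinations.
	for src, dsts := range connections {
		start, dsts := starts[src], dsts
		out := make(chan int)
		go func() {
			defer close(out)
			start(out)
		}()
		go func() {
			for v := range out {
				for _, dst := range dsts {
					inputs[dst] <- v
				}
			}
			for _, dst := range dsts {
				senders[dst].Done()
			}
		}()
	}
	// A terminal's input is closed once all of its senders have finished.
	var done sync.WaitGroup
	for name, term := range terminals {
		name, term := name, term
		done.Add(1)
		go func() {
			senders[name].Wait()
			close(inputs[name])
		}()
		go func() {
			defer done.Done()
			term(inputs[name])
		}()
	}
	done.Wait()
	return nil
}

func main() {
	sum := 0
	err := wire(
		map[string]startFunc{"generator": func(out chan<- int) {
			for i := 1; i <= 3; i++ {
				out <- i
			}
		}},
		map[string]terminalFunc{"adder": func(in <-chan int) {
			for v := range in {
				sum += v
			}
		}},
		map[string][]string{"generator": {"adder"}},
	)
	fmt.Println(sum, err)
}
```

The connection map in this sketch uses the same "source name to list of destination names" shape that the connectors tutorial passes as `graph.Connector`.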

TBD
For more illustrative examples, check the [graph-autopipe example](./examples/graph-autopipe) and
the [step-by-step tutorial](./examples/tutorial).
20 changes: 20 additions & 0 deletions examples/lowlevel-basic/README.md
@@ -0,0 +1,20 @@
# Example pipeline for the node API

The pipeline in the main file has two Start nodes that send their data to two destination Middle
nodes (`odds` and `evens`). From there, the data follows each node's own branch until the
branches are eventually joined in the `printer` Terminal node.

Output:

```
even number: 2
odd number: 847
odd number: 59
odd number: 81
odd number: 81
even number: 0
odd number: 3
odd number: 1
odd number: 887
even number: 4
```
11 changes: 11 additions & 0 deletions examples/lowlevel-basic/main.go
@@ -8,20 +8,24 @@ import (
	"github.com/mariomac/pipes/pkg/node"
)

// StartCounter is a Start Node that generates some ordered numbers, one every 100 milliseconds
func StartCounter(out chan<- int) {
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		out <- i
	}
}

// StartRandoms is a Start Node that generates some random numbers, one every 150 milliseconds
func StartRandoms(out chan<- int) {
	for i := 0; i < 5; i++ {
		time.Sleep(150 * time.Millisecond)
		out <- rand.Intn(1000)
	}
}

// OddFilter is a Middle Node that reads the numbers from the input channel and only forwards
// those that are Odd
func OddFilter(in <-chan int, out chan<- int) {
	for n := range in {
		if n%2 == 1 {
@@ -30,6 +34,8 @@ func OddFilter(in <-chan int, out chan<- int) {
	}
}

// EvenFilter is a middle node that reads the numbers from the input channel and only
// forwards those that are Even
func EvenFilter(in <-chan int, out chan<- int) {
	for n := range in {
		if n%2 == 0 {
@@ -38,6 +44,8 @@ func EvenFilter(in <-chan int, out chan<- int) {
	}
}

// Messager is a middle node that forwards each number received from the input channel
// as a string, prepending the given message
func Messager(msg string) func(in <-chan int, out chan<- string) {
	return func(in <-chan int, out chan<- string) {
		for n := range in {
@@ -46,13 +54,15 @@ func Messager(msg string) func(in <-chan int, out chan<- string) {
	}
}

// Printer is a Terminal Node that just prints each string received by its input channel.
func Printer(in <-chan string) {
	for n := range in {
		fmt.Println(n)
	}
}

func main() {
	// Instantiating the different node types
	start1 := node.AsStart(StartCounter)
	start2 := node.AsStart(StartRandoms)
	odds := node.AsMiddle(OddFilter)
Expand All @@ -70,6 +80,7 @@ func main() {
\ /
printer
*/
// Manually wiring the nodes
start1.SendsTo(evens, odds)
start2.SendsTo(evens, odds)
odds.SendsTo(oddsMsg)
Expand Down
File renamed without changes.
80 changes: 80 additions & 0 deletions examples/tutorial/03-connectors/connectors.go
@@ -0,0 +1,80 @@
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/mariomac/pipes/pkg/graph"
	"github.com/mariomac/pipes/pkg/graph/stage"
	"github.com/mariomac/pipes/pkg/node"
)

type GeneratorConfig struct {
	stage.Instance
	Repeat     int
	Seed       int64
	LowerBound int
	UpperBound int
}

type PrinterConfig stage.Instance

type Config struct {
	graph.Connector
	Generator GeneratorConfig
	Printer   PrinterConfig
}

func Generator(cfg GeneratorConfig) node.StartFunc[int] {
	return func(out chan<- int) {
		rand.Seed(cfg.Seed)
		for n := 0; n < cfg.Repeat; n++ {
			out <- cfg.LowerBound + rand.Intn(cfg.UpperBound-cfg.LowerBound)
		}
	}
}

func Printer(_ PrinterConfig) node.TerminalFunc[string] {
	return func(in <-chan string) {
		for i := range in {
			fmt.Println("received: ", i)
		}
	}
}

// IntStringCodec just converts ints to string. Since the Generator
// creates integers and the printer only accepts strings, we must
// create and register a codec that will be automatically wired when
// needed
func IntStringCodec(in <-chan int, out chan<- string) {
	for i := range in {
		out <- strconv.Itoa(i)
	}
}

func main() {
	gb := graph.NewBuilder()
	graph.RegisterCodec(gb, IntStringCodec)
	graph.RegisterStart(gb, Generator)
	graph.RegisterTerminal(gb, Printer)

	grp, err := gb.Build(Config{
		Generator: GeneratorConfig{
			Instance:   "generator",
			LowerBound: -10,
			UpperBound: 10,
			Seed:       time.Now().UnixNano(),
			Repeat:     5,
		},
		Printer: "printer",
		Connector: graph.Connector{
			"generator": []string{"printer"},
		},
	})
	if err != nil {
		panic(err)
	}
	grp.Run()
}
16 changes: 9 additions & 7 deletions examples/tutorial/README.md
@@ -1,9 +1,11 @@
# Tutorial

TENTATIVE LIST:
01 - Low-Level API: basic nodes
02 - High-Level API: basic nodes
03 - High-Level API: connectors

1- basic lowlevel API
2- equivalent to 1 in highlevel API
3- connectors
4- buffered channels
5- using HCL as recommended config?
6- default connections/nodes without IDs? (still to be implemented)
TO DO:

04- buffered channels
05- using HCL as recommended config?
06- default connections/nodes without IDs? (still to be implemented)
