Gopherate is a framework for building distributed systems using ZeroMQ. It provides a set of tools and abstractions to create producers, consumers, and brokers that communicate using messages.
- Producers and Consumers: Easily create producers and consumers that can send and receive messages.
- Broker: A broker that routes messages between producers and consumers.
- FSM (Finite State Machine): A flexible FSM implementation to manage the state of producers and consumers.
- Message Handling: Define and interpret messages with ease.
- Error and Log Channels: Centralized error and log handling.
Gopherate depends on zmq4 for its ZeroMQ bindings. As the zmq4 documentation specifies:
zmq4 is just a wrapper for the ZeroMQ library. It doesn't include the library itself.
The ZeroMQ library must therefore be installed separately. For details, check the original 0MQ documentation and the libzmq repository.
On Ubuntu-like systems, run the following:
sudo apt install pkg-config libczmq-dev
You should now be able to check the installation:
$> pkg-config --modversion libzmq
4.3.4
$> apt list --installed | grep zmq
WARNING: apt does not have a stable CLI interface. Use with caution in scripts.
libczmq-dev/jammy,now 4.2.1-1 amd64 [installed]
libczmq4/jammy,now 4.2.1-1 amd64 [installed,automatic]
libzmq3-dev/jammy,now 4.3.4-2 amd64 [installed,automatic]
libzmq5/jammy,now 4.3.4-2 amd64 [installed,automatic]
To install Gopherate, use go get:
go get github.com/Pigotz/gopherate
For a complete example, see the E2E tests implemented here.
In case you want to run the standalone Broker, find its implementation here.
ctx := context.Background()

broker, err := broker.NewBroker("tcp://*:5555", nil)
if err != nil {
	panic(err)
}
defer broker.Close()

err = broker.Bind()
if err != nil {
	panic(err)
}

// You should spawn this in a goroutine
// The nil arguments are the error and log channels - see more below
broker.Run(ctx, nil, nil)
Define the Fibonacci task:
type ComputeFibonacciTask struct {
	steps int
}

func (w *ComputeFibonacciTask) Function() string {
	return "fibonacci"
}

func (w *ComputeFibonacciTask) Args() []string {
	return []string{strconv.Itoa(w.steps)}
}
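The two methods above suggest that a task is anything exposing a function name and a list of string arguments. The following is a minimal sketch of that contract, assuming an interface shaped like the methods shown; the `Task` interface name and the `describe` helper are hypothetical and are not taken from Gopherate itself.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Task is a hypothetical interface inferred from the two methods above.
// The name and exact shape used by Gopherate may differ.
type Task interface {
	Function() string
	Args() []string
}

type ComputeFibonacciTask struct {
	steps int
}

func (w *ComputeFibonacciTask) Function() string {
	return "fibonacci"
}

func (w *ComputeFibonacciTask) Args() []string {
	return []string{strconv.Itoa(w.steps)}
}

// Compile-time check that the task satisfies the assumed interface.
var _ Task = (*ComputeFibonacciTask)(nil)

// describe is a hypothetical helper that renders a task as "name(arg1, arg2)".
func describe(t Task) string {
	return fmt.Sprintf("%s(%s)", t.Function(), strings.Join(t.Args(), ", "))
}

func main() {
	fmt.Println(describe(&ComputeFibonacciTask{steps: 100}))
}
```

Because every argument is carried as a string, numeric inputs such as `steps` are converted with `strconv.Itoa` on the producer side and parsed back by the handler on the consumer side.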
Launch the producer:
ctx := context.Background()

producer, err := producer.NewProducer("producer-ID", "tcp://localhost:5555", nil)
if err != nil {
	panic(err)
}
defer producer.Close()

err = producer.Connect()
if err != nil {
	panic(err)
}

// You should spawn this in a goroutine
// The nil arguments are the error and log channels - see more below
producer.Run(ctx, nil, nil)
Submit the Fibonacci task to the network of consumers:
computeFibonacciTask := &ComputeFibonacciTask{
	steps: 100,
}

results, err := producer.Process(ctx, computeFibonacciTask, 5*time.Second)
if err != nil {
	panic(err)
}

// In this specific example, the result is a single string
fmt.Printf("Fibonacci result: %s\n", results[0])
The Fibonacci implementation:
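// Requires the "errors", "math/big", and "strconv" imports
func fibonacciHandler(args []string) ([]string, []error) {
	if len(args) != 1 {
		return nil, []error{errors.New("expected 1 argument")}
	}

	steps, err := strconv.Atoi(args[0])
	if err != nil {
		return nil, []error{err}
	}

	// big.Int avoids overflow: Fibonacci numbers stop fitting in an int64
	// after F(92), so the 100-step task above would overflow a plain int
	a, b := big.NewInt(0), big.NewInt(1)
	for i := 0; i < steps; i++ {
		// Equivalent to (a, b) = (b, a+b)
		a.Add(a, b)
		a, b = b, a
	}

	return []string{a.String()}, nil
}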
The main code:
ctx := context.Background()

consumer, err := consumer.NewConsumer("consumer-ID", "tcp://localhost:5555", nil, consumer.Handlers{
	"fibonacci": fibonacciHandler,
})
if err != nil {
	panic(err)
}
defer consumer.Close()

err = consumer.Connect()
if err != nil {
	panic(err)
}

// You should spawn this in a goroutine
// The nil arguments are the error and log channels - see more below
consumer.Run(ctx, nil, nil)
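The handlers map ties a function name (the value returned by a task's `Function()` method) to the code that processes its arguments. As a rough illustration of that lookup, here is a sketch that can be run without ZeroMQ. The local `Handler` and `Handlers` types and the `dispatch` function are assumptions made for this example; they stand in for whatever the consumer does internally and are not Gopherate's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Handler mirrors the signature of fibonacciHandler.
type Handler func(args []string) ([]string, []error)

// Handlers is a local stand-in for consumer.Handlers, keyed by function name.
type Handlers map[string]Handler

// dispatch is a hypothetical helper: it looks up the handler registered
// for the given function name and calls it with the task arguments.
func dispatch(handlers Handlers, function string, args []string) ([]string, []error) {
	handler, ok := handlers[function]
	if !ok {
		return nil, []error{fmt.Errorf("no handler registered for %q", function)}
	}
	return handler(args)
}

// fibonacciHandler is a compact int-based copy, fine for small inputs.
func fibonacciHandler(args []string) ([]string, []error) {
	if len(args) != 1 {
		return nil, []error{errors.New("expected 1 argument")}
	}
	steps, err := strconv.Atoi(args[0])
	if err != nil {
		return nil, []error{err}
	}
	a, b := 0, 1
	for i := 0; i < steps; i++ {
		a, b = b, a+b
	}
	return []string{strconv.Itoa(a)}, nil
}

func main() {
	handlers := Handlers{"fibonacci": fibonacciHandler}

	results, errs := dispatch(handlers, "fibonacci", []string{"10"})
	fmt.Println(results, errs)

	_, errs = dispatch(handlers, "unknown", nil)
	fmt.Println(errs)
}
```

Calling a handler directly like this is also a convenient way to unit test it before wiring it into a consumer.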
You can pass error and log channels to the broker, producer, and consumer. This way, you can centralize the error and log handling.
Print functions:
func PrintErrors(ctx context.Context, errors chan error) {
	for {
		select {
		case <-ctx.Done():
			return
		case err, ok := <-errors:
			if !ok {
				// Channel closed: stop instead of spinning on nil values
				return
			}
			fmt.Printf("[%s] [ERROR] %s\n", time.Now().UTC().String(), err)
		}
	}
}

func PrintLogs(ctx context.Context, logs chan string) {
	for {
		select {
		case <-ctx.Done():
			return
		case log, ok := <-logs:
			if !ok {
				// Channel closed: stop instead of spinning on empty strings
				return
			}
			fmt.Printf("[%s] [LOG] %s\n", time.Now().UTC().String(), log)
		}
	}
}
Passing the channels:
var waitGroup sync.WaitGroup

// Buffer is necessary to prevent blocking
errors := make(chan error, 100)
defer close(errors)

waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	PrintErrors(ctx, errors)
}()

// Buffer is necessary to prevent blocking
logs := make(chan string, 100)
defer close(logs)

waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	PrintLogs(ctx, logs)
}()

// broker.Run(ctx, errors, logs)
// producer.Run(ctx, errors, logs)
// consumer.Run(ctx, errors, logs)

// Blocks until the print goroutines return, i.e. until ctx is cancelled
waitGroup.Wait()
You can prefix the errors and logs with the utility functions you can find in the channels package.
// Example usage
// `errors` and `logs` have already been defined before

var waitGroup sync.WaitGroup

brokerErrors := make(chan error, 100)
defer close(brokerErrors)

waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	channels.WrapErrorChannel(ctx, "[BROKER]", brokerErrors, errors)
}()

brokerLogs := make(chan string, 100)
defer close(brokerLogs)

waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	channels.PrefixStringChannel(ctx, "[BROKER]", brokerLogs, logs)
}()

waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	broker.Run(ctx, brokerErrors, brokerLogs)
}()

waitGroup.Wait()
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
To run the tests, use the following command:
go test ./...
- Pigotz - Pigotz
This project is licensed under the MIT License - see the LICENSE file for details.