Skip to content

Commit

Permalink
Added an engine package
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 9, 2023
1 parent 98e1642 commit 74ae963
Show file tree
Hide file tree
Showing 15 changed files with 482 additions and 294 deletions.
77 changes: 0 additions & 77 deletions bootstrap/bootstrap_test.go

This file was deleted.

34 changes: 34 additions & 0 deletions cli/banner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cli

import (
"fmt"

"github.com/fatih/color"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/conf"
)

func displayBanner() {
mode := conf.StringDefault("cli.banner.mode", "console")
if mode == "off" {
return
}
banner := color.WhiteString(fmt.Sprintf(`
_______ _______ ______ ___ _
| || || _ | | | | |
|_ _|| _ || | || | |_| |
| | | | | || |_||_ | _|
| | | |_| || __ || |_
| | | || | | || _ |
|___| |_______||___| |_||___| |_|
%s (%s)
`, tork.Version, tork.GitCommit))

if mode == "console" {
fmt.Println(banner)
} else {
log.Info().Msg(banner)
}
}
133 changes: 27 additions & 106 deletions cli/cli.go
Original file line number Diff line number Diff line change
@@ -1,129 +1,50 @@
package cli

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"

"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/bootstrap"
"github.com/runabol/tork/conf"
"github.com/runabol/tork/engine"
"github.com/runabol/tork/internal/logging"
ucli "github.com/urfave/cli/v2"
)

func Run() error {
app := &ucli.App{
Name: "tork",
Usage: "a distributed workflow engine",
Before: before,
Commands: commands(),
}
return app.Run(os.Args)
}

func before(ctx *ucli.Context) error {
displayBanner()
return nil
type CLI struct {
app *ucli.App
customizers []func(eng *engine.Engine) error
}

func commands() []*ucli.Command {
return []*ucli.Command{
runCmd(),
migrationCmd(),
healthCmd(),
}
}

func runCmd() *ucli.Command {
return &ucli.Command{
Name: "run",
Usage: "Run Tork",
UsageText: "tork run mode (standalone|coordinator|worker)",
Action: func(ctx *ucli.Context) error {
mode := ctx.Args().First()
if mode == "" {
if err := ucli.ShowSubcommandHelp(ctx); err != nil {
return err
}
fmt.Println("missing required argument: mode")
os.Exit(1)

}
return bootstrap.Start(bootstrap.Mode(ctx.Args().First()))
},
func New() *CLI {
app := &ucli.App{
Name: "tork",
Usage: "a distributed workflow engine",
}
c := &CLI{app: app}
app.Before = c.before
app.Commands = c.commands()
return c
}

func migrationCmd() *ucli.Command {
return &ucli.Command{
Name: "migration",
Usage: "Run the db migration script",
Action: func(ctx *ucli.Context) error {
return bootstrap.Start(bootstrap.ModeMigration)
},
}
func (c *CLI) CustomizeEngine(cust func(eng *engine.Engine) error) {
c.customizers = append(c.customizers, cust)
}

func healthCmd() *ucli.Command {
return &ucli.Command{
Name: "health",
Usage: "Perform a health check",
Action: health,
}
func (c *CLI) Run() error {
return c.app.Run(os.Args)
}

func displayBanner() {
mode := conf.StringDefault("cli.banner.mode", "console")
if mode == "off" {
return
}
banner := color.WhiteString(fmt.Sprintf(`
_______ _______ ______ ___ _
| || || _ | | | | |
|_ _|| _ || | || | |_| |
| | | | | || |_||_ | _|
| | | |_| || __ || |_
| | | || | | || _ |
|___| |_______||___| |_||___| |_|
%s (%s)
`, tork.Version, tork.GitCommit))

if mode == "console" {
fmt.Println(banner)
} else {
log.Info().Msg(banner)
}
}
func (c *CLI) before(ctx *ucli.Context) error {
displayBanner()

func health(_ *ucli.Context) error {
chk, err := http.Get(fmt.Sprintf("%s/health", conf.StringDefault("endpoint", "http://localhost:8000")))
if err != nil {
if err := logging.SetupLogging(); err != nil {
return err
}
if chk.StatusCode != http.StatusOK {
return errors.Errorf("Health check failed. Status Code: %d", chk.StatusCode)
}
body, err := io.ReadAll(chk.Body)
if err != nil {
return errors.Wrapf(err, "error reading body")
}

type resp struct {
Status string `json:"status"`
}
r := resp{}
return nil
}

if err := json.Unmarshal(body, &r); err != nil {
return errors.Wrapf(err, "error unmarshalling body")
func (c *CLI) commands() []*ucli.Command {
return []*ucli.Command{
c.runCmd(),
c.migrationCmd(),
c.healthCmd(),
}

fmt.Printf("Status: %s\n", r.Status)

return nil
}
47 changes: 47 additions & 0 deletions cli/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cli

import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/pkg/errors"
"github.com/runabol/tork/conf"
ucli "github.com/urfave/cli/v2"
)

func (c *CLI) healthCmd() *ucli.Command {
return &ucli.Command{
Name: "health",
Usage: "Perform a health check",
Action: health,
}
}

func health(_ *ucli.Context) error {
chk, err := http.Get(fmt.Sprintf("%s/health", conf.StringDefault("endpoint", "http://localhost:8000")))
if err != nil {
return err
}
if chk.StatusCode != http.StatusOK {
return errors.Errorf("Health check failed. Status Code: %d", chk.StatusCode)
}
body, err := io.ReadAll(chk.Body)
if err != nil {
return errors.Wrapf(err, "error reading body")
}

type resp struct {
Status string `json:"status"`
}
r := resp{}

if err := json.Unmarshal(body, &r); err != nil {
return errors.Wrapf(err, "error unmarshalling body")
}

fmt.Printf("Status: %s\n", r.Status)

return nil
}
40 changes: 40 additions & 0 deletions cli/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cli

import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork/conf"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/db/postgres"
ucli "github.com/urfave/cli/v2"
)

func (c *CLI) migrationCmd() *ucli.Command {
return &ucli.Command{
Name: "migration",
Usage: "Run the db migration script",
Action: migration,
}
}

func migration(ctx *ucli.Context) error {
dstype := conf.StringDefault("datastore.type", datastore.DATASTORE_INMEMORY)
switch dstype {
case datastore.DATASTORE_POSTGRES:
dsn := conf.StringDefault(
"datastore.postgres.dsn",
"host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable",
)
pg, err := datastore.NewPostgresDataStore(dsn)
if err != nil {
return err
}
if err := pg.ExecScript(postgres.SCHEMA); err != nil {
return errors.Wrapf(err, "error when trying to create db schema")
}
default:
return errors.Errorf("can't perform db migration on: %s", dstype)
}
log.Info().Msg("migration completed!")
return nil
}
36 changes: 36 additions & 0 deletions cli/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cli

import (
"fmt"
"os"

"github.com/runabol/tork/engine"
ucli "github.com/urfave/cli/v2"
)

func (c *CLI) runCmd() *ucli.Command {
return &ucli.Command{
Name: "run",
Usage: "Run Tork",
UsageText: "tork run mode (standalone|coordinator|worker)",
Action: c.run,
}
}
func (c *CLI) run(ctx *ucli.Context) error {
mode := ctx.Args().First()
if mode == "" {
if err := ucli.ShowSubcommandHelp(ctx); err != nil {
return err
}
fmt.Println("missing required argument: mode")
os.Exit(1)

}
eng := engine.New(engine.Mode(ctx.Args().First()))
for _, cust := range c.customizers {
if err := cust(eng); err != nil {
return err
}
}
return eng.Start()
}
Loading

0 comments on commit 74ae963

Please sign in to comment.