Skip to content

Commit

Permalink
Executors with GRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Mar 30, 2018
1 parent 4071e4c commit 4f0b3cb
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 12 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions builtin/bins/dkron-executor-shell/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
"github.com/hashicorp/go-plugin"
dkplugin "github.com/victorcoder/dkron/plugin"
)

func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: dkplugin.Handshake,
Plugins: map[string]plugin.Plugin{
"executor": &dkplugin.ExecutorPlugin{Executor: &Shell{}},
},

// A non-nil value here enables gRPC serving for this plugin...
GRPCServer: plugin.DefaultGRPCServer,
})
}
19 changes: 19 additions & 0 deletions builtin/bins/dkron-executor-shell/shell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"errors"

"github.com/victorcoder/dkron/dkron"
)

// FilesOutput plugin that saves each execution log
// in it's own file in the file system.
type Shell struct {
Param1 string
Param2 bool
}

// Process method of the plugin
func (s *Shell) Execute(args *dkron.ExecutorArgs) error {
return errors.New("Foo bar")
}
14 changes: 14 additions & 0 deletions dkron/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dkron

// KV is the interface that we're exposing as a plugin.
type Executor interface {
Execute(args *ExecutorArgs) error
}

// Arguments for calling an execution processor
type ExecutorArgs struct {
// The execution to pass to the processor
Execution Execution
// The configuration for this plugin call
Config PluginConfig
}
9 changes: 8 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main

import (
"fmt"
"log"
"os"

"github.com/hashicorp/go-plugin"
Expand All @@ -22,10 +23,16 @@ func main() {
ui := &cli.BasicUi{Writer: os.Stdout}

plugins := &Plugins{}
plugins.DiscoverPlugins()
if err := plugins.DiscoverPlugins(); err != nil {
log.Fatal(err)
}

log.Println(plugins.Executors)

// Make sure we clean up any managed plugins at the end of this

//protoc -I proto/ proto/executor.proto --go_out=plugins=grpc:dkron/

c.Commands = map[string]cli.CommandFactory{
"agent": func() (cli.Command, error) {
return &dkron.AgentCommand{
Expand Down
60 changes: 60 additions & 0 deletions plugin/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package plugin

import (
"context"

"google.golang.org/grpc"

"github.com/hashicorp/go-plugin"
"github.com/victorcoder/dkron/dkron"
"github.com/victorcoder/dkron/proto"
)

// This is the implementation of plugin.Plugin so we can serve/consume this.
// We also implement GRPCPlugin so that this plugin can be served over
// gRPC.
type ExecutorPlugin struct {
plugin.NetRPCUnsupportedPlugin
Executor dkron.Executor
}

func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
proto.RegisterExecutorServer(s, ExecutorServer{Impl: p.Executor})
return nil
}

func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &ExecutorClient{client: proto.NewExecutorClient(c)}, nil
}

// Here is the gRPC client that GRPCClient talks to.
type ExecutorClient struct {
// This is the real implementation
client proto.ExecutorClient
}

func (m *ExecutorClient) Execute(args *proto.ExecutorArgs) error {
// This is where the magic conversion to Proto happens
a := &proto.ExecutorArgs{
Execution: &proto.Execution{},
}
_, err := m.client.Execute(context.Background(), &proto.ExecuteRequest{
ExecutorArgs: a,
})
return err
}

// Here is the gRPC server that GRPCClient talks to.
type ExecutorServer struct {
// This is the real implementation
Impl dkron.Executor
}

func (m ExecutorServer) Execute(ctx context.Context, req *proto.ExecuteRequest) (*proto.ExecuteResponse, error) {
// This is where the magic conversion to native dkron happens
args := &dkron.ExecutorArgs{
Execution: dkron.Execution{},
}
err := m.Impl.Execute(args)
return &proto.ExecuteResponse{err.Error()}, err
}
1 change: 1 addition & 0 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ import (
// PluginMap should be used by clients for the map of plugins.
var PluginMap = map[string]plugin.Plugin{
"processor": &ExecutionProcessorPlugin{},
"executor": &ExecutorPlugin{},
}
2 changes: 2 additions & 0 deletions plugin/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// from the plugin server.
const (
ProcessorPluginName = "processor"
ExecutorPluginName = "executor"
)

// Handshake is the HandshakeConfig used to configure clients and servers.
Expand All @@ -21,6 +22,7 @@ var Handshake = plugin.HandshakeConfig{
// ServeOpts are the configurations to serve a plugin.
type ServeOpts struct {
Processor dkron.ExecutionProcessor
Executor dkron.Executor
}

// Serve serves a plugin. This function never returns and should be the final
Expand Down
49 changes: 39 additions & 10 deletions plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
"path/filepath"
"strings"

"github.com/kardianos/osext"
"github.com/Sirupsen/logrus"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/kardianos/osext"
"github.com/victorcoder/dkron/dkron"
dkplugin "github.com/victorcoder/dkron/plugin"
)

type Plugins struct {
Processors map[string]dkron.ExecutionProcessor
Executors map[string]dkron.Executor
}

// Discover plugins located on disk
Expand All @@ -28,45 +29,65 @@ type Plugins struct {
// Whichever file is discoverd LAST wins.
func (p *Plugins) DiscoverPlugins() error {
p.Processors = make(map[string]dkron.ExecutionProcessor)
p.Executors = make(map[string]dkron.Executor)

// Look in /etc/dkron/plugins
processors, err := plugin.Discover("dkron-processor-*", filepath.Join("/etc", "dkron", "plugins"))
if err != nil {
return err
}

// Look in /etc/dkron/plugins
executors, err := plugin.Discover("dkron-executor-*", filepath.Join("/etc", "dkron", "plugins"))
if err != nil {
return err
}

// Next, look in the same directory as the Dkron executable, usually
// /usr/local/bin. If found, this replaces what we found in the config path.
exePath, err := osext.Executable()
if err != nil {
logrus.WithError(err).Error("Error loading exe directory")
} else {
processors, err = plugin.Discover("dkron-processor-*", filepath.Dir(exePath))
executors, err = plugin.Discover("dkron-executor-*", filepath.Dir(exePath))
if err != nil {
return err
}
}

for _, file := range processors {
// If the filename has a ".", trim up to there
// if idx := strings.Index(file, "."); idx >= 0 {
// file = file[:idx]
// }
// Look for foo-bar-baz. The plugin name is "baz"
parts := strings.SplitN(file, "-", 3)
if len(parts) != 3 {
continue
}

raw, err := p.pluginFactory(file, dkplugin.ProcessorPluginName)
if err != nil {
return err
}
p.Processors[parts[2]] = raw.(dkron.ExecutionProcessor)
}

for _, file := range executors {
// Look for foo-bar-baz. The plugin name is "baz"
parts := strings.SplitN(file, "-", 3)
if len(parts) != 3 {
continue
}

processor, _ := p.processorFactory(file)
p.Processors[parts[2]] = processor
raw, err := p.pluginFactory(file, dkplugin.ExecutorPluginName)
if err != nil {
return err
}
p.Executors[parts[2]] = raw.(dkron.Executor)
}

return nil
}

func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error) {
func (Plugins) pluginFactory(path string, pluginType string) (interface{}, error) {
// Create an hclog.Logger
logger := hclog.New(&hclog.LoggerOptions{
Name: "plugin",
Expand All @@ -81,6 +102,14 @@ func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error
config.Managed = true
config.Plugins = dkplugin.PluginMap
config.Logger = logger

switch pluginType {
case dkplugin.ProcessorPluginName:
config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolNetRPC}
case dkplugin.ExecutorPluginName:
config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC}
}

client := plugin.NewClient(&config)

// Request the RPC client so we can get the provider
Expand All @@ -90,10 +119,10 @@ func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error
return nil, err
}

raw, err := rpcClient.Dispense(dkplugin.ProcessorPluginName)
raw, err := rpcClient.Dispense(pluginType)
if err != nil {
return nil, err
}

return raw.(dkron.ExecutionProcessor), nil
return raw, nil
}
Loading

0 comments on commit 4f0b3cb

Please sign in to comment.