From 4f0b3cbe6f5f7a3e53d1a4b54567305452fa10ae Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Fri, 30 Mar 2018 23:21:54 +0200 Subject: [PATCH] Executors with GRPC --- Gopkg.lock | 8 +- builtin/bins/dkron-executor-shell/main.go | 18 ++ builtin/bins/dkron-executor-shell/shell.go | 19 ++ dkron/executor.go | 14 + main.go | 9 +- plugin/executor.go | 60 +++++ plugin/plugin.go | 1 + plugin/serve.go | 2 + plugins.go | 49 +++- proto/executor.pb.go | 299 +++++++++++++++++++++ proto/executor.proto | 41 +++ 11 files changed, 508 insertions(+), 12 deletions(-) create mode 100644 builtin/bins/dkron-executor-shell/main.go create mode 100644 builtin/bins/dkron-executor-shell/shell.go create mode 100644 dkron/executor.go create mode 100644 plugin/executor.go create mode 100644 proto/executor.pb.go create mode 100644 proto/executor.proto diff --git a/Gopkg.lock b/Gopkg.lock index 3f473cf60..2756b9ec0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -143,7 +143,7 @@ branch = "master" name = "github.com/hashicorp/go-plugin" packages = ["."] - revision = "3e6d191694b5a3a2b99755f31b47fa209e4bcd09" + revision = "baa83ead6ff956b3f99bcd609ae3499c028e019e" [[projects]] branch = "master" @@ -252,6 +252,12 @@ packages = ["."] revision = "06020f85339e21b2478f756a78e295255ffa4d6a" +[[projects]] + name = "github.com/oklog/run" + packages = ["."] + revision = "4dadeb3030eda0273a12382bb2348ffc7c9d1a39" + version = "v1.0.0" + [[projects]] name = "github.com/pelletier/go-toml" packages = ["."] diff --git a/builtin/bins/dkron-executor-shell/main.go b/builtin/bins/dkron-executor-shell/main.go new file mode 100644 index 000000000..94583f12e --- /dev/null +++ b/builtin/bins/dkron-executor-shell/main.go @@ -0,0 +1,18 @@ +package main + +import ( + "github.com/hashicorp/go-plugin" + dkplugin "github.com/victorcoder/dkron/plugin" +) + +func main() { + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: dkplugin.Handshake, + Plugins: map[string]plugin.Plugin{ + "executor": &dkplugin.ExecutorPlugin{Executor: &Shell{}}, + }, + + // A non-nil value here enables gRPC serving for this plugin... + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go new file mode 100644 index 000000000..449f9e70f --- /dev/null +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -0,0 +1,19 @@ +package main + +import ( + "errors" + + "github.com/victorcoder/dkron/dkron" +) + +// FilesOutput plugin that saves each execution log +// in it's own file in the file system. +type Shell struct { + Param1 string + Param2 bool +} + +// Process method of the plugin +func (s *Shell) Execute(args *dkron.ExecutorArgs) error { + return errors.New("Foo bar") +} diff --git a/dkron/executor.go b/dkron/executor.go new file mode 100644 index 000000000..4d453d899 --- /dev/null +++ b/dkron/executor.go @@ -0,0 +1,14 @@ +package dkron + +// KV is the interface that we're exposing as a plugin. +type Executor interface { + Execute(args *ExecutorArgs) error +} + +// Arguments for calling an execution processor +type ExecutorArgs struct { + // The execution to pass to the processor + Execution Execution + // The configuration for this plugin call + Config PluginConfig +} diff --git a/main.go b/main.go index 3d0b3a0ad..e1b3f8901 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" + "log" "os" "github.com/hashicorp/go-plugin" @@ -22,10 +23,16 @@ func main() { ui := &cli.BasicUi{Writer: os.Stdout} plugins := &Plugins{} - plugins.DiscoverPlugins() + if err := plugins.DiscoverPlugins(); err != nil { + log.Fatal(err) + } + + log.Println(plugins.Executors) // Make sure we clean up any managed plugins at the end of this + //protoc -I proto/ proto/executor.proto --go_out=plugins=grpc:dkron/ + c.Commands = map[string]cli.CommandFactory{ "agent": func() (cli.Command, error) { return &dkron.AgentCommand{ diff --git a/plugin/executor.go b/plugin/executor.go new file mode 100644 index 000000000..8122de664 --- /dev/null +++ b/plugin/executor.go @@ -0,0 +1,60 @@ +package plugin + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/hashicorp/go-plugin" + "github.com/victorcoder/dkron/dkron" + "github.com/victorcoder/dkron/proto" +) + +// This is the implementation of plugin.Plugin so we can serve/consume this. +// We also implement GRPCPlugin so that this plugin can be served over +// gRPC. +type ExecutorPlugin struct { + plugin.NetRPCUnsupportedPlugin + Executor dkron.Executor +} + +func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + proto.RegisterExecutorServer(s, ExecutorServer{Impl: p.Executor}) + return nil +} + +func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &ExecutorClient{client: proto.NewExecutorClient(c)}, nil +} + +// Here is the gRPC client that GRPCClient talks to. +type ExecutorClient struct { + // This is the real implementation + client proto.ExecutorClient +} + +func (m *ExecutorClient) Execute(args *proto.ExecutorArgs) error { + // This is where the magic conversion to Proto happens + a := &proto.ExecutorArgs{ + Execution: &proto.Execution{}, + } + _, err := m.client.Execute(context.Background(), &proto.ExecuteRequest{ + ExecutorArgs: a, + }) + return err +} + +// Here is the gRPC server that GRPCClient talks to. +type ExecutorServer struct { + // This is the real implementation + Impl dkron.Executor +} + +func (m ExecutorServer) Execute(ctx context.Context, req *proto.ExecuteRequest) (*proto.ExecuteResponse, error) { + // This is where the magic conversion to native dkron happens + args := &dkron.ExecutorArgs{ + Execution: dkron.Execution{}, + } + err := m.Impl.Execute(args) + return &proto.ExecuteResponse{err.Error()}, err +} diff --git a/plugin/plugin.go b/plugin/plugin.go index 804a78c6e..7680a30f2 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -9,4 +9,5 @@ import ( // PluginMap should be used by clients for the map of plugins. var PluginMap = map[string]plugin.Plugin{ "processor": &ExecutionProcessorPlugin{}, + "executor": &ExecutorPlugin{}, } diff --git a/plugin/serve.go b/plugin/serve.go index a560a7809..1a1cbd946 100644 --- a/plugin/serve.go +++ b/plugin/serve.go @@ -9,6 +9,7 @@ import ( // from the plugin server. const ( ProcessorPluginName = "processor" + ExecutorPluginName = "executor" ) // Handshake is the HandshakeConfig used to configure clients and servers. @@ -21,6 +22,7 @@ var Handshake = plugin.HandshakeConfig{ // ServeOpts are the configurations to serve a plugin. type ServeOpts struct { Processor dkron.ExecutionProcessor + Executor dkron.Executor } // Serve serves a plugin. This function never returns and should be the final diff --git a/plugins.go b/plugins.go index 68f897cdf..0ba669522 100644 --- a/plugins.go +++ b/plugins.go @@ -6,16 +6,17 @@ import ( "path/filepath" "strings" - "github.com/kardianos/osext" "github.com/Sirupsen/logrus" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" + "github.com/kardianos/osext" "github.com/victorcoder/dkron/dkron" dkplugin "github.com/victorcoder/dkron/plugin" ) type Plugins struct { Processors map[string]dkron.ExecutionProcessor + Executors map[string]dkron.Executor } // Discover plugins located on disk @@ -28,6 +29,7 @@ type Plugins struct { // Whichever file is discoverd LAST wins. func (p *Plugins) DiscoverPlugins() error { p.Processors = make(map[string]dkron.ExecutionProcessor) + p.Executors = make(map[string]dkron.Executor) // Look in /etc/dkron/plugins processors, err := plugin.Discover("dkron-processor-*", filepath.Join("/etc", "dkron", "plugins")) @@ -35,6 +37,12 @@ func (p *Plugins) DiscoverPlugins() error { return err } + // Look in /etc/dkron/plugins + executors, err := plugin.Discover("dkron-executor-*", filepath.Join("/etc", "dkron", "plugins")) + if err != nil { + return err + } + // Next, look in the same directory as the Dkron executable, usually // /usr/local/bin. If found, this replaces what we found in the config path. exePath, err := osext.Executable() @@ -42,31 +50,44 @@ func (p *Plugins) DiscoverPlugins() error { logrus.WithError(err).Error("Error loading exe directory") } else { processors, err = plugin.Discover("dkron-processor-*", filepath.Dir(exePath)) + executors, err = plugin.Discover("dkron-executor-*", filepath.Dir(exePath)) if err != nil { return err } } for _, file := range processors { - // If the filename has a ".", trim up to there - // if idx := strings.Index(file, "."); idx >= 0 { - // file = file[:idx] - // } + // Look for foo-bar-baz. The plugin name is "baz" + parts := strings.SplitN(file, "-", 3) + if len(parts) != 3 { + continue + } + raw, err := p.pluginFactory(file, dkplugin.ProcessorPluginName) + if err != nil { + return err + } + p.Processors[parts[2]] = raw.(dkron.ExecutionProcessor) + } + + for _, file := range executors { // Look for foo-bar-baz. The plugin name is "baz" parts := strings.SplitN(file, "-", 3) if len(parts) != 3 { continue } - processor, _ := p.processorFactory(file) - p.Processors[parts[2]] = processor + raw, err := p.pluginFactory(file, dkplugin.ExecutorPluginName) + if err != nil { + return err + } + p.Executors[parts[2]] = raw.(dkron.Executor) } return nil } -func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error) { +func (Plugins) pluginFactory(path string, pluginType string) (interface{}, error) { // Create an hclog.Logger logger := hclog.New(&hclog.LoggerOptions{ Name: "plugin", @@ -81,6 +102,14 @@ func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error config.Managed = true config.Plugins = dkplugin.PluginMap config.Logger = logger + + switch pluginType { + case dkplugin.ProcessorPluginName: + config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolNetRPC} + case dkplugin.ExecutorPluginName: + config.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC} + } + client := plugin.NewClient(&config) // Request the RPC client so we can get the provider @@ -90,10 +119,10 @@ func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error return nil, err } - raw, err := rpcClient.Dispense(dkplugin.ProcessorPluginName) + raw, err := rpcClient.Dispense(pluginType) if err != nil { return nil, err } - return raw.(dkron.ExecutionProcessor), nil + return raw, nil } diff --git a/proto/executor.pb.go b/proto/executor.pb.go new file mode 100644 index 000000000..0716c4ea0 --- /dev/null +++ b/proto/executor.pb.go @@ -0,0 +1,299 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: executor.proto + +/* +Package dkron is a generated protocol buffer package. + +It is generated from these files: + executor.proto + +It has these top-level messages: + Execution + ExecutorArgs + PluginConfig + ExecuteRequest + ExecuteResponse +*/ +package proto + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/ptypes/timestamp" +import google_protobuf1 "github.com/golang/protobuf/ptypes/any" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Execution struct { + // Name of the job this executions refers to. + JobName string `protobuf:"bytes,1,opt,name=job_name,json=jobName" json:"job_name,omitempty"` + StartedAt *google_protobuf.Timestamp `protobuf:"bytes,2,opt,name=started_at,json=startedAt" json:"started_at,omitempty"` + FinishedAt *google_protobuf.Timestamp `protobuf:"bytes,3,opt,name=finished_at,json=finishedAt" json:"finished_at,omitempty"` + Success bool `protobuf:"varint,4,opt,name=success" json:"success,omitempty"` + Output []byte `protobuf:"bytes,5,opt,name=output,proto3" json:"output,omitempty"` + NodeName string `protobuf:"bytes,6,opt,name=node_name,json=nodeName" json:"node_name,omitempty"` + Group int64 `protobuf:"varint,7,opt,name=group" json:"group,omitempty"` + Attempt uint32 `protobuf:"varint,8,opt,name=attempt" json:"attempt,omitempty"` +} + +func (m *Execution) Reset() { *m = Execution{} } +func (m *Execution) String() string { return proto.CompactTextString(m) } +func (*Execution) ProtoMessage() {} +func (*Execution) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Execution) GetJobName() string { + if m != nil { + return m.JobName + } + return "" +} + +func (m *Execution) GetStartedAt() *google_protobuf.Timestamp { + if m != nil { + return m.StartedAt + } + return nil +} + +func (m *Execution) GetFinishedAt() *google_protobuf.Timestamp { + if m != nil { + return m.FinishedAt + } + return nil +} + +func (m *Execution) GetSuccess() bool { + if m != nil { + return m.Success + } + return false +} + +func (m *Execution) GetOutput() []byte { + if m != nil { + return m.Output + } + return nil +} + +func (m *Execution) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + +func (m *Execution) GetGroup() int64 { + if m != nil { + return m.Group + } + return 0 +} + +func (m *Execution) GetAttempt() uint32 { + if m != nil { + return m.Attempt + } + return 0 +} + +type ExecutorArgs struct { + Execution *Execution `protobuf:"bytes,1,opt,name=execution" json:"execution,omitempty"` + Config *PluginConfig `protobuf:"bytes,2,opt,name=config" json:"config,omitempty"` +} + +func (m *ExecutorArgs) Reset() { *m = ExecutorArgs{} } +func (m *ExecutorArgs) String() string { return proto.CompactTextString(m) } +func (*ExecutorArgs) ProtoMessage() {} +func (*ExecutorArgs) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *ExecutorArgs) GetExecution() *Execution { + if m != nil { + return m.Execution + } + return nil +} + +func (m *ExecutorArgs) GetConfig() *PluginConfig { + if m != nil { + return m.Config + } + return nil +} + +type PluginConfig struct { + Config map[string]*google_protobuf1.Any `protobuf:"bytes,1,rep,name=config" json:"config,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *PluginConfig) Reset() { *m = PluginConfig{} } +func (m *PluginConfig) String() string { return proto.CompactTextString(m) } +func (*PluginConfig) ProtoMessage() {} +func (*PluginConfig) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *PluginConfig) GetConfig() map[string]*google_protobuf1.Any { + if m != nil { + return m.Config + } + return nil +} + +type ExecuteRequest struct { + ExecutorArgs *ExecutorArgs `protobuf:"bytes,1,opt,name=executor_args,json=executorArgs" json:"executor_args,omitempty"` +} + +func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} } +func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) } +func (*ExecuteRequest) ProtoMessage() {} +func (*ExecuteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *ExecuteRequest) GetExecutorArgs() *ExecutorArgs { + if m != nil { + return m.ExecutorArgs + } + return nil +} + +type ExecuteResponse struct { + Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"` +} + +func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} } +func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) } +func (*ExecuteResponse) ProtoMessage() {} +func (*ExecuteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *ExecuteResponse) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func init() { + proto.RegisterType((*Execution)(nil), "dkron.Execution") + proto.RegisterType((*ExecutorArgs)(nil), "dkron.ExecutorArgs") + proto.RegisterType((*PluginConfig)(nil), "dkron.PluginConfig") + proto.RegisterType((*ExecuteRequest)(nil), "dkron.ExecuteRequest") + proto.RegisterType((*ExecuteResponse)(nil), "dkron.ExecuteResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Executor service + +type ExecutorClient interface { + Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) +} + +type executorClient struct { + cc *grpc.ClientConn +} + +func NewExecutorClient(cc *grpc.ClientConn) ExecutorClient { + return &executorClient{cc} +} + +func (c *executorClient) Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) { + out := new(ExecuteResponse) + err := grpc.Invoke(ctx, "/dkron.Executor/Execute", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Executor service + +type ExecutorServer interface { + Execute(context.Context, *ExecuteRequest) (*ExecuteResponse, error) +} + +func RegisterExecutorServer(s *grpc.Server, srv ExecutorServer) { + s.RegisterService(&_Executor_serviceDesc, srv) +} + +func _Executor_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExecuteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExecutorServer).Execute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dkron.Executor/Execute", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExecutorServer).Execute(ctx, req.(*ExecuteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Executor_serviceDesc = grpc.ServiceDesc{ + ServiceName: "dkron.Executor", + HandlerType: (*ExecutorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Execute", + Handler: _Executor_Execute_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "executor.proto", +} + +func init() { proto.RegisterFile("executor.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 453 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0x4d, 0x6f, 0xd3, 0x40, + 0x10, 0xd5, 0x36, 0x24, 0xb1, 0xc7, 0x69, 0xa9, 0xb6, 0xa5, 0x72, 0xcd, 0xa1, 0x96, 0x2f, 0x58, + 0x20, 0xb9, 0x52, 0x38, 0x10, 0xe0, 0x14, 0x41, 0x2e, 0x1c, 0x00, 0xad, 0xb8, 0x47, 0x4e, 0x32, + 0x31, 0x6e, 0xe2, 0x5d, 0xb3, 0xbb, 0x46, 0xe4, 0x9f, 0x70, 0xe1, 0xbf, 0xa2, 0xec, 0x47, 0x48, + 0x0b, 0x12, 0x27, 0xfb, 0xcd, 0xbc, 0x79, 0x33, 0xf3, 0x66, 0xe1, 0x0c, 0x7f, 0xe0, 0xb2, 0xd3, + 0x42, 0x16, 0xad, 0x14, 0x5a, 0xd0, 0xfe, 0x6a, 0x23, 0x05, 0x4f, 0x6e, 0x2a, 0x21, 0xaa, 0x2d, + 0xde, 0x9a, 0xe0, 0xa2, 0x5b, 0xdf, 0xea, 0xba, 0x41, 0xa5, 0xcb, 0xa6, 0xb5, 0xbc, 0xe4, 0xfa, + 0x21, 0xa1, 0xe4, 0x3b, 0x9b, 0xca, 0x7e, 0x9d, 0x40, 0x38, 0x33, 0xaa, 0xb5, 0xe0, 0xf4, 0x1a, + 0x82, 0x3b, 0xb1, 0x98, 0xf3, 0xb2, 0xc1, 0x98, 0xa4, 0x24, 0x0f, 0xd9, 0xf0, 0x4e, 0x2c, 0x3e, + 0x96, 0x0d, 0xd2, 0xd7, 0x00, 0x4a, 0x97, 0x52, 0xe3, 0x6a, 0x5e, 0xea, 0xf8, 0x24, 0x25, 0x79, + 0x34, 0x4e, 0x0a, 0x2b, 0x5c, 0x78, 0xe1, 0xe2, 0x8b, 0xef, 0xcc, 0x42, 0xc7, 0x9e, 0x6a, 0xfa, + 0x16, 0xa2, 0x75, 0xcd, 0x6b, 0xf5, 0xd5, 0xd6, 0xf6, 0xfe, 0x5b, 0x0b, 0x9e, 0x3e, 0xd5, 0x34, + 0x86, 0xa1, 0xea, 0x96, 0x4b, 0x54, 0x2a, 0x7e, 0x94, 0x92, 0x3c, 0x60, 0x1e, 0xd2, 0x2b, 0x18, + 0x88, 0x4e, 0xb7, 0x9d, 0x8e, 0xfb, 0x29, 0xc9, 0x47, 0xcc, 0x21, 0xfa, 0x14, 0x42, 0x2e, 0x56, + 0x68, 0xb7, 0x18, 0x98, 0x2d, 0x82, 0x7d, 0xc0, 0xac, 0x71, 0x09, 0xfd, 0x4a, 0x8a, 0xae, 0x8d, + 0x87, 0x29, 0xc9, 0x7b, 0xcc, 0x82, 0x7d, 0x93, 0x52, 0x6b, 0x6c, 0x5a, 0x1d, 0x07, 0x29, 0xc9, + 0x4f, 0x99, 0x87, 0xd9, 0x06, 0x46, 0x33, 0x67, 0xfa, 0x54, 0x56, 0x8a, 0x16, 0x10, 0xa2, 0xb7, + 0xcb, 0x58, 0x14, 0x8d, 0xcf, 0x0b, 0x73, 0x86, 0xe2, 0x60, 0x23, 0xfb, 0x43, 0xa1, 0x2f, 0x60, + 0xb0, 0x14, 0x7c, 0x5d, 0x57, 0xce, 0xb2, 0x0b, 0x47, 0xfe, 0xbc, 0xed, 0xaa, 0x9a, 0xbf, 0x33, + 0x29, 0xe6, 0x28, 0xd9, 0x4f, 0x02, 0xa3, 0xe3, 0x04, 0x7d, 0x75, 0xa8, 0x26, 0x69, 0x2f, 0x8f, + 0xc6, 0x37, 0xff, 0xa8, 0x2e, 0xec, 0x67, 0xc6, 0xb5, 0xdc, 0x79, 0xa5, 0xe4, 0x13, 0x44, 0x47, + 0x61, 0x7a, 0x0e, 0xbd, 0x0d, 0xee, 0xdc, 0x49, 0xf7, 0xbf, 0xf4, 0x39, 0xf4, 0xbf, 0x97, 0xdb, + 0x0e, 0xdd, 0x58, 0x97, 0x7f, 0x5d, 0x63, 0xca, 0x77, 0xcc, 0x52, 0xde, 0x9c, 0x4c, 0x48, 0xf6, + 0x01, 0xce, 0xec, 0x7e, 0xc8, 0xf0, 0x5b, 0x87, 0x4a, 0xd3, 0x09, 0x9c, 0xfa, 0xe7, 0x38, 0x2f, + 0x65, 0xa5, 0x9c, 0x1b, 0x17, 0xf7, 0xdc, 0xb0, 0xae, 0xb1, 0x11, 0x1e, 0xa1, 0xec, 0x19, 0x3c, + 0x3e, 0x68, 0xa9, 0x56, 0x70, 0x65, 0xce, 0x82, 0x52, 0x0a, 0xe9, 0x46, 0xb4, 0x60, 0xfc, 0x1e, + 0x02, 0x2f, 0x43, 0x27, 0x30, 0x74, 0x45, 0xf4, 0xc9, 0xbd, 0x16, 0x7e, 0xa0, 0xe4, 0xea, 0x61, + 0xd8, 0x6a, 0x2f, 0x06, 0x66, 0xa7, 0x97, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xe8, 0xfa, + 0x40, 0x3e, 0x03, 0x00, 0x00, +} diff --git a/proto/executor.proto b/proto/executor.proto new file mode 100644 index 000000000..91e2c35be --- /dev/null +++ b/proto/executor.proto @@ -0,0 +1,41 @@ +// protoc -I proto/ proto/executor.proto --go_out=plugins=grpc:dkron/ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/any.proto"; + +package dkron; + +message Execution { + // Name of the job this executions refers to. + string job_name = 1; + google.protobuf.Timestamp started_at = 2; + google.protobuf.Timestamp finished_at = 3; + bool success = 4; + bytes output = 5; + string node_name = 6; + int64 group = 7; + uint32 attempt = 8; +} + +message ExecutorArgs { + Execution execution = 1; + PluginConfig config = 2; +} + +message PluginConfig { + map config = 1; +} + +message ExecuteRequest { + ExecutorArgs executor_args = 1; +} + +message ExecuteResponse { + string error = 1; +} + +service Executor { + rpc Execute (ExecuteRequest) returns (ExecuteResponse); + rpc GetConfig() returns (PluginConfig); +}