diff --git a/CHANGELOG.md b/CHANGELOG.md index e56bbac67..437dc4395 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - [#1400](https://github.com/influxdata/kapacitor/issues/1400): Allow for `.yml` file extensions in `define-topic-handler` - [#1402](https://github.com/influxdata/kapacitor/pull/1402): Fix http server error logging. +- [#1500](https://github.com/influxdata/kapacitor/pull/1500): Fix bugs with stopping running UDF agent. ## v1.3.1 [2017-06-02] diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go index b5c35372d..5a2816f74 100644 --- a/integrations/helpers_test.go +++ b/integrations/helpers_test.go @@ -137,7 +137,7 @@ func compareAlertData(exp, got alert.Data) (bool, string) { type UDFService struct { ListFunc func() []string InfoFunc func(name string) (udf.Info, bool) - CreateFunc func(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) + CreateFunc func(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) } func (u UDFService) List() []string { @@ -148,8 +148,8 @@ func (u UDFService) Info(name string) (udf.Info, bool) { return u.InfoFunc(name) } -func (u UDFService) Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) { - return u.CreateFunc(name, l, abortCallback) +func (u UDFService) Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) { + return u.CreateFunc(name, taskID, nodeID, l, abortCallback) } type taskStore struct{} diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 20d33a6e6..e9ec33ec0 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -5930,11 +5930,11 @@ stream return } uio := udf_test.NewIO() - udfService.CreateFunc = func(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) { + udfService.CreateFunc = func(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) { if name != "customFunc" { return nil, fmt.Errorf("unknown function %s", name) } - return udf_test.New(uio, l), nil + return udf_test.New(taskID, nodeID, uio, l), nil } tmInit := func(tm *kapacitor.TaskMaster) { @@ -5950,6 +5950,12 @@ stream t.Error("expected init message") } init := i.Init + if got, exp := init.TaskID, "TestStream_CustomFunctions"; got != exp { + t.Errorf("unexpected task ID got %q exp %q", got, exp) + } + if got, exp := init.NodeID, "customFunc4"; got != exp { + t.Errorf("unexpected task ID got %q exp %q", got, exp) + } if got, exp := len(init.Options), 2; got != exp { t.Fatalf("unexpected number of options in init request, got %d exp %d", got, exp) diff --git a/services/udf/service.go b/services/udf/service.go index 1545e8a74..777e5f2fb 100644 --- a/services/udf/service.go +++ b/services/udf/service.go @@ -59,7 +59,7 @@ func (s *Service) Info(name string) (udf.Info, bool) { } func (s *Service) Create( - name string, + name, taskID, nodeID string, l *log.Logger, abortCallback func(), ) (udf.Interface, error) { @@ -70,6 +70,7 @@ func (s *Service) Create( if conf.Socket != "" { // Create socket UDF return kapacitor.NewUDFSocket( + taskID, nodeID, kapacitor.NewSocketConn(conf.Socket), l, time.Duration(conf.Timeout), @@ -87,6 +88,7 @@ func (s *Service) Create( Env: env, } return kapacitor.NewUDFProcess( + taskID, nodeID, command.ExecCommander, cmdSpec, l, @@ -109,7 +111,10 @@ func (s *Service) Refresh(name string) error { } func (s *Service) loadUDFInfo(name string) (udf.Info, error) { - u, err := s.Create(name, s.logger, nil) + // loadUDFInfo creates a UDF connection outside the context of a task or node + // because it only makes the Info request and never makes an Init request. + // As such it does not need to provide actual task and node IDs. + u, err := s.Create(name, "", "", s.logger, nil) if err != nil { return udf.Info{}, err } diff --git a/task_master.go b/task_master.go index 7604344c2..302a5ab90 100644 --- a/task_master.go +++ b/task_master.go @@ -50,7 +50,7 @@ type LogService interface { type UDFService interface { List() []string Info(name string) (udf.Info, bool) - Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) + Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) } var ErrTaskMasterClosed = errors.New("TaskMaster is closed") diff --git a/udf.go b/udf.go index 41abf4e1e..1cf1063e7 100644 --- a/udf.go +++ b/udf.go @@ -40,6 +40,8 @@ func newUDFNode(et *ExecutingTask, n *pipeline.UDFNode, l *log.Logger) (*UDFNode // Create the UDF f, err := et.tm.UDFService.Create( n.UDFName, + et.Task.ID, + n.Name(), l, un.abortedCallback, ) @@ -143,6 +145,9 @@ func (n *UDFNode) snapshot() ([]byte, error) { // over STDIN and STDOUT. Lines received over STDERR are logged // via normal Kapacitor logging. type UDFProcess struct { + taskName string + nodeName string + server *udf.Server commander command.Commander cmdSpec command.Spec @@ -162,6 +167,7 @@ type UDFProcess struct { } func NewUDFProcess( + taskName, nodeName string, commander command.Commander, cmdSpec command.Spec, l *log.Logger, @@ -169,6 +175,8 @@ func NewUDFProcess( abortCallback func(), ) *UDFProcess { return &UDFProcess{ + taskName: taskName, + nodeName: nodeName, commander: commander, cmdSpec: cmdSpec, logger: l, @@ -208,6 +216,8 @@ func (p *UDFProcess) Open() error { outBuf := bufio.NewReader(stdout) p.server = udf.NewServer( + p.taskName, + p.nodeName, outBuf, stdin, p.logger, @@ -269,6 +279,9 @@ func (p *UDFProcess) Out() <-chan edge.Message { return p.server.Out() func (p *UDFProcess) Info() (udf.Info, error) { return p.server.Info() } type UDFSocket struct { + taskName string + nodeName string + server *udf.Server socket Socket @@ -285,12 +298,15 @@ type Socket interface { } func NewUDFSocket( + taskName, nodeName string, socket Socket, l *log.Logger, timeout time.Duration, abortCallback func(), ) *UDFSocket { return &UDFSocket{ + taskName: taskName, + nodeName: nodeName, socket: socket, logger: l, timeout: timeout, @@ -308,6 +324,8 @@ func (s *UDFSocket) Open() error { outBuf := bufio.NewReader(out) s.server = udf.NewServer( + s.taskName, + s.nodeName, outBuf, in, s.logger, diff --git a/udf/agent/py/kapacitor/udf/udf_pb2.py b/udf/agent/py/kapacitor/udf/udf_pb2.py index 669e9d748..809a5fab7 100644 --- a/udf/agent/py/kapacitor/udf/udf_pb2.py +++ b/udf/agent/py/kapacitor/udf/udf_pb2.py @@ -20,7 +20,7 @@ name='udf.proto', package='agent', syntax='proto3', - serialized_pb=_b('\n\tudf.proto\x12\x05\x61gent\"\r\n\x0bInfoRequest\"\xc7\x01\n\x0cInfoResponse\x12\x1e\n\x05wants\x18\x01 \x01(\x0e\x32\x0f.agent.EdgeType\x12!\n\x08provides\x18\x02 \x01(\x0e\x32\x0f.agent.EdgeType\x12\x31\n\x07options\x18\x03 \x03(\x0b\x32 .agent.InfoResponse.OptionsEntry\x1a\x41\n\x0cOptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12 \n\x05value\x18\x02 \x01(\x0b\x32\x11.agent.OptionInfo:\x02\x38\x01\"2\n\nOptionInfo\x12$\n\nvalueTypes\x18\x01 \x03(\x0e\x32\x10.agent.ValueType\"-\n\x0bInitRequest\x12\x1e\n\x07options\x18\x01 \x03(\x0b\x32\r.agent.Option\":\n\x06Option\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\"\n\x06values\x18\x02 \x03(\x0b\x32\x12.agent.OptionValue\"\xa6\x01\n\x0bOptionValue\x12\x1e\n\x04type\x18\x01 \x01(\x0e\x32\x10.agent.ValueType\x12\x13\n\tboolValue\x18\x02 \x01(\x08H\x00\x12\x12\n\x08intValue\x18\x03 \x01(\x03H\x00\x12\x15\n\x0b\x64oubleValue\x18\x04 \x01(\x01H\x00\x12\x15\n\x0bstringValue\x18\x05 \x01(\tH\x00\x12\x17\n\rdurationValue\x18\x06 \x01(\x03H\x00\x42\x07\n\x05value\".\n\x0cInitResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"\x11\n\x0fSnapshotRequest\"$\n\x10SnapshotResponse\x12\x10\n\x08snapshot\x18\x01 \x01(\x0c\"\"\n\x0eRestoreRequest\x12\x10\n\x08snapshot\x18\x01 \x01(\x0c\"1\n\x0fRestoreResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\r\n\x05\x65rror\x18\x02 \x01(\t\" \n\x10KeepaliveRequest\x12\x0c\n\x04time\x18\x01 \x01(\x03\"!\n\x11KeepaliveResponse\x12\x0c\n\x04time\x18\x01 \x01(\x03\"\x1e\n\rErrorResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t\"\x9f\x01\n\nBeginBatch\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05group\x18\x02 \x01(\t\x12)\n\x04tags\x18\x03 \x03(\x0b\x32\x1b.agent.BeginBatch.TagsEntry\x12\x0c\n\x04size\x18\x04 \x01(\x03\x12\x0e\n\x06\x62yName\x18\x05 \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x8c\x04\n\x05Point\x12\x0c\n\x04time\x18\x01 \x01(\x03\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x17\n\x0fretentionPolicy\x18\x04 \x01(\t\x12\r\n\x05group\x18\x05 \x01(\t\x12\x12\n\ndimensions\x18\x06 \x03(\t\x12$\n\x04tags\x18\x07 \x03(\x0b\x32\x16.agent.Point.TagsEntry\x12\x34\n\x0c\x66ieldsDouble\x18\x08 \x03(\x0b\x32\x1e.agent.Point.FieldsDoubleEntry\x12.\n\tfieldsInt\x18\t \x03(\x0b\x32\x1b.agent.Point.FieldsIntEntry\x12\x34\n\x0c\x66ieldsString\x18\n \x03(\x0b\x32\x1e.agent.Point.FieldsStringEntry\x12\x0e\n\x06\x62yName\x18\x0b \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldsDoubleEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\x1a\x30\n\x0e\x46ieldsIntEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\x1a\x33\n\x11\x46ieldsStringEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x9b\x01\n\x08\x45ndBatch\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05group\x18\x02 \x01(\t\x12\x0c\n\x04tmax\x18\x03 \x01(\x03\x12\'\n\x04tags\x18\x04 \x03(\x0b\x32\x19.agent.EndBatch.TagsEntry\x12\x0e\n\x06\x62yName\x18\x05 \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc3\x02\n\x07Request\x12\"\n\x04info\x18\x01 \x01(\x0b\x32\x12.agent.InfoRequestH\x00\x12\"\n\x04init\x18\x02 \x01(\x0b\x32\x12.agent.InitRequestH\x00\x12,\n\tkeepalive\x18\x03 \x01(\x0b\x32\x17.agent.KeepaliveRequestH\x00\x12*\n\x08snapshot\x18\x04 \x01(\x0b\x32\x16.agent.SnapshotRequestH\x00\x12(\n\x07restore\x18\x05 \x01(\x0b\x32\x15.agent.RestoreRequestH\x00\x12\"\n\x05\x62\x65gin\x18\x10 \x01(\x0b\x32\x11.agent.BeginBatchH\x00\x12\x1d\n\x05point\x18\x11 \x01(\x0b\x32\x0c.agent.PointH\x00\x12\x1e\n\x03\x65nd\x18\x12 \x01(\x0b\x32\x0f.agent.EndBatchH\x00\x42\t\n\x07message\"\xf0\x02\n\x08Response\x12#\n\x04info\x18\x01 \x01(\x0b\x32\x13.agent.InfoResponseH\x00\x12#\n\x04init\x18\x02 \x01(\x0b\x32\x13.agent.InitResponseH\x00\x12-\n\tkeepalive\x18\x03 \x01(\x0b\x32\x18.agent.KeepaliveResponseH\x00\x12+\n\x08snapshot\x18\x04 \x01(\x0b\x32\x17.agent.SnapshotResponseH\x00\x12)\n\x07restore\x18\x05 \x01(\x0b\x32\x16.agent.RestoreResponseH\x00\x12%\n\x05\x65rror\x18\x06 \x01(\x0b\x32\x14.agent.ErrorResponseH\x00\x12\"\n\x05\x62\x65gin\x18\x10 \x01(\x0b\x32\x11.agent.BeginBatchH\x00\x12\x1d\n\x05point\x18\x11 \x01(\x0b\x32\x0c.agent.PointH\x00\x12\x1e\n\x03\x65nd\x18\x12 \x01(\x0b\x32\x0f.agent.EndBatchH\x00\x42\t\n\x07message*!\n\x08\x45\x64geType\x12\n\n\x06STREAM\x10\x00\x12\t\n\x05\x42\x41TCH\x10\x01*D\n\tValueType\x12\x08\n\x04\x42OOL\x10\x00\x12\x07\n\x03INT\x10\x01\x12\n\n\x06\x44OUBLE\x10\x02\x12\n\n\x06STRING\x10\x03\x12\x0c\n\x08\x44URATION\x10\x04\x62\x06proto3') + serialized_pb=_b('\n\tudf.proto\x12\x05\x61gent\"\r\n\x0bInfoRequest\"\xc7\x01\n\x0cInfoResponse\x12\x1e\n\x05wants\x18\x01 \x01(\x0e\x32\x0f.agent.EdgeType\x12!\n\x08provides\x18\x02 \x01(\x0e\x32\x0f.agent.EdgeType\x12\x31\n\x07options\x18\x03 \x03(\x0b\x32 .agent.InfoResponse.OptionsEntry\x1a\x41\n\x0cOptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12 \n\x05value\x18\x02 \x01(\x0b\x32\x11.agent.OptionInfo:\x02\x38\x01\"2\n\nOptionInfo\x12$\n\nvalueTypes\x18\x01 \x03(\x0e\x32\x10.agent.ValueType\"M\n\x0bInitRequest\x12\x1e\n\x07options\x18\x01 \x03(\x0b\x32\r.agent.Option\x12\x0e\n\x06taskID\x18\x02 \x01(\t\x12\x0e\n\x06nodeID\x18\x03 \x01(\t\":\n\x06Option\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\"\n\x06values\x18\x02 \x03(\x0b\x32\x12.agent.OptionValue\"\xa6\x01\n\x0bOptionValue\x12\x1e\n\x04type\x18\x01 \x01(\x0e\x32\x10.agent.ValueType\x12\x13\n\tboolValue\x18\x02 \x01(\x08H\x00\x12\x12\n\x08intValue\x18\x03 \x01(\x03H\x00\x12\x15\n\x0b\x64oubleValue\x18\x04 \x01(\x01H\x00\x12\x15\n\x0bstringValue\x18\x05 \x01(\tH\x00\x12\x17\n\rdurationValue\x18\x06 \x01(\x03H\x00\x42\x07\n\x05value\".\n\x0cInitResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"\x11\n\x0fSnapshotRequest\"$\n\x10SnapshotResponse\x12\x10\n\x08snapshot\x18\x01 \x01(\x0c\"\"\n\x0eRestoreRequest\x12\x10\n\x08snapshot\x18\x01 \x01(\x0c\"1\n\x0fRestoreResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\r\n\x05\x65rror\x18\x02 \x01(\t\" \n\x10KeepaliveRequest\x12\x0c\n\x04time\x18\x01 \x01(\x03\"!\n\x11KeepaliveResponse\x12\x0c\n\x04time\x18\x01 \x01(\x03\"\x1e\n\rErrorResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t\"\x9f\x01\n\nBeginBatch\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05group\x18\x02 \x01(\t\x12)\n\x04tags\x18\x03 \x03(\x0b\x32\x1b.agent.BeginBatch.TagsEntry\x12\x0c\n\x04size\x18\x04 \x01(\x03\x12\x0e\n\x06\x62yName\x18\x05 \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x8c\x04\n\x05Point\x12\x0c\n\x04time\x18\x01 \x01(\x03\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x17\n\x0fretentionPolicy\x18\x04 \x01(\t\x12\r\n\x05group\x18\x05 \x01(\t\x12\x12\n\ndimensions\x18\x06 \x03(\t\x12$\n\x04tags\x18\x07 \x03(\x0b\x32\x16.agent.Point.TagsEntry\x12\x34\n\x0c\x66ieldsDouble\x18\x08 \x03(\x0b\x32\x1e.agent.Point.FieldsDoubleEntry\x12.\n\tfieldsInt\x18\t \x03(\x0b\x32\x1b.agent.Point.FieldsIntEntry\x12\x34\n\x0c\x66ieldsString\x18\n \x03(\x0b\x32\x1e.agent.Point.FieldsStringEntry\x12\x0e\n\x06\x62yName\x18\x0b \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldsDoubleEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01:\x02\x38\x01\x1a\x30\n\x0e\x46ieldsIntEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\x1a\x33\n\x11\x46ieldsStringEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x9b\x01\n\x08\x45ndBatch\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05group\x18\x02 \x01(\t\x12\x0c\n\x04tmax\x18\x03 \x01(\x03\x12\'\n\x04tags\x18\x04 \x03(\x0b\x32\x19.agent.EndBatch.TagsEntry\x12\x0e\n\x06\x62yName\x18\x05 \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc3\x02\n\x07Request\x12\"\n\x04info\x18\x01 \x01(\x0b\x32\x12.agent.InfoRequestH\x00\x12\"\n\x04init\x18\x02 \x01(\x0b\x32\x12.agent.InitRequestH\x00\x12,\n\tkeepalive\x18\x03 \x01(\x0b\x32\x17.agent.KeepaliveRequestH\x00\x12*\n\x08snapshot\x18\x04 \x01(\x0b\x32\x16.agent.SnapshotRequestH\x00\x12(\n\x07restore\x18\x05 \x01(\x0b\x32\x15.agent.RestoreRequestH\x00\x12\"\n\x05\x62\x65gin\x18\x10 \x01(\x0b\x32\x11.agent.BeginBatchH\x00\x12\x1d\n\x05point\x18\x11 \x01(\x0b\x32\x0c.agent.PointH\x00\x12\x1e\n\x03\x65nd\x18\x12 \x01(\x0b\x32\x0f.agent.EndBatchH\x00\x42\t\n\x07message\"\xf0\x02\n\x08Response\x12#\n\x04info\x18\x01 \x01(\x0b\x32\x13.agent.InfoResponseH\x00\x12#\n\x04init\x18\x02 \x01(\x0b\x32\x13.agent.InitResponseH\x00\x12-\n\tkeepalive\x18\x03 \x01(\x0b\x32\x18.agent.KeepaliveResponseH\x00\x12+\n\x08snapshot\x18\x04 \x01(\x0b\x32\x17.agent.SnapshotResponseH\x00\x12)\n\x07restore\x18\x05 \x01(\x0b\x32\x16.agent.RestoreResponseH\x00\x12%\n\x05\x65rror\x18\x06 \x01(\x0b\x32\x14.agent.ErrorResponseH\x00\x12\"\n\x05\x62\x65gin\x18\x10 \x01(\x0b\x32\x11.agent.BeginBatchH\x00\x12\x1d\n\x05point\x18\x11 \x01(\x0b\x32\x0c.agent.PointH\x00\x12\x1e\n\x03\x65nd\x18\x12 \x01(\x0b\x32\x0f.agent.EndBatchH\x00\x42\t\n\x07message*!\n\x08\x45\x64geType\x12\n\n\x06STREAM\x10\x00\x12\t\n\x05\x42\x41TCH\x10\x01*D\n\tValueType\x12\x08\n\x04\x42OOL\x10\x00\x12\x07\n\x03INT\x10\x01\x12\n\n\x06\x44OUBLE\x10\x02\x12\n\n\x06STRING\x10\x03\x12\x0c\n\x08\x44URATION\x10\x04\x62\x06proto3') ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -41,8 +41,8 @@ ], containing_type=None, options=None, - serialized_start=2402, - serialized_end=2435, + serialized_start=2434, + serialized_end=2467, ) _sym_db.RegisterEnumDescriptor(_EDGETYPE) @@ -76,8 +76,8 @@ ], containing_type=None, options=None, - serialized_start=2437, - serialized_end=2505, + serialized_start=2469, + serialized_end=2537, ) _sym_db.RegisterEnumDescriptor(_VALUETYPE) @@ -243,6 +243,20 @@ message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), + _descriptor.FieldDescriptor( + name='taskID', full_name='agent.InitRequest.taskID', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='nodeID', full_name='agent.InitRequest.nodeID', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), ], extensions=[ ], @@ -256,7 +270,7 @@ oneofs=[ ], serialized_start=289, - serialized_end=334, + serialized_end=366, ) @@ -293,8 +307,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=336, - serialized_end=394, + serialized_start=368, + serialized_end=426, ) @@ -362,8 +376,8 @@ name='value', full_name='agent.OptionValue.value', index=0, containing_type=None, fields=[]), ], - serialized_start=397, - serialized_end=563, + serialized_start=429, + serialized_end=595, ) @@ -400,8 +414,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=565, - serialized_end=611, + serialized_start=597, + serialized_end=643, ) @@ -424,8 +438,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=613, - serialized_end=630, + serialized_start=645, + serialized_end=662, ) @@ -455,8 +469,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=632, - serialized_end=668, + serialized_start=664, + serialized_end=700, ) @@ -486,8 +500,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=670, - serialized_end=704, + serialized_start=702, + serialized_end=736, ) @@ -524,8 +538,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=706, - serialized_end=755, + serialized_start=738, + serialized_end=787, ) @@ -555,8 +569,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=757, - serialized_end=789, + serialized_start=789, + serialized_end=821, ) @@ -586,8 +600,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=791, - serialized_end=824, + serialized_start=823, + serialized_end=856, ) @@ -617,8 +631,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=826, - serialized_end=856, + serialized_start=858, + serialized_end=888, ) @@ -655,8 +669,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=975, - serialized_end=1018, + serialized_start=1007, + serialized_end=1050, ) _BEGINBATCH = _descriptor.Descriptor( @@ -713,8 +727,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=859, - serialized_end=1018, + serialized_start=891, + serialized_end=1050, ) @@ -751,8 +765,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=975, - serialized_end=1018, + serialized_start=1007, + serialized_end=1050, ) _POINT_FIELDSDOUBLEENTRY = _descriptor.Descriptor( @@ -788,8 +802,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1391, - serialized_end=1442, + serialized_start=1423, + serialized_end=1474, ) _POINT_FIELDSINTENTRY = _descriptor.Descriptor( @@ -825,8 +839,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1444, - serialized_end=1492, + serialized_start=1476, + serialized_end=1524, ) _POINT_FIELDSSTRINGENTRY = _descriptor.Descriptor( @@ -862,8 +876,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1494, - serialized_end=1545, + serialized_start=1526, + serialized_end=1577, ) _POINT = _descriptor.Descriptor( @@ -962,8 +976,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1021, - serialized_end=1545, + serialized_start=1053, + serialized_end=1577, ) @@ -1000,8 +1014,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=975, - serialized_end=1018, + serialized_start=1007, + serialized_end=1050, ) _ENDBATCH = _descriptor.Descriptor( @@ -1058,8 +1072,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1548, - serialized_end=1703, + serialized_start=1580, + serialized_end=1735, ) @@ -1141,8 +1155,8 @@ name='message', full_name='agent.Request.message', index=0, containing_type=None, fields=[]), ], - serialized_start=1706, - serialized_end=2029, + serialized_start=1738, + serialized_end=2061, ) @@ -1231,8 +1245,8 @@ name='message', full_name='agent.Response.message', index=0, containing_type=None, fields=[]), ], - serialized_start=2032, - serialized_end=2400, + serialized_start=2064, + serialized_end=2432, ) _INFORESPONSE_OPTIONSENTRY.fields_by_name['value'].message_type = _OPTIONINFO diff --git a/udf/agent/server.go b/udf/agent/server.go index 94cd6d334..634091998 100644 --- a/udf/agent/server.go +++ b/udf/agent/server.go @@ -4,7 +4,6 @@ import ( "net" "os" "os/signal" - "strings" "sync" ) @@ -14,11 +13,11 @@ type Server struct { listener net.Listener accepter Accepter - conns chan net.Conn - mu sync.Mutex stopped bool stopping chan struct{} + + wg sync.WaitGroup } type Accepter interface { @@ -32,59 +31,88 @@ func NewServer(l net.Listener, a Accepter) *Server { return &Server{ listener: l, accepter: a, - conns: make(chan net.Conn), stopping: make(chan struct{}), } } +// Server starts the server and blocks. func (s *Server) Serve() error { + s.mu.Lock() + if s.stopped { + s.mu.Unlock() + return nil + } + s.wg.Add(1) + s.mu.Unlock() + + defer s.wg.Done() return s.run() } +// Stop closes the listener and stops all server activity. func (s *Server) Stop() { s.mu.Lock() - defer s.mu.Unlock() if s.stopped { + s.mu.Unlock() return } s.stopped = true + s.listener.Close() + s.mu.Unlock() + close(s.stopping) + s.wg.Wait() } -// Register a signal handler to stop the Server for the given signals. +// StopOnSignals registers a signal handler to stop the Server for the given signals. func (s *Server) StopOnSignals(signals ...os.Signal) { + s.mu.Lock() + if s.stopped { + s.mu.Unlock() + return + } + s.wg.Add(1) + s.mu.Unlock() + c := make(chan os.Signal) signal.Notify(c, signals...) go func() { - for range c { + defer s.wg.Done() + select { + case <-s.stopping: + case <-c: s.Stop() } }() } func (s *Server) run() error { + conns := make(chan net.Conn) errC := make(chan error, 1) + s.wg.Add(1) go func() { + defer s.wg.Done() for { conn, err := s.listener.Accept() if err != nil { errC <- err } - s.conns <- conn + conns <- conn } }() for { select { case <-s.stopping: - s.listener.Close() return nil case err := <-errC: - // If err is listener closed err ignore and return nil - if strings.Contains(err.Error(), "closed") { + s.mu.Lock() + stopped := s.stopped + s.mu.Unlock() + if stopped { return nil } return err - case conn := <-s.conns: + case conn := <-conns: s.accepter.Accept(conn) } } diff --git a/udf/agent/udf.pb.go b/udf/agent/udf.pb.go index 3bfecf2af..63dc47bdd 100644 --- a/udf/agent/udf.pb.go +++ b/udf/agent/udf.pb.go @@ -136,6 +136,8 @@ func (*OptionInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{ // Request that the process initialize itself with the provided options. type InitRequest struct { Options []*Option `protobuf:"bytes,1,rep,name=options" json:"options,omitempty"` + TaskID string `protobuf:"bytes,2,opt,name=taskID" json:"taskID,omitempty"` + NodeID string `protobuf:"bytes,3,opt,name=nodeID" json:"nodeID,omitempty"` } func (m *InitRequest) Reset() { *m = InitRequest{} } @@ -1228,74 +1230,75 @@ func init() { func init() { proto.RegisterFile("udf.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1099 bytes of a gzipped FileDescriptorProto + // 1118 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x57, 0xdd, 0x72, 0xdb, 0x44, 0x14, 0xae, 0x22, 0xcb, 0x96, 0x8e, 0x9d, 0x44, 0xd9, 0x86, 0x54, 0x04, 0x26, 0x13, 0x04, 0x6d, - 0x93, 0x50, 0x0c, 0x98, 0xbf, 0xd2, 0x61, 0x60, 0x62, 0x62, 0x88, 0x87, 0x36, 0xee, 0x28, 0x6e, - 0xef, 0xe5, 0x68, 0xe3, 0x6a, 0xea, 0x48, 0x46, 0x5a, 0x07, 0xcc, 0x15, 0xef, 0xc3, 0x2d, 0x0f, - 0xc1, 0x05, 0x4f, 0xc2, 0x0c, 0xef, 0xc0, 0xfe, 0x69, 0xb5, 0xb2, 0x0d, 0x9d, 0x32, 0x9d, 0xe9, - 0x9d, 0x77, 0xcf, 0x77, 0xce, 0xd9, 0x73, 0xbe, 0xf3, 0x23, 0x83, 0x33, 0x8b, 0x2e, 0xdb, 0xd3, - 0x2c, 0x25, 0x29, 0xb2, 0xc2, 0x31, 0x4e, 0x88, 0xbf, 0x0e, 0xcd, 0x7e, 0x72, 0x99, 0x06, 0xf8, - 0xc7, 0x19, 0xce, 0x89, 0xff, 0xb7, 0x01, 0x2d, 0x71, 0xce, 0xa7, 0x69, 0x92, 0x63, 0x74, 0x1b, - 0xac, 0x9f, 0xc2, 0x84, 0xe4, 0x9e, 0xb1, 0x6f, 0x1c, 0x6c, 0x74, 0x36, 0xdb, 0x5c, 0xad, 0xdd, - 0x8b, 0xc6, 0x78, 0x38, 0x9f, 0xe2, 0x40, 0x48, 0xd1, 0xfb, 0x60, 0x53, 0xb3, 0xd7, 0x71, 0x84, - 0x73, 0x6f, 0x6d, 0x35, 0x52, 0x01, 0xd0, 0x03, 0x68, 0xa4, 0x53, 0x12, 0x53, 0xfb, 0x9e, 0xb9, - 0x6f, 0x1e, 0x34, 0x3b, 0xfb, 0x12, 0xab, 0x7b, 0x6e, 0x0f, 0x04, 0xa4, 0x97, 0x90, 0x6c, 0x1e, - 0x14, 0x0a, 0xbb, 0x8f, 0xa0, 0xa5, 0x0b, 0x90, 0x0b, 0xe6, 0x73, 0x3c, 0xe7, 0xaf, 0x73, 0x02, - 0xf6, 0x13, 0xdd, 0x05, 0xeb, 0x3a, 0x9c, 0xcc, 0x30, 0x7f, 0x47, 0xb3, 0xb3, 0x25, 0x6d, 0x0b, - 0x2d, 0xee, 0x41, 0xc8, 0x1f, 0xac, 0xdd, 0x37, 0xfc, 0xaf, 0x01, 0x4a, 0x01, 0xfa, 0x08, 0x80, - 0x8b, 0xd8, 0x7b, 0x59, 0xc4, 0x26, 0x8d, 0xc3, 0x95, 0xfa, 0x4f, 0x0b, 0x41, 0xa0, 0x61, 0xfc, - 0xcf, 0x59, 0xfa, 0x62, 0x22, 0xd3, 0x47, 0x7d, 0xab, 0xc8, 0x0c, 0x1e, 0xd9, 0x7a, 0xc5, 0xbb, - 0x0a, 0xc3, 0x3f, 0x85, 0xba, 0xb8, 0x42, 0x08, 0x6a, 0x49, 0x78, 0x85, 0x65, 0x04, 0xfc, 0x37, - 0x3a, 0x82, 0x3a, 0xf7, 0xc1, 0x72, 0xc9, 0xac, 0xa0, 0x8a, 0x15, 0xfe, 0x92, 0x40, 0x22, 0xfc, - 0xbf, 0x0c, 0x68, 0x6a, 0xf7, 0xe8, 0x3d, 0xa8, 0x11, 0xfa, 0x34, 0xc9, 0xd7, 0xf2, 0xeb, 0xb9, - 0x14, 0xed, 0x81, 0x33, 0x4a, 0xd3, 0xc9, 0x53, 0x95, 0x28, 0xfb, 0xf4, 0x46, 0x50, 0x5e, 0xa1, - 0xb7, 0xc1, 0x8e, 0x13, 0x22, 0xc4, 0x26, 0x15, 0x9b, 0x54, 0xac, 0x6e, 0x90, 0x0f, 0xcd, 0x28, - 0x9d, 0x8d, 0x26, 0x58, 0x00, 0x6a, 0x14, 0x60, 0x50, 0x80, 0x7e, 0xc9, 0x30, 0x39, 0xc9, 0xe2, - 0x64, 0x2c, 0x30, 0x16, 0x0b, 0x8f, 0x61, 0xb4, 0x4b, 0x74, 0x07, 0xd6, 0xa3, 0x59, 0x16, 0xaa, - 0xc7, 0x7b, 0x75, 0xe9, 0xaa, 0x7a, 0xdd, 0x6d, 0x48, 0x4a, 0x29, 0x5d, 0x2d, 0x91, 0x6e, 0x59, - 0x9d, 0x1e, 0x34, 0xf2, 0xd9, 0xc5, 0x05, 0xce, 0x45, 0x7d, 0xda, 0x41, 0x71, 0x44, 0xdb, 0x60, - 0xe1, 0x2c, 0x4b, 0x33, 0x1e, 0x9c, 0x13, 0x88, 0x83, 0xbf, 0x05, 0x9b, 0xe7, 0x49, 0x38, 0xcd, - 0x9f, 0xa5, 0x05, 0x65, 0x7e, 0x1b, 0xdc, 0xf2, 0x4a, 0x9a, 0xdd, 0x05, 0x3b, 0x97, 0x77, 0xdc, - 0x6e, 0x2b, 0x50, 0x67, 0xff, 0x1e, 0x6c, 0x50, 0x1c, 0x49, 0x33, 0x5c, 0x90, 0xfe, 0x5f, 0xe8, - 0x63, 0xd8, 0x54, 0xe8, 0xff, 0xf9, 0xe6, 0x3b, 0xe0, 0xfe, 0x80, 0xf1, 0x34, 0x9c, 0xc4, 0xd7, - 0xca, 0x25, 0x2d, 0x1a, 0x12, 0xcb, 0xa2, 0x31, 0x03, 0xfe, 0xdb, 0xbf, 0x0b, 0x5b, 0x1a, 0x4e, - 0x3a, 0x5b, 0x05, 0xbc, 0x0d, 0xeb, 0x3d, 0x66, 0x59, 0x81, 0x94, 0x5f, 0x43, 0xf7, 0xfb, 0xa7, - 0x01, 0xd0, 0xc5, 0xe3, 0x38, 0xe9, 0x86, 0xe4, 0xe2, 0xd9, 0xca, 0x3a, 0xa5, 0x8a, 0xe3, 0x2c, - 0x9d, 0x4d, 0x8b, 0x07, 0xf3, 0x03, 0xfa, 0x90, 0xfa, 0x0c, 0xc7, 0x45, 0x6f, 0xbf, 0x25, 0x2b, - 0xb0, 0x34, 0xd5, 0x1e, 0x52, 0xa9, 0x68, 0x6b, 0x0e, 0x64, 0xa6, 0xf3, 0xf8, 0x17, 0x51, 0x47, - 0xf4, 0x91, 0xec, 0x37, 0xda, 0x81, 0xfa, 0x68, 0x7e, 0xc6, 0x1c, 0x5a, 0x3c, 0x49, 0xf2, 0xb4, - 0xfb, 0x05, 0x38, 0x4a, 0x7d, 0x45, 0xf3, 0x6f, 0xeb, 0xcd, 0xef, 0xe8, 0x9d, 0xfe, 0x9b, 0x05, - 0xd6, 0xe3, 0x94, 0x96, 0xf0, 0xaa, 0x9c, 0xa8, 0xe8, 0xd6, 0xb4, 0xe8, 0x28, 0xaf, 0x51, 0x48, - 0xc2, 0x51, 0x98, 0x8b, 0x1e, 0x70, 0x02, 0x75, 0x46, 0x07, 0xb0, 0x99, 0x61, 0x42, 0xe3, 0xa2, - 0x35, 0xfa, 0x38, 0x9d, 0xc4, 0x17, 0x73, 0xfe, 0x7a, 0x27, 0x58, 0xbc, 0x2e, 0x73, 0x64, 0xe9, - 0x39, 0xda, 0x03, 0x88, 0xa8, 0xdf, 0x24, 0xe7, 0xb3, 0xa2, 0x4e, 0x33, 0xe5, 0x04, 0xda, 0x0d, - 0x9d, 0x00, 0x22, 0x87, 0x0d, 0x9e, 0xc3, 0x1d, 0x99, 0x43, 0xfe, 0xfe, 0xa5, 0xf4, 0x75, 0xa1, - 0x75, 0x19, 0xe3, 0x49, 0x94, 0x9f, 0xf0, 0xf6, 0xf3, 0x6c, 0xae, 0xb3, 0x57, 0xd1, 0xf9, 0x4e, - 0x03, 0x08, 0xdd, 0x8a, 0x0e, 0xfa, 0x12, 0x1c, 0x71, 0xee, 0x27, 0xc4, 0x73, 0x2a, 0xc4, 0xe9, - 0x06, 0xa8, 0x54, 0x68, 0x97, 0xe8, 0xd2, 0xfd, 0x39, 0xef, 0x6c, 0x0f, 0xfe, 0xd5, 0xbd, 0x00, - 0x54, 0xdc, 0x8b, 0x2b, 0x8d, 0xed, 0xe6, 0x2b, 0x61, 0x7b, 0xf7, 0x1b, 0xd8, 0x5a, 0x0a, 0xf9, - 0x45, 0x06, 0x0c, 0xdd, 0xc0, 0x57, 0xb0, 0x51, 0x0d, 0xf9, 0x45, 0xda, 0xe6, 0x4a, 0xf7, 0x5a, - 0xc8, 0x2f, 0x55, 0xad, 0x7f, 0x18, 0x60, 0xf7, 0x92, 0xe8, 0x65, 0x5b, 0x8f, 0x95, 0xf6, 0x55, - 0xf8, 0xb3, 0x18, 0xd9, 0x01, 0xff, 0x8d, 0x3e, 0x90, 0xa5, 0x54, 0xe3, 0xbc, 0xbc, 0x59, 0xac, - 0x65, 0x69, 0x7c, 0xa9, 0x9a, 0x5e, 0x79, 0xe3, 0xfd, 0x6a, 0x42, 0xa3, 0x98, 0x5b, 0x07, 0x50, - 0x8b, 0xe9, 0xa2, 0xe5, 0x8a, 0xe5, 0x5a, 0xd3, 0x3e, 0x40, 0xe8, 0xec, 0xe7, 0x08, 0x81, 0x8c, - 0x89, 0x5c, 0xe2, 0x25, 0x52, 0xed, 0x5a, 0x81, 0x8c, 0x09, 0xa2, 0x0f, 0x7b, 0x5e, 0xcc, 0x3d, - 0x1e, 0x78, 0xb3, 0x73, 0x4b, 0xc2, 0x17, 0xe7, 0x26, 0xdb, 0x71, 0x0a, 0x8b, 0x3e, 0xd5, 0xe6, - 0x76, 0x8d, 0xeb, 0x15, 0x7d, 0xb6, 0xb0, 0x23, 0xd8, 0xee, 0x2b, 0x90, 0xe8, 0x63, 0x68, 0x64, - 0x62, 0xa2, 0xf3, 0x04, 0x35, 0x3b, 0x6f, 0x48, 0xa5, 0xea, 0x56, 0xa0, 0x3a, 0x05, 0x0e, 0x1d, - 0x82, 0x35, 0x62, 0xd3, 0xcf, 0x73, 0x2b, 0x5f, 0x24, 0xe5, 0x44, 0xa4, 0x60, 0x81, 0xa0, 0xdb, - 0xdb, 0x9a, 0xb2, 0x8e, 0xf1, 0xb6, 0x38, 0xb4, 0xa5, 0x77, 0x11, 0x43, 0x71, 0x21, 0x7a, 0x17, - 0x4c, 0x9c, 0x44, 0x1e, 0xe2, 0x98, 0xcd, 0x05, 0x46, 0x29, 0x8c, 0x49, 0xbb, 0x0e, 0x34, 0xae, - 0xe8, 0x56, 0xa1, 0x42, 0xff, 0x77, 0x13, 0x6c, 0x35, 0xed, 0x0f, 0x2b, 0x1c, 0xdc, 0x5c, 0xf1, - 0xe9, 0xa5, 0x48, 0x38, 0xac, 0x90, 0x70, 0xb3, 0x42, 0x82, 0x0e, 0xa5, 0x2c, 0xdc, 0x5f, 0x66, - 0xc1, 0x5b, 0x66, 0x41, 0x29, 0x69, 0x34, 0x7c, 0xb6, 0x44, 0xc3, 0xad, 0x25, 0x1a, 0x94, 0x5e, - 0xc9, 0x43, 0x67, 0x91, 0x87, 0x9d, 0x45, 0x1e, 0x94, 0x92, 0x22, 0xe2, 0x5e, 0xb1, 0xe8, 0xea, - 0x5c, 0x63, 0xbb, 0xc8, 0x9c, 0xbe, 0x0d, 0x59, 0x96, 0x39, 0xe8, 0xb5, 0xd3, 0x76, 0xf4, 0x0e, - 0x9d, 0x01, 0xf2, 0xeb, 0x19, 0x01, 0xd4, 0xcf, 0x87, 0x41, 0xef, 0xf8, 0x91, 0x7b, 0x03, 0x39, - 0x60, 0x75, 0x8f, 0x87, 0xdf, 0x9e, 0xba, 0xc6, 0xd1, 0x09, 0x38, 0xea, 0xd3, 0x0e, 0xd9, 0x50, - 0xeb, 0x0e, 0x06, 0x0f, 0x29, 0xa2, 0x01, 0x66, 0xff, 0x6c, 0xe8, 0x1a, 0x4c, 0xed, 0x64, 0xf0, - 0xa4, 0xfb, 0xb0, 0xe7, 0xae, 0x49, 0x13, 0xfd, 0xb3, 0xef, 0x5d, 0x13, 0xb5, 0xc0, 0x3e, 0x79, - 0x12, 0x1c, 0x0f, 0xfb, 0x83, 0x33, 0xb7, 0x36, 0xaa, 0xf3, 0xbf, 0x04, 0x9f, 0xfc, 0x13, 0x00, - 0x00, 0xff, 0xff, 0x8b, 0x8a, 0xa6, 0x74, 0x1f, 0x0c, 0x00, 0x00, + 0x93, 0x50, 0x0c, 0x18, 0x18, 0x4a, 0x87, 0x81, 0x89, 0x89, 0x21, 0x1e, 0xda, 0xb8, 0xa3, 0xb8, + 0xbd, 0x97, 0xa3, 0x8d, 0xab, 0x89, 0x23, 0x19, 0x69, 0x1d, 0x30, 0x57, 0xbc, 0x0f, 0xb7, 0x3c, + 0x04, 0x17, 0x3c, 0x09, 0x33, 0xbc, 0x03, 0xfb, 0xa7, 0xd5, 0xca, 0x36, 0x74, 0xca, 0x74, 0x86, + 0x3b, 0xef, 0x39, 0xdf, 0xf9, 0xfd, 0xce, 0x9e, 0x95, 0xc1, 0x99, 0x45, 0x97, 0xed, 0x69, 0x96, + 0x92, 0x14, 0x59, 0xe1, 0x18, 0x27, 0xc4, 0x5f, 0x87, 0x66, 0x3f, 0xb9, 0x4c, 0x03, 0xfc, 0xc3, + 0x0c, 0xe7, 0xc4, 0xff, 0xcb, 0x80, 0x96, 0x38, 0xe7, 0xd3, 0x34, 0xc9, 0x31, 0xba, 0x0b, 0xd6, + 0x8f, 0x61, 0x42, 0x72, 0xcf, 0xd8, 0x37, 0x0e, 0x36, 0x3a, 0x9b, 0x6d, 0x6e, 0xd6, 0xee, 0x45, + 0x63, 0x3c, 0x9c, 0x4f, 0x71, 0x20, 0xb4, 0xe8, 0x7d, 0xb0, 0xa9, 0xdb, 0x9b, 0x38, 0xc2, 0xb9, + 0xb7, 0xb6, 0x1a, 0xa9, 0x00, 0xe8, 0x11, 0x34, 0xd2, 0x29, 0x89, 0xa9, 0x7f, 0xcf, 0xdc, 0x37, + 0x0f, 0x9a, 0x9d, 0x7d, 0x89, 0xd5, 0x23, 0xb7, 0x07, 0x02, 0xd2, 0x4b, 0x48, 0x36, 0x0f, 0x0a, + 0x83, 0xdd, 0x27, 0xd0, 0xd2, 0x15, 0xc8, 0x05, 0xf3, 0x0a, 0xcf, 0x79, 0x76, 0x4e, 0xc0, 0x7e, + 0xa2, 0xfb, 0x60, 0xdd, 0x84, 0x93, 0x19, 0xe6, 0x79, 0x34, 0x3b, 0x5b, 0xd2, 0xb7, 0xb0, 0xe2, + 0x11, 0x84, 0xfe, 0xd1, 0xda, 0x43, 0xc3, 0xff, 0x0a, 0xa0, 0x54, 0xa0, 0x8f, 0x00, 0xb8, 0x8a, + 0xe5, 0xcb, 0x2a, 0x36, 0x69, 0x1d, 0xae, 0xb4, 0x7f, 0x5e, 0x28, 0x02, 0x0d, 0xe3, 0x5f, 0xb2, + 0xf6, 0xc5, 0x44, 0xb6, 0x8f, 0xc6, 0x56, 0x95, 0x19, 0xbc, 0xb2, 0xf5, 0x4a, 0x74, 0x55, 0x06, + 0xda, 0x81, 0x3a, 0x09, 0xf3, 0xab, 0xfe, 0x09, 0xcf, 0xd2, 0x09, 0xe4, 0x89, 0xc9, 0x93, 0x34, + 0xc2, 0x54, 0x6e, 0x0a, 0xb9, 0x38, 0xf9, 0xa7, 0x50, 0x17, 0x2e, 0x10, 0x82, 0x5a, 0x12, 0x5e, + 0x63, 0x59, 0x31, 0xff, 0x8d, 0x8e, 0xa0, 0xce, 0x73, 0x62, 0xbd, 0x67, 0x51, 0x51, 0x25, 0x2a, + 0xcf, 0x3c, 0x90, 0x08, 0xff, 0x4f, 0x03, 0x9a, 0x9a, 0x1c, 0xbd, 0x07, 0x35, 0x42, 0x4b, 0x91, + 0xfc, 0x2e, 0x57, 0xcb, 0xb5, 0x68, 0x0f, 0x9c, 0x51, 0x9a, 0x4e, 0x9e, 0xab, 0xc6, 0xda, 0xa7, + 0xb7, 0x82, 0x52, 0x84, 0xde, 0x06, 0x3b, 0x4e, 0x88, 0x50, 0xb3, 0xcc, 0x4d, 0xaa, 0x56, 0x12, + 0xe4, 0x43, 0x33, 0x4a, 0x67, 0xa3, 0x09, 0x16, 0x80, 0x1a, 0x05, 0x18, 0x14, 0xa0, 0x0b, 0x19, + 0x26, 0x27, 0x59, 0x9c, 0x8c, 0x05, 0xc6, 0x62, 0xe5, 0x31, 0x8c, 0x26, 0x44, 0xf7, 0x60, 0x3d, + 0x9a, 0x65, 0xa1, 0x4a, 0xde, 0xab, 0xcb, 0x50, 0x55, 0x71, 0xb7, 0x21, 0x47, 0x80, 0xd2, 0xdb, + 0x12, 0xf4, 0xc8, 0x69, 0xf6, 0xa0, 0x91, 0xcf, 0x2e, 0x2e, 0x70, 0x2e, 0xe6, 0xd9, 0x0e, 0x8a, + 0x23, 0xda, 0x06, 0x0b, 0x67, 0x59, 0x9a, 0x49, 0x3e, 0xc4, 0xc1, 0xdf, 0x82, 0xcd, 0xf3, 0x24, + 0x9c, 0xe6, 0x2f, 0xd2, 0x82, 0x62, 0xbf, 0x0d, 0x6e, 0x29, 0x92, 0x6e, 0x77, 0xc1, 0xce, 0xa5, + 0x8c, 0xfb, 0x6d, 0x05, 0xea, 0xec, 0x3f, 0x80, 0x0d, 0x8a, 0x23, 0x69, 0x86, 0x8b, 0x21, 0xf9, + 0x37, 0xf4, 0x31, 0x6c, 0x2a, 0xf4, 0x7f, 0xcc, 0xf9, 0x1e, 0xb8, 0xdf, 0x63, 0x3c, 0x0d, 0x27, + 0xf1, 0x8d, 0x0a, 0x49, 0x87, 0x86, 0xc4, 0x72, 0x68, 0xcc, 0x80, 0xff, 0xf6, 0xef, 0xc3, 0x96, + 0x86, 0x93, 0xc1, 0x56, 0x01, 0xef, 0xc2, 0x7a, 0x8f, 0x79, 0x56, 0x20, 0x15, 0xd7, 0xd0, 0xe3, + 0xfe, 0x61, 0x00, 0x74, 0xf1, 0x38, 0x4e, 0xba, 0x21, 0xb9, 0x78, 0xb1, 0x72, 0x4e, 0xa9, 0xe1, + 0x38, 0x4b, 0x67, 0xd3, 0x22, 0x61, 0x7e, 0x40, 0x1f, 0xd2, 0x98, 0xe1, 0xb8, 0xd8, 0x05, 0x6f, + 0xc9, 0x09, 0x2c, 0x5d, 0xb5, 0x87, 0x54, 0x2b, 0xd6, 0x00, 0x07, 0x32, 0xd7, 0x79, 0xfc, 0xb3, + 0x98, 0x23, 0x9a, 0x24, 0xfb, 0xcd, 0x2e, 0xce, 0x68, 0x7e, 0xc6, 0x02, 0x5a, 0xbc, 0x49, 0xf2, + 0xb4, 0xfb, 0x39, 0x38, 0xca, 0x7c, 0xc5, 0xb2, 0xd8, 0xd6, 0x97, 0x85, 0xa3, 0x6f, 0x86, 0x5f, + 0x2d, 0xb0, 0x9e, 0xa6, 0x74, 0x84, 0x57, 0xf5, 0x44, 0x55, 0xb7, 0xa6, 0x55, 0x47, 0x79, 0x8d, + 0x42, 0x12, 0x8e, 0xc2, 0x1c, 0xcb, 0xdb, 0xab, 0xce, 0xe8, 0x00, 0x36, 0x33, 0x4c, 0x68, 0x5d, + 0x74, 0x46, 0x9f, 0xa6, 0x93, 0xf8, 0x62, 0xce, 0xb3, 0x77, 0x82, 0x45, 0x71, 0xd9, 0x23, 0x4b, + 0xef, 0xd1, 0x1e, 0x40, 0x44, 0xe3, 0x26, 0x39, 0xdf, 0x2d, 0x75, 0xda, 0x29, 0x27, 0xd0, 0x24, + 0x74, 0x03, 0x88, 0x1e, 0x36, 0x78, 0x0f, 0x77, 0x64, 0x0f, 0x79, 0xfe, 0x4b, 0xed, 0xeb, 0x42, + 0xeb, 0x32, 0xc6, 0x93, 0x28, 0x3f, 0xe1, 0xd7, 0xcf, 0xb3, 0xb9, 0xcd, 0x5e, 0xc5, 0xe6, 0x5b, + 0x0d, 0x20, 0x6c, 0x2b, 0x36, 0xe8, 0x0b, 0x70, 0xc4, 0xb9, 0x9f, 0x10, 0xcf, 0xa9, 0x10, 0xa7, + 0x3b, 0xa0, 0x5a, 0x61, 0x5d, 0xa2, 0xcb, 0xf0, 0xe7, 0xfc, 0x66, 0x7b, 0xf0, 0x8f, 0xe1, 0x05, + 0xa0, 0x12, 0x5e, 0x88, 0x34, 0xb6, 0x9b, 0xaf, 0x85, 0xed, 0xdd, 0xaf, 0x61, 0x6b, 0xa9, 0xe4, + 0x97, 0x39, 0x30, 0x74, 0x07, 0x5f, 0xc2, 0x46, 0xb5, 0xe4, 0x97, 0x59, 0x9b, 0x2b, 0xc3, 0x6b, + 0x25, 0xbf, 0xd2, 0xb4, 0xfe, 0x6e, 0x80, 0xdd, 0x4b, 0xa2, 0x57, 0xbd, 0x7a, 0x6c, 0xb4, 0xaf, + 0xc3, 0x9f, 0xc4, 0xca, 0x0e, 0xf8, 0x6f, 0xf4, 0x81, 0x1c, 0xa5, 0x1a, 0xe7, 0xe5, 0xcd, 0xe2, + 0x19, 0x97, 0xce, 0x97, 0xa6, 0xe9, 0xb5, 0x5f, 0xbc, 0x5f, 0x4c, 0x68, 0x14, 0x7b, 0xeb, 0x00, + 0x6a, 0x31, 0x7d, 0x98, 0xb9, 0x61, 0xf9, 0xac, 0x69, 0x1f, 0x2c, 0x74, 0xf7, 0x73, 0x84, 0x40, + 0xc6, 0x44, 0x3e, 0xfa, 0x25, 0x52, 0xbd, 0xcd, 0x02, 0x19, 0x13, 0x44, 0x13, 0xbb, 0x2a, 0xf6, + 0x1e, 0x2f, 0xbc, 0xd9, 0xb9, 0x23, 0xe1, 0x8b, 0x7b, 0x93, 0xbd, 0x71, 0x0a, 0x8b, 0x3e, 0xd5, + 0xf6, 0x76, 0x8d, 0xdb, 0x15, 0xf7, 0x6c, 0xe1, 0x8d, 0x60, 0x6f, 0x5f, 0x81, 0x44, 0x1f, 0x43, + 0x23, 0x13, 0x1b, 0x9d, 0x37, 0xa8, 0xd9, 0x79, 0x43, 0x1a, 0x55, 0x5f, 0x05, 0x6a, 0x53, 0xe0, + 0xd0, 0x21, 0x58, 0x23, 0xb6, 0xfd, 0x3c, 0xb7, 0xf2, 0x05, 0x53, 0x6e, 0x44, 0x0a, 0x16, 0x08, + 0xfa, 0x7a, 0x5b, 0x53, 0x76, 0x63, 0xbc, 0x2d, 0x0e, 0x6d, 0xe9, 0xb7, 0x88, 0xa1, 0xb8, 0x12, + 0xbd, 0x0b, 0x26, 0x4e, 0x22, 0x0f, 0x71, 0xcc, 0xe6, 0x02, 0xa3, 0x14, 0xc6, 0xb4, 0x5d, 0x07, + 0x1a, 0xd7, 0xf4, 0x55, 0xa1, 0x4a, 0xff, 0x37, 0x13, 0x6c, 0xb5, 0xed, 0x0f, 0x2b, 0x1c, 0xdc, + 0x5e, 0xf1, 0xa9, 0xa6, 0x48, 0x38, 0xac, 0x90, 0x70, 0xbb, 0x42, 0x82, 0x0e, 0xa5, 0x2c, 0x3c, + 0x5c, 0x66, 0xc1, 0x5b, 0x66, 0x41, 0x19, 0x69, 0x34, 0x7c, 0xb6, 0x44, 0xc3, 0x9d, 0x25, 0x1a, + 0x94, 0x5d, 0xc9, 0x43, 0x67, 0x91, 0x87, 0x9d, 0x45, 0x1e, 0x94, 0x91, 0x22, 0xe2, 0x41, 0xf1, + 0xd0, 0xd5, 0xb9, 0xc5, 0x76, 0xd1, 0x39, 0xfd, 0x35, 0x64, 0x5d, 0xe6, 0xa0, 0xff, 0x9d, 0xb6, + 0xa3, 0x77, 0xe8, 0x0e, 0x90, 0x5f, 0xdb, 0x08, 0xa0, 0x7e, 0x3e, 0x0c, 0x7a, 0xc7, 0x4f, 0xdc, + 0x5b, 0xc8, 0x01, 0xab, 0x7b, 0x3c, 0xfc, 0xe6, 0xd4, 0x35, 0x8e, 0x4e, 0xc0, 0x51, 0x9f, 0x76, + 0xc8, 0x86, 0x5a, 0x77, 0x30, 0x78, 0x4c, 0x11, 0x0d, 0x30, 0xfb, 0x67, 0x43, 0xd7, 0x60, 0x66, + 0x27, 0x83, 0x67, 0xdd, 0xc7, 0x3d, 0x77, 0x4d, 0xba, 0xe8, 0x9f, 0x7d, 0xe7, 0x9a, 0xa8, 0x05, + 0xf6, 0xc9, 0xb3, 0xe0, 0x78, 0xd8, 0x1f, 0x9c, 0xb9, 0xb5, 0x51, 0x9d, 0xff, 0x85, 0xf8, 0xe4, + 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x9e, 0x6c, 0x9b, 0x4f, 0x0c, 0x00, 0x00, } diff --git a/udf/agent/udf.proto b/udf/agent/udf.proto index fbc231b73..353b1585a 100644 --- a/udf/agent/udf.proto +++ b/udf/agent/udf.proto @@ -68,6 +68,8 @@ message OptionInfo { // Request that the process initialize itself with the provided options. message InitRequest { repeated Option options = 1; + string taskID = 2; + string nodeID = 3; } message Option { diff --git a/udf/server.go b/udf/server.go index 0c0ea76bb..736504d20 100644 --- a/udf/server.go +++ b/udf/server.go @@ -66,6 +66,9 @@ type Server struct { keepalive chan int64 keepaliveTimeout time.Duration + taskID string + nodeID string + in agent.ByteReadReader out io.WriteCloser @@ -88,6 +91,7 @@ type Server struct { } func NewServer( + taskID, nodeID string, in agent.ByteReadReader, out io.WriteCloser, l *log.Logger, @@ -96,6 +100,8 @@ func NewServer( killCallback func(), ) *Server { s := &Server{ + taskID: taskID, + nodeID: nodeID, in: in, out: out, logger: l, @@ -258,6 +264,8 @@ func (s *Server) Init(options []*agent.Option) error { req := &agent.Request{Message: &agent.Request_Init{ Init: &agent.InitRequest{ Options: options, + TaskID: s.taskID, + NodeID: s.nodeID, }, }} resp, err := s.doRequestResponse(req, s.initResponse) diff --git a/udf/server_test.go b/udf/server_test.go index 7c4ca6b84..38ddaa5b3 100644 --- a/udf/server_test.go +++ b/udf/server_test.go @@ -18,7 +18,7 @@ import ( func TestUDF_StartStop(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartStop] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) s.Start() @@ -35,7 +35,7 @@ func TestUDF_StartStop(t *testing.T) { func TestUDF_StartInitStop(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartStop] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) go func() { req := <-u.Requests _, ok := req.Message.(*agent.Request_Init) @@ -71,7 +71,7 @@ func TestUDF_StartInitStop(t *testing.T) { func TestUDF_StartInitAbort(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartInfoAbort] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) s.Start() expErr := errors.New("explicit abort") go func() { @@ -92,7 +92,7 @@ func TestUDF_StartInitAbort(t *testing.T) { func TestUDF_StartInfoStop(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartInfoStop] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) go func() { req := <-u.Requests _, ok := req.Message.(*agent.Request_Info) @@ -134,7 +134,7 @@ func TestUDF_StartInfoStop(t *testing.T) { func TestUDF_StartInfoAbort(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartInfoAbort] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) s.Start() expErr := errors.New("explicit abort") go func() { @@ -156,7 +156,7 @@ func TestUDF_Keepalive(t *testing.T) { t.Parallel() u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_Keepalive] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, time.Millisecond*100, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, time.Millisecond*100, nil, nil) s.Start() s.Init(nil) req := <-u.Requests @@ -196,7 +196,7 @@ func TestUDF_MissedKeepalive(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_MissedKeepalive] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, time.Millisecond*100, aborted, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, time.Millisecond*100, aborted, nil) s.Start() // Since the keepalive is missed, the process should abort on its own. @@ -230,7 +230,7 @@ func TestUDF_KillCallBack(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_MissedKeepalive] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, timeout, aborted, kill) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, timeout, aborted, kill) s.Start() // Since the keepalive is missed, the process should abort on its own. @@ -259,7 +259,7 @@ func TestUDF_MissedKeepaliveInit(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_MissedKeepaliveInit] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, time.Millisecond*100, aborted, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, time.Millisecond*100, aborted, nil) s.Start() s.Init(nil) @@ -287,7 +287,7 @@ func TestUDF_MissedKeepaliveInfo(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_MissedKeepaliveInfo] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, time.Millisecond*100, aborted, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, time.Millisecond*100, aborted, nil) s.Start() s.Info() @@ -309,7 +309,7 @@ func TestUDF_MissedKeepaliveInfo(t *testing.T) { func TestUDF_SnapshotRestore(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_SnapshotRestore] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) go func() { // Init req := <-u.Requests @@ -378,7 +378,7 @@ func TestUDF_SnapshotRestore(t *testing.T) { func TestUDF_StartInitPointStop(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartPointStop] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) go func() { req := <-u.Requests _, ok := req.Message.(*agent.Request_Init) @@ -440,7 +440,7 @@ func TestUDF_StartInitPointStop(t *testing.T) { func TestUDF_StartInitBatchStop(t *testing.T) { u := udf_test.NewIO() l := log.New(os.Stderr, "[TestUDF_StartPointStop] ", log.LstdFlags) - s := udf.NewServer(u.Out(), u.In(), l, 0, nil, nil) + s := udf.NewServer("testTask", "testNode", u.Out(), u.In(), l, 0, nil, nil) go func() { req := <-u.Requests _, ok := req.Message.(*agent.Request_Init) diff --git a/udf/test/test_udf.go b/udf/test/test_udf.go index ea1a5d364..f59460f53 100644 --- a/udf/test/test_udf.go +++ b/udf/test/test_udf.go @@ -120,20 +120,25 @@ func (o *IO) Out() agent.ByteReadReader { } type UDF struct { + taskID string + nodeID string + *udf.Server uio *IO logger *log.Logger } -func New(uio *IO, l *log.Logger) *UDF { +func New(taskID, nodeID string, uio *IO, l *log.Logger) *UDF { return &UDF{ + taskID: taskID, + nodeID: nodeID, uio: uio, logger: l, } } func (u *UDF) Open() error { - u.Server = udf.NewServer(u.uio.Out(), u.uio.In(), u.logger, 0, nil, nil) + u.Server = udf.NewServer(u.taskID, u.nodeID, u.uio.Out(), u.uio.In(), u.logger, 0, nil, nil) return u.Server.Start() } diff --git a/udf_test.go b/udf_test.go index 378badeca..4bb8c5f41 100644 --- a/udf_test.go +++ b/udf_test.go @@ -22,7 +22,7 @@ import ( func newUDFSocket(name string) (*kapacitor.UDFSocket, *udf_test.IO) { uio := udf_test.NewIO() l := log.New(os.Stderr, fmt.Sprintf("[%s] ", name), log.LstdFlags) - u := kapacitor.NewUDFSocket(newTestSocket(uio), l, 0, nil) + u := kapacitor.NewUDFSocket(name, "testNode", newTestSocket(uio), l, 0, nil) return u, uio } @@ -30,7 +30,7 @@ func newUDFProcess(name string) (*kapacitor.UDFProcess, *udf_test.IO) { uio := udf_test.NewIO() cmd := newTestCommander(uio) l := log.New(os.Stderr, fmt.Sprintf("[%s] ", name), log.LstdFlags) - u := kapacitor.NewUDFProcess(cmd, command.Spec{}, l, 0, nil) + u := kapacitor.NewUDFProcess(name, "testNode", cmd, command.Spec{}, l, 0, nil) return u, uio }