Skip to content

Commit

Permalink
expose taskID and nodeID to UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jul 31, 2017
1 parent b5011b4 commit 1c61319
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 142 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

- [#1400](https://github.com/influxdata/kapacitor/issues/1400): Allow for `.yml` file extensions in `define-topic-handler`
- [#1402](https://github.com/influxdata/kapacitor/pull/1402): Fix http server error logging.
- [#1500](https://github.com/influxdata/kapacitor/pull/1500): Fix bugs with stopping running UDF agent.

## v1.3.1 [2017-06-02]

Expand Down
6 changes: 3 additions & 3 deletions integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func compareAlertData(exp, got alert.Data) (bool, string) {
type UDFService struct {
ListFunc func() []string
InfoFunc func(name string) (udf.Info, bool)
CreateFunc func(name string, l *log.Logger, abortCallback func()) (udf.Interface, error)
CreateFunc func(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error)
}

func (u UDFService) List() []string {
Expand All @@ -148,8 +148,8 @@ func (u UDFService) Info(name string) (udf.Info, bool) {
return u.InfoFunc(name)
}

func (u UDFService) Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
return u.CreateFunc(name, l, abortCallback)
func (u UDFService) Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
return u.CreateFunc(name, taskID, nodeID, l, abortCallback)
}

type taskStore struct{}
Expand Down
10 changes: 8 additions & 2 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5930,11 +5930,11 @@ stream
return
}
uio := udf_test.NewIO()
udfService.CreateFunc = func(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
udfService.CreateFunc = func(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
if name != "customFunc" {
return nil, fmt.Errorf("unknown function %s", name)
}
return udf_test.New(uio, l), nil
return udf_test.New(taskID, nodeID, uio, l), nil
}

tmInit := func(tm *kapacitor.TaskMaster) {
Expand All @@ -5950,6 +5950,12 @@ stream
t.Error("expected init message")
}
init := i.Init
if got, exp := init.TaskID, "TestStream_CustomFunctions"; got != exp {
t.Errorf("unexpected task ID got %q exp %q", got, exp)
}
if got, exp := init.NodeID, "customFunc4"; got != exp {
t.Errorf("unexpected task ID got %q exp %q", got, exp)
}

if got, exp := len(init.Options), 2; got != exp {
t.Fatalf("unexpected number of options in init request, got %d exp %d", got, exp)
Expand Down
9 changes: 7 additions & 2 deletions services/udf/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Service) Info(name string) (udf.Info, bool) {
}

func (s *Service) Create(
name string,
name, taskID, nodeID string,
l *log.Logger,
abortCallback func(),
) (udf.Interface, error) {
Expand All @@ -70,6 +70,7 @@ func (s *Service) Create(
if conf.Socket != "" {
// Create socket UDF
return kapacitor.NewUDFSocket(
taskID, nodeID,
kapacitor.NewSocketConn(conf.Socket),
l,
time.Duration(conf.Timeout),
Expand All @@ -87,6 +88,7 @@ func (s *Service) Create(
Env: env,
}
return kapacitor.NewUDFProcess(
taskID, nodeID,
command.ExecCommander,
cmdSpec,
l,
Expand All @@ -109,7 +111,10 @@ func (s *Service) Refresh(name string) error {
}

func (s *Service) loadUDFInfo(name string) (udf.Info, error) {
u, err := s.Create(name, s.logger, nil)
// loadUDFInfo creates a UDF connection outside the context of a task or node
// because it only makes the Info request and never makes an Init request.
// As such it does not need to provide actual task and node IDs.
u, err := s.Create(name, "", "", s.logger, nil)
if err != nil {
return udf.Info{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type LogService interface {
type UDFService interface {
List() []string
Info(name string) (udf.Info, bool)
Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error)
Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error)
}

var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
Expand Down
18 changes: 18 additions & 0 deletions udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func newUDFNode(et *ExecutingTask, n *pipeline.UDFNode, l *log.Logger) (*UDFNode
// Create the UDF
f, err := et.tm.UDFService.Create(
n.UDFName,
et.Task.ID,
n.Name(),
l,
un.abortedCallback,
)
Expand Down Expand Up @@ -143,6 +145,9 @@ func (n *UDFNode) snapshot() ([]byte, error) {
// over STDIN and STDOUT. Lines received over STDERR are logged
// via normal Kapacitor logging.
type UDFProcess struct {
taskName string
nodeName string

server *udf.Server
commander command.Commander
cmdSpec command.Spec
Expand All @@ -162,13 +167,16 @@ type UDFProcess struct {
}

func NewUDFProcess(
taskName, nodeName string,
commander command.Commander,
cmdSpec command.Spec,
l *log.Logger,
timeout time.Duration,
abortCallback func(),
) *UDFProcess {
return &UDFProcess{
taskName: taskName,
nodeName: nodeName,
commander: commander,
cmdSpec: cmdSpec,
logger: l,
Expand Down Expand Up @@ -208,6 +216,8 @@ func (p *UDFProcess) Open() error {
outBuf := bufio.NewReader(stdout)

p.server = udf.NewServer(
p.taskName,
p.nodeName,
outBuf,
stdin,
p.logger,
Expand Down Expand Up @@ -269,6 +279,9 @@ func (p *UDFProcess) Out() <-chan edge.Message { return p.server.Out()
func (p *UDFProcess) Info() (udf.Info, error) { return p.server.Info() }

type UDFSocket struct {
taskName string
nodeName string

server *udf.Server
socket Socket

Expand All @@ -285,12 +298,15 @@ type Socket interface {
}

func NewUDFSocket(
taskName, nodeName string,
socket Socket,
l *log.Logger,
timeout time.Duration,
abortCallback func(),
) *UDFSocket {
return &UDFSocket{
taskName: taskName,
nodeName: nodeName,
socket: socket,
logger: l,
timeout: timeout,
Expand All @@ -308,6 +324,8 @@ func (s *UDFSocket) Open() error {
outBuf := bufio.NewReader(out)

s.server = udf.NewServer(
s.taskName,
s.nodeName,
outBuf,
in,
s.logger,
Expand Down
Loading

0 comments on commit 1c61319

Please sign in to comment.