Skip to content

Commit

Permalink
reorganize udf agent code so that agent has no deps within kapacitor
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 1, 2017
1 parent f374653 commit 331c6b6
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 621 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bugfixes

- [#1294](https://github.com/influxdata/kapacitor/issues/1294): Fix bug where batch queries would be missing all fields after the first nil field.
- [#1343](https://github.com/influxdata/kapacitor/issues/1343): BREAKING: The UDF agent Go API has changed, the changes now make it so that the agent package is self contained.

## v1.3.0-beta1 [2017-04-29]

Expand Down
67 changes: 34 additions & 33 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/services/victorops/victoropstest"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
"github.com/influxdata/kapacitor/udf/test"
"github.com/influxdata/wlog"
"github.com/k-sone/snmpgo"
Expand Down Expand Up @@ -5459,19 +5460,19 @@ stream
if name != "customFunc" {
return
}
info.Wants = udf.EdgeType_STREAM
info.Provides = udf.EdgeType_STREAM
info.Options = map[string]*udf.OptionInfo{
info.Wants = agent.EdgeType_STREAM
info.Provides = agent.EdgeType_STREAM
info.Options = map[string]*agent.OptionInfo{
"opt1": {
ValueTypes: []udf.ValueType{udf.ValueType_STRING},
ValueTypes: []agent.ValueType{agent.ValueType_STRING},
},
"opt2": {
ValueTypes: []udf.ValueType{
udf.ValueType_BOOL,
udf.ValueType_INT,
udf.ValueType_DOUBLE,
udf.ValueType_STRING,
udf.ValueType_DURATION,
ValueTypes: []agent.ValueType{
agent.ValueType_BOOL,
agent.ValueType_INT,
agent.ValueType_DOUBLE,
agent.ValueType_STRING,
agent.ValueType_DURATION,
},
},
}
Expand All @@ -5493,7 +5494,7 @@ stream
go func() {
defer close(done)
req := <-uio.Requests
i, ok := req.Message.(*udf.Request_Init)
i, ok := req.Message.(*agent.Request_Init)
if !ok {
t.Error("expected init message")
}
Expand All @@ -5503,38 +5504,38 @@ stream
t.Fatalf("unexpected number of options in init request, got %d exp %d", got, exp)
}
for i, opt := range init.Options {
exp := &udf.Option{}
exp := &agent.Option{}
switch i {
case 0:
exp.Name = "opt1"
exp.Values = []*udf.OptionValue{
exp.Values = []*agent.OptionValue{
{
Type: udf.ValueType_STRING,
Value: &udf.OptionValue_StringValue{"count"},
Type: agent.ValueType_STRING,
Value: &agent.OptionValue_StringValue{"count"},
},
}
case 1:
exp.Name = "opt2"
exp.Values = []*udf.OptionValue{
exp.Values = []*agent.OptionValue{
{
Type: udf.ValueType_BOOL,
Value: &udf.OptionValue_BoolValue{false},
Type: agent.ValueType_BOOL,
Value: &agent.OptionValue_BoolValue{false},
},
{
Type: udf.ValueType_INT,
Value: &udf.OptionValue_IntValue{1},
Type: agent.ValueType_INT,
Value: &agent.OptionValue_IntValue{1},
},
{
Type: udf.ValueType_DOUBLE,
Value: &udf.OptionValue_DoubleValue{1.0},
Type: agent.ValueType_DOUBLE,
Value: &agent.OptionValue_DoubleValue{1.0},
},
{
Type: udf.ValueType_STRING,
Value: &udf.OptionValue_StringValue{"1.0"},
Type: agent.ValueType_STRING,
Value: &agent.OptionValue_StringValue{"1.0"},
},
{
Type: udf.ValueType_DURATION,
Value: &udf.OptionValue_DurationValue{int64(time.Second)},
Type: agent.ValueType_DURATION,
Value: &agent.OptionValue_DurationValue{int64(time.Second)},
},
}
}
Expand All @@ -5543,9 +5544,9 @@ stream
}
}

resp := &udf.Response{
Message: &udf.Response_Init{
Init: &udf.InitResponse{
resp := &agent.Response{
Message: &agent.Response_Init{
Init: &agent.InitResponse{
Success: true,
},
},
Expand All @@ -5554,12 +5555,12 @@ stream

// read all requests and wait till the chan is closed
for req := range uio.Requests {
p, ok := req.Message.(*udf.Request_Point)
p, ok := req.Message.(*agent.Request_Point)
if ok {
pt := p.Point
resp := &udf.Response{
Message: &udf.Response_Point{
Point: &udf.Point{
resp := &agent.Response{
Message: &agent.Response_Point{
Point: &agent.Point{
Name: pt.Name,
Time: pt.Time,
Group: pt.Group,
Expand Down
44 changes: 22 additions & 22 deletions pipeline/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/influxdata/kapacitor/tick"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
)

// A UDFNode is a node that can run a User Defined Function (UDF) in a separate process.
Expand Down Expand Up @@ -48,11 +48,11 @@ type UDFNode struct {
chainnode

UDFName string
options map[string]*udf.OptionInfo
options map[string]*agent.OptionInfo

// Options that were set on the node
// tick:ignore
Options []*udf.Option
Options []*agent.Option

describer *tick.ReflectionDescriber
}
Expand All @@ -61,20 +61,20 @@ func NewUDF(
parent Node,
name string,
wants,
provides udf.EdgeType,
options map[string]*udf.OptionInfo,
provides agent.EdgeType,
options map[string]*agent.OptionInfo,
) *UDFNode {
var pwants, pprovides EdgeType
switch wants {
case udf.EdgeType_STREAM:
case agent.EdgeType_STREAM:
pwants = StreamEdge
case udf.EdgeType_BATCH:
case agent.EdgeType_BATCH:
pwants = BatchEdge
}
switch provides {
case udf.EdgeType_STREAM:
case agent.EdgeType_STREAM:
pprovides = StreamEdge
case udf.EdgeType_BATCH:
case agent.EdgeType_BATCH:
pprovides = BatchEdge
}
udf := &UDFNode{
Expand Down Expand Up @@ -123,31 +123,31 @@ func (u *UDFNode) SetProperty(name string, args ...interface{}) (interface{}, er
if got, exp := len(args), len(opt.ValueTypes); got != exp {
return nil, fmt.Errorf("unexpected number of args to %s, got %d expected %d", name, got, exp)
}
values := make([]*udf.OptionValue, len(args))
values := make([]*agent.OptionValue, len(args))
for i, arg := range args {
values[i] = &udf.OptionValue{}
values[i] = &agent.OptionValue{}
switch v := arg.(type) {
case bool:
values[i].Type = udf.ValueType_BOOL
values[i].Value = &udf.OptionValue_BoolValue{v}
values[i].Type = agent.ValueType_BOOL
values[i].Value = &agent.OptionValue_BoolValue{v}
case int64:
values[i].Type = udf.ValueType_INT
values[i].Value = &udf.OptionValue_IntValue{v}
values[i].Type = agent.ValueType_INT
values[i].Value = &agent.OptionValue_IntValue{v}
case float64:
values[i].Type = udf.ValueType_DOUBLE
values[i].Value = &udf.OptionValue_DoubleValue{v}
values[i].Type = agent.ValueType_DOUBLE
values[i].Value = &agent.OptionValue_DoubleValue{v}
case string:
values[i].Type = udf.ValueType_STRING
values[i].Value = &udf.OptionValue_StringValue{v}
values[i].Type = agent.ValueType_STRING
values[i].Value = &agent.OptionValue_StringValue{v}
case time.Duration:
values[i].Type = udf.ValueType_DURATION
values[i].Value = &udf.OptionValue_DurationValue{int64(v)}
values[i].Type = agent.ValueType_DURATION
values[i].Value = &agent.OptionValue_DurationValue{int64(v)}
}
if values[i].Type != opt.ValueTypes[i] {
return nil, fmt.Errorf("unexpected arg to %s, got %v expected %v", name, values[i].Type, opt.ValueTypes[i])
}
}
u.Options = append(u.Options, &udf.Option{
u.Options = append(u.Options, &agent.Option{
Name: name,
Values: values,
})
Expand Down
37 changes: 19 additions & 18 deletions udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -291,15 +292,15 @@ func (p *UDFProcess) logStdErr() {
}
}

func (p *UDFProcess) Abort(err error) { p.server.Abort(err) }
func (p *UDFProcess) Init(options []*udf.Option) error { return p.server.Init(options) }
func (p *UDFProcess) Snapshot() ([]byte, error) { return p.server.Snapshot() }
func (p *UDFProcess) Restore(snapshot []byte) error { return p.server.Restore(snapshot) }
func (p *UDFProcess) PointIn() chan<- models.Point { return p.server.PointIn() }
func (p *UDFProcess) BatchIn() chan<- models.Batch { return p.server.BatchIn() }
func (p *UDFProcess) PointOut() <-chan models.Point { return p.server.PointOut() }
func (p *UDFProcess) BatchOut() <-chan models.Batch { return p.server.BatchOut() }
func (p *UDFProcess) Info() (udf.Info, error) { return p.server.Info() }
func (p *UDFProcess) Abort(err error) { p.server.Abort(err) }
func (p *UDFProcess) Init(options []*agent.Option) error { return p.server.Init(options) }
func (p *UDFProcess) Snapshot() ([]byte, error) { return p.server.Snapshot() }
func (p *UDFProcess) Restore(snapshot []byte) error { return p.server.Restore(snapshot) }
func (p *UDFProcess) PointIn() chan<- models.Point { return p.server.PointIn() }
func (p *UDFProcess) BatchIn() chan<- models.Batch { return p.server.BatchIn() }
func (p *UDFProcess) PointOut() <-chan models.Point { return p.server.PointOut() }
func (p *UDFProcess) BatchOut() <-chan models.Batch { return p.server.BatchOut() }
func (p *UDFProcess) Info() (udf.Info, error) { return p.server.Info() }

type UDFSocket struct {
server *udf.Server
Expand Down Expand Up @@ -363,15 +364,15 @@ func (s *UDFSocket) Close() error {
return nil
}

func (s *UDFSocket) Abort(err error) { s.server.Abort(err) }
func (s *UDFSocket) Init(options []*udf.Option) error { return s.server.Init(options) }
func (s *UDFSocket) Snapshot() ([]byte, error) { return s.server.Snapshot() }
func (s *UDFSocket) Restore(snapshot []byte) error { return s.server.Restore(snapshot) }
func (s *UDFSocket) PointIn() chan<- models.Point { return s.server.PointIn() }
func (s *UDFSocket) BatchIn() chan<- models.Batch { return s.server.BatchIn() }
func (s *UDFSocket) PointOut() <-chan models.Point { return s.server.PointOut() }
func (s *UDFSocket) BatchOut() <-chan models.Batch { return s.server.BatchOut() }
func (s *UDFSocket) Info() (udf.Info, error) { return s.server.Info() }
func (s *UDFSocket) Abort(err error) { s.server.Abort(err) }
func (s *UDFSocket) Init(options []*agent.Option) error { return s.server.Init(options) }
func (s *UDFSocket) Snapshot() ([]byte, error) { return s.server.Snapshot() }
func (s *UDFSocket) Restore(snapshot []byte) error { return s.server.Restore(snapshot) }
func (s *UDFSocket) PointIn() chan<- models.Point { return s.server.PointIn() }
func (s *UDFSocket) BatchIn() chan<- models.Batch { return s.server.BatchIn() }
func (s *UDFSocket) PointOut() <-chan models.Point { return s.server.PointOut() }
func (s *UDFSocket) BatchOut() <-chan models.Batch { return s.server.BatchOut() }
func (s *UDFSocket) Info() (udf.Info, error) { return s.server.Info() }

type socket struct {
path string
Expand Down
Loading

0 comments on commit 331c6b6

Please sign in to comment.