Skip to content

Commit

Permalink
Add UDF socket support.
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 8, 2016
1 parent 1edf2a9 commit 20fda14
Show file tree
Hide file tree
Showing 36 changed files with 2,817 additions and 1,553 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,31 @@

### Release Notes

UDF can now be managed externally to Kapacitor via Unix sockets.
A process or container can be launched independent of Kapacitor exposing a socket.
On startup Kapacitor will connect to the socket and begin communication.

Example UDF config for a socket based UDF.

```
[udf]
[udf.functions]
[udf.functions.myCustomUDF]
socket = "/path/to/socket"
timeout = "10s"
```

### Features

- [#360](https://github.com/influxdata/kapacitor/pull/360): Forking tasks by measurement in order to improve performance
- [#386](https://github.com/influxdata/kapacitor/issues/386): Adds official Go HTTP client package.
- [#399](https://github.com/influxdata/kapacitor/issues/399): Allow disabling of subscriptions.
- [#417](https://github.com/influxdata/kapacitor/issues/417): UDFs can be connected over a Unix socket. This enables UDFs from across Docker containers.

### Bugfixes

- [#441](https://github.com/influxdata/kapacitor/issues/441): Fix panic in UDF code.

## v0.12.0 [2016-04-04]

### Release Notes
Expand Down
2 changes: 1 addition & 1 deletion build.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def package_scripts(build_root):

def run_generate():
print "Running generate..."
run("go get github.com/gogo/protobuf/protoc-gen-gogo")
run("go get github.com/golang/protobuf/protoc-gen-go")
run("go get github.com/benbjohnson/tmpl")
run("go generate ./...")
print "Generate succeeded."
Expand Down
4 changes: 3 additions & 1 deletion client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb/influxql"
)

const DefaultUserAgent = "KapacitorClient"
Expand Down Expand Up @@ -321,7 +323,7 @@ func (c *Client) RecordStream(name string, duration time.Duration) (string, erro
v := url.Values{}
v.Add("type", "stream")
v.Add("name", name)
v.Add("duration", duration.String())
v.Add("duration", influxql.FormatDuration(duration))

return c.doRecord(v)
}
Expand Down
4 changes: 2 additions & 2 deletions client/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func Test_Record(t *testing.T) {
checkRequest: func(r *http.Request) bool {
return r.URL.Query().Get("type") == "stream" &&
r.URL.Query().Get("name") == "taskname" &&
r.URL.Query().Get("duration") == "1m0s"
r.URL.Query().Get("duration") == "1m"
},
},
{
Expand Down Expand Up @@ -513,7 +513,7 @@ func Test_Record(t *testing.T) {

rid, err := tc.fnc(c)
if err != nil {
t.Fatal(err)
t.Fatal(tc.name, err)
}
if exp, got := "rid1", rid; got != exp {
t.Errorf("unexpected recording id for test %s: got: %s exp: %s", tc.name, got, exp)
Expand Down
5 changes: 4 additions & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,10 @@ func doRecord(args []string) error {
default:
return fmt.Errorf("Unknown record type %q, expected 'stream', 'batch' or 'query'", args[0])
}

_, err = cli.Recording(rid)
if err != nil {
return err
}
fmt.Println(rid)
return nil
}
Expand Down
157 changes: 157 additions & 0 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,163 @@ test,group=b value=0 0000000011
}
}

func TestServer_UDFStreamAgentsSocket(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
udfDir := filepath.Clean(filepath.Join(dir, "../../../udf"))

tdir, err := ioutil.TempDir("", "kapacitor_server_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tdir)

agents := []struct {
startFunc func() *exec.Cmd
config udf.FunctionConfig
}{
// Go
{
startFunc: func() *exec.Cmd {
cmd := exec.Command(
"go",
"build",
"-o",
filepath.Join(tdir, "echo"),
filepath.Join(udfDir, "agent/examples/echo/echo.go"),
)
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatal(string(out))
}
cmd = exec.Command(
filepath.Join(tdir, "echo"),
"-socket",
filepath.Join(tdir, "echo.go.sock"),
)
cmd.Stderr = os.Stderr
return cmd
},
config: udf.FunctionConfig{
Socket: filepath.Join(tdir, "echo.go.sock"),
Timeout: toml.Duration(time.Minute),
},
},
// Python
{
startFunc: func() *exec.Cmd {
cmd := exec.Command(
"python2",
"-u",
filepath.Join(udfDir, "agent/examples/echo/echo.py"),
filepath.Join(tdir, "echo.py.sock"),
)
cmd.Stderr = os.Stderr
env := os.Environ()
env = append(env, fmt.Sprintf(
"%s=%s",
"PYTHONPATH",
strings.Join(
[]string{filepath.Join(udfDir, "agent/py"), os.Getenv("PYTHONPATH")},
string(filepath.ListSeparator),
),
))
cmd.Env = env
return cmd
},
config: udf.FunctionConfig{
Socket: filepath.Join(tdir, "echo.py.sock"),
Timeout: toml.Duration(time.Minute),
},
},
}
for _, agent := range agents {
cmd := agent.startFunc()
cmd.Start()
defer cmd.Process.Signal(os.Interrupt)
if err != nil {
t.Fatal(err)
}
c := NewConfig()
c.UDF.Functions = map[string]udf.FunctionConfig{
"echo": agent.config,
}
testStreamAgentSocket(t, c)
}
}

func testStreamAgentSocket(t *testing.T, c *run.Config) {
s := NewServer(c)
err := s.Open()
if err != nil {
t.Fatal(err)
}
defer s.Close()
cli := Client(s)

name := "testUDFTask"
ttype := "stream"
dbrps := []client.DBRP{{
Database: "mydb",
RetentionPolicy: "myrp",
}}
tick := `stream
|from()
.measurement('test')
.groupBy('group')
@echo()
|window()
.period(10s)
.every(10s)
|count('value')
|httpOut('count')
`

err = cli.Define(name, ttype, dbrps, strings.NewReader(tick), false)
if err != nil {
t.Fatal(err)
}

err = cli.Enable(name)
if err != nil {
t.Fatal(err)
}

endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
}

points := `test,group=a value=1 0000000000
test,group=a value=1 0000000001
test,group=a value=1 0000000002
test,group=a value=1 0000000003
test,group=a value=1 0000000004
test,group=a value=1 0000000005
test,group=a value=1 0000000006
test,group=a value=1 0000000007
test,group=a value=1 0000000008
test,group=a value=1 0000000009
test,group=a value=0 0000000010
test,group=a value=0 0000000011
`
v := url.Values{}
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","count"],"values":[["1970-01-01T00:00:10Z",10]]}],"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
}
}

func TestServer_UDFBatchAgents(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
Expand Down
45 changes: 0 additions & 45 deletions command/command.go

This file was deleted.

Loading

0 comments on commit 20fda14

Please sign in to comment.