Skip to content

Commit

Permalink
User Defined Functions (UDFs)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 15, 2016
1 parent 00c6e95 commit 33f8274
Show file tree
Hide file tree
Showing 58 changed files with 6,569 additions and 324 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.git
build
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ kapacitor*.rpm
kapacitor*.deb
kapacitor*.tar
kapacitor*.zip
*.pyc
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
# Changelog

## v0.2.5 [unreleased]
## v0.10.0 [unreleased]

### Release Notes

This release marks the next release of Kapacitor.
With this release you can now run your own custom code for processing data within Kapacitor.
See [udf/agent/README.md](https://github.com/influxdata/kapacitor/blob/master/udf/agent/README.md) for more details.

With the addition of UDFs it is now possible to run custom anomaly detection alogrithms suited to your needs.
There are simple examples of how to use UDFs in [udf/agent/examples](https://github.com/influxdata/kapacitor/tree/master/udf/agent/examples/).


The version has jumped significantly so that it is inline with other projects in the TICK stack.
This way you can easily tell which versions of Telegraf, InfluxDB, Chronograf and Kapacitor work together.


### Features
- [#72](https://github.com/influxdata/kapacitor/issues/72): Add support for User Defined Functions (UDFs).
- [#138](https://github.com/influxdata/kapacitor/issues/138): Change over to influxdata github org.
- [#139](https://github.com/influxdata/kapacitor/issues/139): Alerta.io support thanks! @md14454

Expand Down
34 changes: 33 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,51 @@ RUN apt-get install -y \

RUN gem install fpm

# Install protobuf3
RUN apt-get install -y \
build-essential \
autoconf \
automake \
libtool \
python-setuptools \
curl

ENV PROTO_VERSION 3.0.0-beta-2
# Download and compile protoc
RUN wget https://github.com/google/protobuf/archive/v${PROTO_VERSION}.tar.gz && \
tar xf v${PROTO_VERSION}.tar.gz && \
rm -f v${PROTO_VERSION}.tar.gz && \
cd protobuf-${PROTO_VERSION} && \
./autogen.sh && \
./configure --prefix=/usr && \
make -j $(nproc) && \
make check && \
make install

# Install Python Protobuf3
RUN cd protobuf-${PROTO_VERSION}/python && \
python setup.py install;



# Install go
ENV GO_VERSION 1.5.2
ENV GO_VERSION 1.5.3
ENV GO_ARCH amd64
RUN wget https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \
rm /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz
ENV PATH /usr/local/go/bin:$PATH
ENV GOPATH /gopath
ENV PATH $GOPATH/bin:$PATH
ENV PROJECT_PATH $GOPATH/src/github.com/influxdata/kapacitor
RUN mkdir -p $PROJECT_PATH

WORKDIR $PROJECT_PATH
ENTRYPOINT ["/usr/local/bin/build"]
CMD []

# Get gogo for golang protobuf
RUN go get github.com/gogo/protobuf/protoc-gen-gogo

ADD ./build.py /usr/local/bin/build

7 changes: 4 additions & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -75,9 +76,9 @@ type AlertNode struct {
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err error) {
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *AlertNode, err error) {
an = &AlertNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
a: n,
}
an.node.runF = an.runAlert
Expand Down Expand Up @@ -207,7 +208,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err
return
}

func (a *AlertNode) runAlert() error {
func (a *AlertNode) runAlert([]byte) error {
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
Expand Down
13 changes: 7 additions & 6 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kapacitor
import (
"errors"
"fmt"
"log"
"sync"
"time"

Expand All @@ -19,9 +20,9 @@ type SourceBatchNode struct {
idx int
}

func newSourceBatchNode(et *ExecutingTask, n *pipeline.SourceBatchNode) (*SourceBatchNode, error) {
func newSourceBatchNode(et *ExecutingTask, n *pipeline.SourceBatchNode, l *log.Logger) (*SourceBatchNode, error) {
sn := &SourceBatchNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
s: n,
}
return sn, nil
Expand All @@ -47,7 +48,7 @@ func (s *SourceBatchNode) addParentEdge(in *Edge) {
s.idx++
}

func (s *SourceBatchNode) start() {
func (s *SourceBatchNode) start([]byte) {
}

func (s *SourceBatchNode) Err() error {
Expand Down Expand Up @@ -96,9 +97,9 @@ type BatchNode struct {
closing chan struct{}
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode, l *log.Logger) (*BatchNode, error) {
bn := &BatchNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
b: n,
closing: make(chan struct{}),
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func (b *BatchNode) doQuery() error {
}
}

func (b *BatchNode) runBatch() error {
func (b *BatchNode) runBatch([]byte) error {
errC := make(chan error, 1)
go func() {
defer func() {
Expand Down
24 changes: 22 additions & 2 deletions build.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python2.7
#!/usr/bin/python2.7 -u

import sys
import os
Expand Down Expand Up @@ -204,7 +204,7 @@ def run_tests(race):
run(get_command)
print "done."
print "Running tests..."
test_command = "go test ./..."
test_command = "go test -v ./..."
if race:
test_command = "go test -race ./..."
code = os.system(test_command)
Expand All @@ -215,6 +215,18 @@ def run_tests(race):
print "Tests Passed"
return True

def run_generate():
print "Running generate..."
command = "go generate ./..."
code = os.system(command)
if code != 0:
print "Generate Failed"
return False
else:
print "Generate Succeeded"
return True


def build(version=None,
branch=None,
commit=None,
Expand Down Expand Up @@ -407,6 +419,7 @@ def main():
update = False
upload = False
test = False
generate = False

for arg in sys.argv[1:]:
if '--outdir' in arg:
Expand Down Expand Up @@ -451,6 +464,9 @@ def main():
elif '--test' in arg:
# Run tests and exit
test = True
elif '--generate' in arg:
# Run go generate ./...
generate = True
else:
print "!! Unknown argument: {}".format(arg)
sys.exit(1)
Expand All @@ -474,6 +490,10 @@ def main():
# TODO(rossmcdonald): Prepare git repo for build (checking out correct branch/commit, etc.)
# prepare(branch=branch, commit=commit)

if generate:
if not run_generate():
return 1

if test:
if not run_tests(race):
return 1
Expand Down
6 changes: 3 additions & 3 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ dependencies:

test:
override:
- ./build.sh --test
- ./build.sh --test --race
- ./build.sh --test --generate
- ./build.sh --test --generate --race


deployment:
release:
tag: /v[0-9]+(\.[0-9]+){2}(-rc[0-9]+)?/
commands:
- ./build.sh --clean --packages --upload --platform=all --arch=all
- ./build.sh --clean --generate --packages --upload --platform=all --arch=all
9 changes: 3 additions & 6 deletions cmd/kapacitord/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ func (cmd *Command) Run(args ...string) error {
config.Logging.Level = options.LogLevel
}

// Validate the configuration.
if err := config.Validate(); err != nil {
return fmt.Errorf("%s. To generate a valid configuration file run `kapacitord config > kapacitor.generated.conf`.", err)
}

// Initialize Logging Services
cmd.logService = logging.NewService(config.Logging, cmd.Stdout, cmd.Stderr)
err = cmd.logService.Open()
Expand Down Expand Up @@ -151,7 +146,9 @@ func (cmd *Command) monitorServerErrors() {
for {
select {
case err := <-cmd.Server.Err():
cmd.Logger.Println("E! " + err.Error())
if err != nil {
cmd.Logger.Println("E! " + err.Error())
}
case <-cmd.closing:
return
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"

Expand Down Expand Up @@ -52,6 +53,7 @@ type Config struct {
Alerta alerta.Config `toml:"alerta"`
Reporting reporting.Config `toml:"reporting"`
Stats stats.Config `toml:"stats"`
UDF udf.Config `toml:"udf"`

Hostname string `toml:"hostname"`
DataDir string `toml:"data_dir"`
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewConfig() *Config {
c.Alerta = alerta.NewConfig()
c.Reporting = reporting.NewConfig()
c.Stats = stats.NewConfig()
c.UDF = udf.NewConfig()

return c
}
Expand Down Expand Up @@ -125,6 +128,10 @@ func (c *Config) Validate() error {
if err != nil {
return err
}
err = c.UDF.Validate()
if err != nil {
return err
}
for _, g := range c.Graphites {
if err := g.Validate(); err != nil {
return fmt.Errorf("invalid graphite config: %v", err)
Expand Down
15 changes: 15 additions & 0 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/wlog"
Expand Down Expand Up @@ -104,6 +105,10 @@ type Server struct {

// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*Server, error) {
err := c.Validate()
if err != nil {
return nil, fmt.Errorf("%s. To generate a valid configuration file run `kapacitord config > kapacitor.generated.conf`.", err)
}
l := logService.NewLogger("[srv] ", log.LstdFlags)
s := &Server{
buildInfo: *buildInfo,
Expand All @@ -124,6 +129,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
}

// Append Kapacitor services.
s.appendUDFService(c.UDF)
s.appendSMTPService(c.SMTP)
s.appendHTTPDService(c.HTTP)
s.appendInfluxDBService(c.InfluxDB, c.Hostname)
Expand Down Expand Up @@ -201,6 +207,7 @@ func (s *Server) appendTaskStoreService(c task_store.Config) {
srv.TaskMaster = s.TaskMaster

s.TaskStore = srv
s.TaskMaster.TaskStore = srv
s.Services = append(s.Services, srv)
}

Expand All @@ -216,6 +223,14 @@ func (s *Server) appendReplayStoreService(c replay.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendUDFService(c udf.Config) {
l := s.LogService.NewLogger("[udf] ", log.LstdFlags)
srv := udf.NewService(c, l)

s.TaskMaster.UDFService = srv
s.Services = append(s.Services, srv)
}

func (s *Server) appendOpsGenieService(c opsgenie.Config) {
if c.Enabled {
l := s.LogService.NewLogger("[opsgenie] ", log.LstdFlags)
Expand Down
Loading

0 comments on commit 33f8274

Please sign in to comment.