Commit

influxdb v2 buffered writes
nathanielc committed Feb 22, 2016
1 parent 246f111 commit c39d89f
Showing 14 changed files with 281 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by the original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDBOut node now buffers its writes to InfluxDB.


### Bugfixes
6 changes: 3 additions & 3 deletions batch.go
@@ -11,7 +11,7 @@ import (
"github.com/gorhill/cronexpr"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/influxdb/influxql"
)

@@ -274,8 +274,8 @@ func (b *BatchNode) doQuery() error {
return err
}

if resp.Err != nil {
return resp.Err
if err := resp.Error(); err != nil {
return err
}

// Collect batches
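The change above moves the error check from the v1 client's resp.Err field to the v2 client's Response.Error() accessor, which surfaces errors carried inside the response body. A minimal, self-contained sketch of that query pattern against the v2 client — the address, database, and query text here are made-up placeholders:

package main

import (
    "fmt"
    "log"

    client "github.com/influxdb/influxdb/client/v2"
)

func main() {
    // Assumed local InfluxDB address; adjust for a real setup.
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr: "http://localhost:8086",
    })
    if err != nil {
        log.Fatal(err)
    }

    // NewQuery takes the query text, the database, and the precision.
    q := client.NewQuery("SELECT mean(value) FROM cpu", "mydb", "s")

    resp, err := c.Query(q)
    if err != nil {
        // Transport-level failure.
        log.Fatal(err)
    }
    if err := resp.Error(); err != nil {
        // Error reported by InfluxDB inside the response.
        log.Fatal(err)
    }
    for _, result := range resp.Results {
        for _, series := range result.Series {
            fmt.Println(series.Name, len(series.Values), "rows")
        }
    }
}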
6 changes: 5 additions & 1 deletion cmd/kapacitord/run/server_helper_test.go
@@ -21,7 +21,7 @@ import (
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/wlog"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
)

// Server represents a test wrapper for run.Server.
@@ -382,6 +382,10 @@ type InfluxDB struct {

func NewInfluxDB(callback queryFunc) *InfluxDB {
handler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/ping" {
w.WriteHeader(http.StatusNoContent)
return
}
q := r.URL.Query().Get("q")
res := callback(q)
enc := json.NewEncoder(w)
2 changes: 1 addition & 1 deletion cmd/kapacitord/run/server_test.go
@@ -20,7 +20,7 @@ import (
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/toml"
186 changes: 157 additions & 29 deletions influxdb_out.go
@@ -2,28 +2,36 @@ package kapacitor

import (
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
)

type InfluxDBOutNode struct {
node
i *pipeline.InfluxDBOutNode
conn *client.Client
i *pipeline.InfluxDBOutNode
wb *writeBuffer
}

func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.Logger) (*InfluxDBOutNode, error) {
in := &InfluxDBOutNode{
node: node{Node: n, et: et, logger: l},
i: n,
wb: newWriteBuffer(int(n.Buffer), n.FlushInterval),
}
in.node.runF = in.runOut
in.node.stopF = in.stopOut
in.wb.i = in
return in, nil
}

func (i *InfluxDBOutNode) runOut([]byte) error {
// Start the write buffer
i.wb.start()

switch i.Wants() {
case pipeline.StreamEdge:
for p, ok := i.ins[0].NextPoint(); ok; p, ok = i.ins[0].NextPoint() {
@@ -53,6 +61,11 @@ func (i *InfluxDBOutNode) runOut([]byte) error {
return nil
}

func (i *InfluxDBOutNode) stopOut() {
i.wb.flush()
i.wb.abort()
}

func (i *InfluxDBOutNode) write(db, rp string, batch models.Batch) error {
if i.i.Database != "" {
db = i.i.Database
@@ -65,39 +78,154 @@ func (i *InfluxDBOutNode) write(db, rp string, batch models.Batch) error {
name = batch.Name
}

if i.conn == nil {
var err error
i.conn, err = i.et.tm.InfluxDBService.NewClient()
var err error
points := make([]*client.Point, len(batch.Points))
for j, p := range batch.Points {
var tags models.Tags
if len(i.i.Tags) > 0 {
tags = make(models.Tags, len(p.Tags)+len(i.i.Tags))
for k, v := range p.Tags {
tags[k] = v
}
for k, v := range i.i.Tags {
tags[k] = v
}
} else {
tags = p.Tags
}

points[j], err = client.NewPoint(
name,
tags,
p.Fields,
p.Time,
)
if err != nil {
return err
}
}
points := make([]client.Point, len(batch.Points))
for j, p := range batch.Points {
points[j] = client.Point{
Measurement: name,
Tags: p.Tags,
Time: p.Time,
Fields: p.Fields,
Precision: i.i.Precision,
}
}
tags := make(map[string]string, len(i.i.Tags)+len(batch.Tags))
for k, v := range batch.Tags {
tags[k] = v
}
for k, v := range i.i.Tags {
tags[k] = v
}

bp := client.BatchPoints{
Points: points,
bpc := client.BatchPointsConfig{
Database: db,
RetentionPolicy: rp,
WriteConsistency: i.i.WriteConsistency,
Tags: tags,
Precision: i.i.Precision,
}
_, err := i.conn.Write(bp)
return err
i.wb.enqueue(bpc, points)
return nil
}

type writeBuffer struct {
size int
flushInterval time.Duration
errC chan error
queue chan queueEntry
buffer map[client.BatchPointsConfig]client.BatchPoints

flushing chan struct{}
flushed chan struct{}

stopping chan struct{}
wg sync.WaitGroup
conn client.Client

i *InfluxDBOutNode
}

type queueEntry struct {
bpc client.BatchPointsConfig
points []*client.Point
}

func newWriteBuffer(size int, flushInterval time.Duration) *writeBuffer {
return &writeBuffer{
size: size,
flushInterval: flushInterval,
flushing: make(chan struct{}),
flushed: make(chan struct{}),
queue: make(chan queueEntry),
buffer: make(map[client.BatchPointsConfig]client.BatchPoints),
stopping: make(chan struct{}),
}
}

func (w *writeBuffer) enqueue(bpc client.BatchPointsConfig, points []*client.Point) {
qe := queueEntry{
bpc: bpc,
points: points,
}
w.queue <- qe
}

func (w *writeBuffer) start() {
w.wg.Add(1)
go w.run()
}

func (w *writeBuffer) flush() {
w.flushing <- struct{}{}
<-w.flushed
}

func (w *writeBuffer) abort() {
close(w.stopping)
w.wg.Wait()
}

func (w *writeBuffer) run() {
defer w.wg.Done()
var err error
for {
select {
case qe := <-w.queue:
// Read incoming points off queue
bp, ok := w.buffer[qe.bpc]
if !ok {
bp, err = client.NewBatchPoints(qe.bpc)
if err != nil {
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
break
}
w.buffer[qe.bpc] = bp
}
bp.AddPoints(qe.points)
// Check if we hit buffer size
if len(bp.Points()) >= w.size {
err = w.write(bp)
if err != nil {
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
}
delete(w.buffer, qe.bpc)
}
case <-w.flushing:
// Explicit flush called
w.writeAll()
w.flushed <- struct{}{}
case <-time.After(w.flushInterval):
// Flush all points after timeout
w.writeAll()
case <-w.stopping:
return
}
}
}

func (w *writeBuffer) writeAll() {
for bpc, bp := range w.buffer {
err := w.write(bp)
if err != nil {
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
}
delete(w.buffer, bpc)
}
}

func (w *writeBuffer) write(bp client.BatchPoints) error {
var err error
if w.conn == nil {
w.conn, err = w.i.et.tm.InfluxDBService.NewClient()
if err != nil {
return err
}
}
return w.conn.Write(bp)
}
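Where the removed v1 code filled a client.BatchPoints struct literal with a slice of client.Point values, the v2 API used above builds batches and points through constructors and hands them to Client.Write. A minimal, self-contained sketch of that write path, with a made-up address, database, and measurement:

package main

import (
    "log"
    "time"

    client "github.com/influxdb/influxdb/client/v2"
)

func main() {
    // Assumed local InfluxDB address; adjust for a real setup.
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr: "http://localhost:8086",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Batch-level settings (database, retention policy, precision) live in
    // BatchPointsConfig rather than on each individual point.
    bp, err := client.NewBatchPoints(client.BatchPointsConfig{
        Database:        "mydb",
        RetentionPolicy: "default",
        Precision:       "s",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Points are built with the NewPoint constructor instead of a struct
    // literal, so malformed points are rejected with an error.
    pt, err := client.NewPoint(
        "cpu",
        map[string]string{"host": "serverA"},
        map[string]interface{}{"value": 0.64},
        time.Now(),
    )
    if err != nil {
        log.Fatal(err)
    }
    bp.AddPoints([]*client.Point{pt})

    // One HTTP request per batch.
    if err := c.Write(bp); err != nil {
        log.Fatal(err)
    }
}

In the node itself the batch is no longer written inline: write() only merges the static tags into each point and enqueues the points on the writeBuffer, which issues the actual Write when a config's buffer reaches the configured size, when the flush interval elapses, or when the task stops and flush() is called.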
11 changes: 4 additions & 7 deletions integrations/helpers_test.go
@@ -6,14 +6,13 @@ import (
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"time"

"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/wlog"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/influxdb/influxql"
)

@@ -28,11 +27,9 @@ func NewMockInfluxDBService(h http.Handler) *MockInfluxDBService {
}
}

func (m *MockInfluxDBService) NewClient() (*client.Client, error) {
u, _ := url.Parse(m.ts.URL)
return client.NewClient(client.Config{
URL: *u,
Precision: "s",
func (m *MockInfluxDBService) NewClient() (client.Client, error) {
return client.NewHTTPClient(client.HTTPConfig{
Addr: m.ts.URL,
})

}
4 changes: 4 additions & 0 deletions integrations/streamer_test.go
@@ -2634,6 +2634,10 @@ stream
var precision string

influxdb := NewMockInfluxDBService(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/ping" {
w.WriteHeader(http.StatusNoContent)
return
}
//Respond
var data client.Response
w.WriteHeader(http.StatusOK)
7 changes: 4 additions & 3 deletions models/batch.go
@@ -2,11 +2,12 @@ package models

import (
"encoding/json"
"errors"
"fmt"
"sort"
"time"

"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/influxdb/models"
)

@@ -99,8 +100,8 @@ func BatchToRow(b Batch) (row *models.Row) {
}

func ResultToBatches(res client.Result) ([]Batch, error) {
if res.Err != nil {
return nil, res.Err
if res.Err != "" {
return nil, errors.New(res.Err)
}
batches := make([]Batch, len(res.Series))
for i, series := range res.Series {
15 changes: 14 additions & 1 deletion pipeline/influxdb_out.go
@@ -1,5 +1,10 @@
package pipeline

import "time"

const DefaultBufferSize = 1000
const DefaultFlushInterval = time.Second * 10

// Writes the data to InfluxDB as it is received.
//
// Example:
@@ -27,6 +32,12 @@ type InfluxDBOutNode struct {
WriteConsistency string
// The precision to use when writing the data.
Precision string
// Number of points to buffer when writing to InfluxDB.
// Default: 1000
Buffer int64
// Write points to InfluxDB after interval even if buffer is not full.
// Default: 10s
FlushInterval time.Duration
// Static set of tags to add to all data points before writing them.
//tick:ignore
Tags map[string]string
@@ -39,7 +50,9 @@ func newInfluxDBOutNode(wants EdgeType) *InfluxDBOutNode {
wants: wants,
provides: NoEdge,
},
Tags: make(map[string]string),
Tags: make(map[string]string),
Buffer: DefaultBufferSize,
FlushInterval: DefaultFlushInterval,
}
}

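In TICKscript the new fields should surface as .buffer() and .flushInterval() properties on influxDBOut(), following the usual field-to-property naming convention — a guess from the field names above, not something this diff shows. In the style of the integration tests, a task overriding the defaults might look like:

package integrations

// Hypothetical TICKscript exercising the new buffer settings. The property
// names and the dot-chaining syntax of this era are assumptions, so check
// against the parser before relying on them.
var bufferedOutScript = `
stream
    .from().measurement('requests')
    .influxDBOut()
        .database('requests_copy')
        .retentionPolicy('default')
        .buffer(5000)
        .flushInterval(5s)
`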