Skip to content

Commit

Permalink
adds reporting service for sending stats to Enterprise
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 5, 2015
1 parent d3ce6b2 commit 9c163b6
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 93 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.*.swp
dist/*
/build/*
/kapacitor.conf
kapacitor_linux*
kapacitord_linux*
*~
Expand Down
86 changes: 47 additions & 39 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,20 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

type BatchNode struct {
node
b *pipeline.BatchNode
query *Query
ticker ticker
wg sync.WaitGroup
errC chan error
b *pipeline.BatchNode
query *Query
ticker ticker
wg sync.WaitGroup
errC chan error
closing chan struct{}
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
bn := &BatchNode{
node: node{Node: n, et: et},
b: n,
errC: make(chan error, 1),
node: node{Node: n, et: et},
b: n,
errC: make(chan error, 1),
closing: make(chan struct{}),
}
bn.node.runF = bn.runBatch
bn.node.stopF = bn.stopBatch
Expand Down Expand Up @@ -206,7 +208,7 @@ func (b *BatchNode) Queries(start, stop time.Time) []string {
return queries
}

// Query InfluxDB return Edge with data
// Query InfluxDB and collect batches on batch collector.
func (b *BatchNode) doQuery(batch BatchCollector) error {
defer batch.Close()
defer b.wg.Done()
Expand All @@ -216,52 +218,58 @@ func (b *BatchNode) doQuery(batch BatchCollector) error {
}

tickC := b.ticker.Start()
for now := range tickC {

// Update times for query
stop := now.Add(-1 * b.b.Offset)
b.query.Start(stop.Add(-1 * b.b.Period))
b.query.Stop(stop)

b.logger.Println("D! starting next batch query:", b.query.String())
for {
select {
case <-b.closing:
return nil
case now := <-tickC:

// Connect
con, err := b.et.tm.InfluxDBService.NewClient()
if err != nil {
return err
}
q := client.Query{
Command: b.query.String(),
}
// Update times for query
stop := now.Add(-1 * b.b.Offset)
b.query.Start(stop.Add(-1 * b.b.Period))
b.query.Stop(stop)

// Execute query
resp, err := con.Query(q)
if err != nil {
return err
}
b.logger.Println("D! starting next batch query:", b.query.String())

if resp.Err != nil {
return resp.Err
}
// Connect
con, err := b.et.tm.InfluxDBService.NewClient()
if err != nil {
return err
}
q := client.Query{
Command: b.query.String(),
}

// Collect batches
for _, res := range resp.Results {
batches, err := models.ResultToBatches(res)
// Execute query
resp, err := con.Query(q)
if err != nil {
return err
}
for _, bch := range batches {
batch.CollectBatch(bch)

if resp.Err != nil {
return resp.Err
}

// Collect batches
for _, res := range resp.Results {
batches, err := models.ResultToBatches(res)
if err != nil {
return err
}
for _, bch := range batches {
batch.CollectBatch(bch)
}
}
}
return errors.New("batch ticker schedule stopped")
}
return errors.New("batch ticker schedule stopped")
}

func (b *BatchNode) stopBatch() {
if b.ticker != nil {
b.ticker.Stop()
}
close(b.closing)
b.wg.Wait()
}

Expand Down
14 changes: 10 additions & 4 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/influxdb/kapacitor/services/influxdb"
"github.com/influxdb/kapacitor/services/logging"
"github.com/influxdb/kapacitor/services/replay"
"github.com/influxdb/kapacitor/services/reporting"
"github.com/influxdb/kapacitor/services/smtp"
"github.com/influxdb/kapacitor/services/task_store"
"github.com/influxdb/kapacitor/services/udp"
Expand All @@ -29,18 +30,18 @@ type Config struct {
Replay replay.Config `toml:"replay"`
Task task_store.Config `toml:"task"`
InfluxDB influxdb.Config `toml:"influxdb"`
Logging logging.Config
Logging logging.Config `toml:"logging"`

Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDPs []udp.Config `toml:"udp"`
SMTP smtp.Config `toml:"smtp"`
Reporting reporting.Config `toml:"reporting"`

Hostname string `toml:"hostname"`

// Server reporting
ReportingDisabled bool `toml:"reporting-disabled"`
DataDir string `toml:"data_dir"`
Token string `toml:"token"`
}

// NewConfig returns an instance of Config with reasonable defaults.
Expand All @@ -57,6 +58,7 @@ func NewConfig() *Config {
c.Collectd = collectd.NewConfig()
c.OpenTSDB = opentsdb.NewConfig()
c.SMTP = smtp.NewConfig()
c.Reporting = reporting.NewConfig()

return c
}
Expand All @@ -79,6 +81,7 @@ func NewDemoConfig() (*Config, error) {
c.Replay.Dir = filepath.Join(homeDir, ".kapacitor", c.Replay.Dir)
c.Task.Dir = filepath.Join(homeDir, ".kapacitor", c.Task.Dir)

c.DataDir = filepath.Join(homeDir, ".kapacitor", c.DataDir)
return c, nil
}

Expand All @@ -87,6 +90,9 @@ func (c *Config) Validate() error {
if c.Hostname == "" {
return fmt.Errorf("must configure valid hostname")
}
if c.DataDir == "" {
return fmt.Errorf("must configure valid data dir")
}
err := c.Replay.Validate()
if err != nil {
return err
Expand Down
Loading

0 comments on commit 9c163b6

Please sign in to comment.