Skip to content

Commit

Permalink
expvar refactored to allow getting and deleting
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Feb 23, 2016
1 parent c39d89f commit ab16b8d
Show file tree
Hide file tree
Showing 19 changed files with 600 additions and 254 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

### Release Notes

Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal stats now includes a measurement `node` that contains an averaged execution time for the node, tagged by the task, node, task type and kind of node (i.e. window vs union).
These stats are also available in the DOT output of the Kapacitor show command.


### Features
- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by to original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.


### Bugfixes
Expand Down
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

// Do not add the source batch node to the dot output
// since its not really an edge.
func (s *SourceBatchNode) edot(*bytes.Buffer, time.Duration) {}
func (s *SourceBatchNode) edot(*bytes.Buffer) {}

func (s *SourceBatchNode) collectedCount() (count int64) {
for _, child := range s.children {
Expand Down
32 changes: 5 additions & 27 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package run

import (
"errors"
"expvar"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -45,27 +44,6 @@ import (
const clusterIDFilename = "cluster.id"
const serverIDFilename = "server.id"

var (
//Published vars
cidVar = &expvar.String{}

sidVar = &expvar.String{}

hostVar = &expvar.String{}

productVar = &expvar.String{}

versionVar = &expvar.String{}
)

func init() {
expvar.Publish(kapacitor.ClusterIDVarName, cidVar)
expvar.Publish(kapacitor.ServerIDVarName, sidVar)
expvar.Publish(kapacitor.HostVarName, hostVar)
expvar.Publish(kapacitor.ProductVarName, productVar)
expvar.Publish(kapacitor.VersionVarName, versionVar)
}

// BuildInfo represents the build details for the server code.
type BuildInfo struct {
Version string
Expand Down Expand Up @@ -414,11 +392,11 @@ func (s *Server) Open() error {
}

// Set published vars
cidVar.Set(s.ClusterID)
sidVar.Set(s.ServerID)
hostVar.Set(s.hostname)
productVar.Set(kapacitor.Product)
versionVar.Set(s.buildInfo.Version)
kapacitor.ClusterIDVar.Set(s.ClusterID)
kapacitor.ServerIDVar.Set(s.ServerID)
kapacitor.HostVar.Set(s.hostname)
kapacitor.ProductVar.Set(kapacitor.Product)
kapacitor.VersionVar.Set(s.buildInfo.Version)
s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID)

// Start profiling, if set.
Expand Down
15 changes: 9 additions & 6 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package kapacitor

import (
"errors"
"expvar"
"fmt"
"log"
"strconv"
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -36,6 +36,7 @@ type Edge struct {

logger *log.Logger
aborted chan struct{}
statsKey string
statMap *expvar.Map
groupMu sync.RWMutex
groupStats map[models.GroupID]*edgeStat
Expand All @@ -48,10 +49,11 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer
"child": childName,
"type": t.String(),
}
sm := NewStatistics("edges", tags)
key, sm := NewStatistics("edges", tags)
sm.Add(statCollected, 0)
sm.Add(statEmitted, 0)
e := &Edge{
statsKey: key,
statMap: sm,
aborted: make(chan struct{}),
groupStats: make(map[models.GroupID]*edgeStat),
Expand Down Expand Up @@ -125,6 +127,7 @@ func (e *Edge) Close() {
if e.reduce != nil {
close(e.reduce)
}
DeleteStatistics(e.statsKey)
}

// Abort all next and collect calls.
Expand All @@ -151,7 +154,7 @@ func (e *Edge) NextPoint() (p models.Point, ok bool) {
case p, ok = <-e.stream:
if ok {
e.statMap.Add(statEmitted, 1)
e.incEmitted(p)
e.incEmitted(&p)
}
}
return
Expand All @@ -163,7 +166,7 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) {
case b, ok = <-e.batch:
if ok {
e.statMap.Add(statEmitted, 1)
e.incEmitted(b)
e.incEmitted(&b)
}
}
return
Expand All @@ -182,7 +185,7 @@ func (e *Edge) NextMaps() (m *MapResult, ok bool) {

func (e *Edge) CollectPoint(p models.Point) error {
e.statMap.Add(statCollected, 1)
e.incCollected(p)
e.incCollected(&p)
select {
case <-e.aborted:
return ErrAborted
Expand All @@ -193,7 +196,7 @@ func (e *Edge) CollectPoint(p models.Point) error {

func (e *Edge) CollectBatch(b models.Batch) error {
e.statMap.Add(statCollected, 1)
e.incCollected(b)
e.incCollected(&b)
select {
case <-e.aborted:
return ErrAborted
Expand Down
Loading

0 comments on commit ab16b8d

Please sign in to comment.