Skip to content

Commit

Permalink
add flag for labels
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Feb 24, 2016
1 parent 0a68197 commit e88b6c4
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 98 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

### Release Notes

Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal stats now includes a measurement `node` that contains an averaged execution time for the node, tagged by the task, node, task type and kind of node (i.e. window vs union).
Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal statistics includes a new measurement named `node` that contains any stats a node provides, tagged by the task, node, task type and kind of node (i.e. window vs union).
All nodes provide an averaged execution time for the node.
These stats are also available in the DOT output of the Kapacitor show command.

Significant performance improvements have also been added.
In some cases Kapacitor throughput has improved by 4X.


### Features
- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
Expand Down
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

// Do not add the source batch node to the dot output
// since its not really an edge.
func (s *SourceBatchNode) edot(*bytes.Buffer) {}
func (s *SourceBatchNode) edot(*bytes.Buffer, bool) {}

func (s *SourceBatchNode) collectedCount() (count int64) {
for _, child := range s.children {
Expand Down
8 changes: 4 additions & 4 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ func TestServer_EnableTask(t *testing.T) {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := `digraph testTaskName {
graph [label="Throughput: 0.00 points/s"];
graph [throughput="0.00 points/s"];
srcstream0 [label="srcstream0 0"];
srcstream0 -> stream1 [label="0"];
srcstream0 [avg_exec_time="0" ];
srcstream0 -> stream1 [processed="0"];
stream1 [label="stream1 0"];
stream1 [avg_exec_time="0" ];
}`
if ti.Dot != dot {
t.Fatalf("unexpected dot got\n%s exp\n%s", ti.Dot, dot)
Expand Down
52 changes: 17 additions & 35 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,25 @@ import (
"sync/atomic"
)

type IntVar interface {
Int() int64
}

type FloatVar interface {
Float() float64
}

type StringVar interface {
StringValue() float64
}

// Int is a 64-bit integer variable that satisfies the expvar.Var interface.
type Int struct {
i int64
}

func (v *Int) String() string {
return strconv.FormatInt(v.Get(), 10)
return strconv.FormatInt(v.Int(), 10)
}

func (v *Int) Add(delta int64) {
Expand All @@ -30,7 +42,7 @@ func (v *Int) Set(value int64) {
atomic.StoreInt64(&v.i, value)
}

func (v *Int) Get() int64 {
func (v *Int) Int() int64 {
return atomic.LoadInt64(&v.i)
}

Expand All @@ -40,10 +52,10 @@ type Float struct {
}

func (v *Float) String() string {
return strconv.FormatFloat(v.Get(), 'g', -1, 64)
return strconv.FormatFloat(v.Float(), 'g', -1, 64)
}

func (v *Float) Get() float64 {
func (v *Float) Float() float64 {
return math.Float64frombits(atomic.LoadUint64(&v.f))
}

Expand All @@ -65,36 +77,6 @@ func (v *Float) Set(value float64) {
atomic.StoreUint64(&v.f, math.Float64bits(value))
}

// MaxFloat is a 64-bit float variable that satisfies the expvar.Var interface.
// When setting a value it will only be set if it is greater than the current value.
type MaxFloat struct {
f uint64
}

func (v *MaxFloat) String() string {
return strconv.FormatFloat(v.Get(), 'g', -1, 64)
}

func (v *MaxFloat) Get() float64 {
return math.Float64frombits(atomic.LoadUint64(&v.f))
}

// Set sets v to value.
func (v *MaxFloat) Set(value float64) {
nxtBits := math.Float64bits(value)
for {
curBits := atomic.LoadUint64(&v.f)
cur := math.Float64frombits(curBits)
if value > cur {
if atomic.CompareAndSwapUint64(&v.f, curBits, nxtBits) {
return
}
} else {
return
}
}
}

// Map is a string-to-expvar.Var map variable that satisfies the expvar.Var interface.
type Map struct {
mu sync.RWMutex
Expand Down Expand Up @@ -238,7 +220,7 @@ func (v *String) Set(value string) {
v.s = value
}

func (v *String) Get() string {
func (v *String) StringValue() string {
v.mu.RLock()
defer v.mu.RUnlock()
return v.s
Expand Down
22 changes: 10 additions & 12 deletions global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func GetStatsData() ([]StatsData, error) {
// Get all global statistics
expvar.Do(func(kv expvar.KeyValue) {
switch v := kv.Value.(type) {
case *kexpvar.Int:
globalData.Values[kv.Key] = v.Get()
case *kexpvar.Float:
globalData.Values[kv.Key] = v.Get()
case kexpvar.IntVar:
globalData.Values[kv.Key] = v.Int()
case kexpvar.FloatVar:
globalData.Values[kv.Key] = v.Float()
case *kexpvar.Map:
if kv.Key != Product {
panic("unexpected published top level expvar.Map with key " + kv.Key)
Expand All @@ -153,24 +153,22 @@ func GetStatsData() ([]StatsData, error) {
v.Do(func(subKV expvar.KeyValue) {
switch subKV.Key {
case "name":
data.Name = subKV.Value.(*kexpvar.String).Get()
data.Name = subKV.Value.(*kexpvar.String).StringValue()
case "tags":
// string-string tags map.
n := subKV.Value.(*kexpvar.Map)
n.Do(func(t expvar.KeyValue) {
data.Tags[t.Key] = t.Value.(*kexpvar.String).Get()
data.Tags[t.Key] = t.Value.(*kexpvar.String).StringValue()
})
case "values":
// string-interface map.
n := subKV.Value.(*kexpvar.Map)
n.Do(func(kv expvar.KeyValue) {
switch v := kv.Value.(type) {
case *kexpvar.Int:
data.Values[kv.Key] = v.Get()
case *kexpvar.Float:
data.Values[kv.Key] = v.Get()
case *kexpvar.MaxFloat:
data.Values[kv.Key] = v.Get()
case kexpvar.IntVar:
data.Values[kv.Key] = v.Int()
case kexpvar.FloatVar:
data.Values[kv.Key] = v.Float()
default:
panic(fmt.Sprintf("unknown expvar.Var type for stats %T", kv.Value))
}
Expand Down
2 changes: 1 addition & 1 deletion influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.L
i: n,
wb: newWriteBuffer(int(n.Buffer), n.FlushInterval),
}
l.Println("D! fi: ", in.wb.flushInterval)
in.node.runF = in.runOut
in.node.stopF = in.stopOut
in.wb.i = in
return in, nil
}

func (i *InfluxDBOutNode) runOut([]byte) error {
i.statMap.Add(statsInfluxDBPointsWritten, 0)
// Start the write buffer
i.wb.start()

Expand Down
107 changes: 88 additions & 19 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package kapacitor

import (
"bytes"
"expvar"
"fmt"
"log"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/kapacitor/expvar"
kexpvar "github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/timer"
)

const (
averageExecTimeVarName = "avg_execution_time"
statAverageExecTime = "avg_exec_time"
)

// A node that can be in an executor.
Expand Down Expand Up @@ -45,7 +47,7 @@ type Node interface {
abortParentEdges()

// executing dot
edot(buf *bytes.Buffer)
edot(buf *bytes.Buffer, labels bool)

nodeStatsByGroup() map[models.GroupID]nodeStats

Expand All @@ -69,8 +71,8 @@ type node struct {
logger *log.Logger
timer timer.Timer
statsKey string
statMap *expvar.Map
avgExecVar *expvar.MaxFloat
statMap *kexpvar.Map
avgExecVar *MaxDuration
}

func (n *node) addParentEdge(e *Edge) {
Expand All @@ -91,8 +93,8 @@ func (n *node) start(snapshot []byte) {
"kind": n.Desc(),
}
n.statsKey, n.statMap = NewStatistics("nodes", tags)
n.avgExecVar = &expvar.MaxFloat{}
n.statMap.Set(averageExecTimeVarName, n.avgExecVar)
n.avgExecVar = &MaxDuration{}
n.statMap.Set(statAverageExecTime, n.avgExecVar)
n.timer = n.et.tm.TimingService.NewTimer(n.avgExecVar)
n.errCh = make(chan error, 1)
go func() {
Expand Down Expand Up @@ -182,22 +184,60 @@ func (n *node) closeChildEdges() {
}
}

func (n *node) edot(buf *bytes.Buffer) {
buf.Write([]byte(
fmt.Sprintf("\n%s [label=\"%s %v\"];\n",
n.Name(),
n.Name(),
time.Duration(n.avgExecVar.Get()),
),
))
for i, c := range n.children {
func (n *node) edot(buf *bytes.Buffer, labels bool) {
if labels {
// Print all stats on node.
buf.Write([]byte(
fmt.Sprintf("%s -> %s [label=\"%d\"];\n",
fmt.Sprintf("\n%s [label=\"%s ",
n.Name(),
n.Name(),
c.Name(),
n.outs[i].collectedCount(),
),
))
n.statMap.Do(func(kv expvar.KeyValue) {
buf.Write([]byte(
fmt.Sprintf("%s=%s ",
kv.Key,
kv.Value.String(),
),
))
})
buf.Write([]byte("\"];\n"))

for i, c := range n.children {
buf.Write([]byte(
fmt.Sprintf("%s -> %s [label=\"%d\"];\n",
n.Name(),
c.Name(),
n.outs[i].collectedCount(),
),
))
}

} else {
// Print all stats on node.
buf.Write([]byte(
fmt.Sprintf("\n%s [",
n.Name(),
),
))
n.statMap.Do(func(kv expvar.KeyValue) {
buf.Write([]byte(
fmt.Sprintf("%s=\"%s\" ",
kv.Key,
kv.Value.String(),
),
))
})
buf.Write([]byte("];\n"))
for i, c := range n.children {
buf.Write([]byte(
fmt.Sprintf("%s -> %s [processed=\"%d\"];\n",
n.Name(),
c.Name(),
n.outs[i].collectedCount(),
),
))
}
}
}

Expand Down Expand Up @@ -233,3 +273,32 @@ func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) {
}
return
}

// MaxDuration is a 64-bit int variable representing a duration in nanoseconds,that satisfies the expvar.Var interface.
// When setting a value it will only be set if it is greater than the current value.
type MaxDuration struct {
d int64
}

func (v *MaxDuration) String() string {
return time.Duration(v.Int()).String()
}

func (v *MaxDuration) Int() int64 {
return atomic.LoadInt64(&v.d)
}

// Set sets value if it is greater than current value.
func (v *MaxDuration) Set(value float64) {
next := int64(value)
for {
cur := v.Int()
if next > cur {
if atomic.CompareAndSwapInt64(&v.d, cur, next) {
return
}
} else {
return
}
}
}
14 changes: 7 additions & 7 deletions services/reporting/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func (s *Service) Open() error {
}

// Populate published vars
s.clusterID = kapacitor.ClusterIDVar.Get()
s.serverID = kapacitor.ServerIDVar.Get()
s.hostname = kapacitor.HostVar.Get()
s.version = kapacitor.VersionVar.Get()
s.clusterID = kapacitor.ClusterIDVar.StringValue()
s.serverID = kapacitor.ServerIDVar.StringValue()
s.hostname = kapacitor.HostVar.StringValue()
s.version = kapacitor.VersionVar.StringValue()
s.product = kapacitor.Product

// Populate anonymous tags
Expand Down Expand Up @@ -104,9 +104,9 @@ func (s *Service) sendUsageReport() error {
// Add values
data.Values[kapacitor.ClusterIDVarName] = s.clusterID
data.Values[kapacitor.ServerIDVarName] = s.serverID
data.Values[kapacitor.NumTasksVarName] = kapacitor.NumTasksVar.Get()
data.Values[kapacitor.NumEnabledTasksVarName] = kapacitor.NumEnabledTasksVar.Get()
data.Values[kapacitor.NumSubscriptionsVarName] = kapacitor.NumSubscriptionsVar.Get()
data.Values[kapacitor.NumTasksVarName] = kapacitor.NumTasksVar.Int()
data.Values[kapacitor.NumEnabledTasksVarName] = kapacitor.NumEnabledTasksVar.Int()
data.Values[kapacitor.NumSubscriptionsVarName] = kapacitor.NumSubscriptionsVar.Int()
data.Values[kapacitor.UptimeVarName] = kapacitor.Uptime().Seconds()

usage := client.Usage{
Expand Down
Loading

0 comments on commit e88b6c4

Please sign in to comment.