Skip to content

Commit

Permalink
implement join.On functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Feb 25, 2016
1 parent ad2c993 commit 55bc51b
Show file tree
Hide file tree
Showing 15 changed files with 794 additions and 53 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ Significant performance improvements have also been added.
In some cases Kapacitor throughput has improved by 4X.


Various improvements to joining features have been implemented.
With #144 you can now join streams with differing group by dimensions.


### Features
- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by to original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.
- [#144](https://github.com/influxdata/kapacitor/issues/144): Can now join streams with differing dimensions using the join.On property.


### Bugfixes
Expand Down
4 changes: 2 additions & 2 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ func TestServer_EnableTask(t *testing.T) {
dot := `digraph testTaskName {
graph [throughput="0.00 points/s"];
srcstream0 [avg_exec_time="0" ];
srcstream0 [avg_exec_time_ns="0" ];
srcstream0 -> stream1 [processed="0"];
stream1 [avg_exec_time="0" ];
stream1 [avg_exec_time_ns="0" ];
}`
if ti.Dot != dot {
t.Fatalf("unexpected dot got\n%s exp\n%s", ti.Dot, dot)
Expand Down
12 changes: 6 additions & 6 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
)

type IntVar interface {
Int() int64
IntValue() int64
}

type FloatVar interface {
Float() float64
FloatValue() float64
}

type StringVar interface {
Expand All @@ -31,7 +31,7 @@ type Int struct {
}

func (v *Int) String() string {
return strconv.FormatInt(v.Int(), 10)
return strconv.FormatInt(v.IntValue(), 10)
}

func (v *Int) Add(delta int64) {
Expand All @@ -42,7 +42,7 @@ func (v *Int) Set(value int64) {
atomic.StoreInt64(&v.i, value)
}

func (v *Int) Int() int64 {
func (v *Int) IntValue() int64 {
return atomic.LoadInt64(&v.i)
}

Expand All @@ -52,10 +52,10 @@ type Float struct {
}

func (v *Float) String() string {
return strconv.FormatFloat(v.Float(), 'g', -1, 64)
return strconv.FormatFloat(v.FloatValue(), 'g', -1, 64)
}

func (v *Float) Float() float64 {
func (v *Float) FloatValue() float64 {
return math.Float64frombits(atomic.LoadUint64(&v.f))
}

Expand Down
8 changes: 4 additions & 4 deletions global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ func GetStatsData() ([]StatsData, error) {
expvar.Do(func(kv expvar.KeyValue) {
switch v := kv.Value.(type) {
case kexpvar.IntVar:
globalData.Values[kv.Key] = v.Int()
globalData.Values[kv.Key] = v.IntValue()
case kexpvar.FloatVar:
globalData.Values[kv.Key] = v.Float()
globalData.Values[kv.Key] = v.FloatValue()
case *kexpvar.Map:
if kv.Key != Product {
panic("unexpected published top level expvar.Map with key " + kv.Key)
Expand Down Expand Up @@ -166,9 +166,9 @@ func GetStatsData() ([]StatsData, error) {
n.Do(func(kv expvar.KeyValue) {
switch v := kv.Value.(type) {
case kexpvar.IntVar:
data.Values[kv.Key] = v.Int()
data.Values[kv.Key] = v.IntValue()
case kexpvar.FloatVar:
data.Values[kv.Key] = v.Float()
data.Values[kv.Key] = v.FloatValue()
default:
panic(fmt.Sprintf("unknown expvar.Var type for stats %T", kv.Value))
}
Expand Down
108 changes: 108 additions & 0 deletions integrations/data/TestStream_JoinOn.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
dbname
rpname
errors,service=cartA,dc=A,rack=0 value=7 0000000001
dbname
rpname
errors,service=login,dc=B,rack=1 value=9 0000000001
dbname
rpname
errors,service=front,dc=A,rack=0 value=2 0000000001
dbname
rpname
errors,service=cartA,dc=B,rack=1 value=9 0000000002
dbname
rpname
errors,service=login,dc=A,rack=0 value=5 0000000003
dbname
rpname
errors,service=front,dc=B,rack=1 value=9 0000000003
dbname
rpname
errors,service=cartA,dc=A,rack=0 value=3 0000000004
dbname
rpname
errors,service=login,dc=B,rack=1 value=9 0000000004
dbname
rpname
errors,service=front,dc=A,rack=0 value=2 0000000005
dbname
rpname
errors,service=login,dc=B,rack=1 value=2 0000000005
dbname
rpname
errors,service=front,dc=A,rack=0 value=5 0000000006
dbname
rpname
errors,service=cartA,dc=B,rack=1 value=9 0000000006
dbname
rpname
errors,service=login,dc=C,rack=0 value=7 0000000006
dbname
rpname
errors,service=front,dc=A,rack=1 value=4 0000000007
dbname
rpname
errors,service=cartA,dc=B,rack=0 value=8 0000000007
dbname
rpname
errors,service=front,dc=A,rack=1 value=6 0000000008
dbname
rpname
errors,service=cartA,dc=B,rack=0 value=6 0000000008
dbname
rpname
errors,service=login,dc=A,rack=1 value=10 0000000009
dbname
rpname
errors,service=front,dc=B,rack=0 value=4 0000000009
dbname
rpname
errors,service=cartA,dc=A,rack=1 value=5 0000000010
dbname
rpname
errors,service=login,dc=B,rack=0 value=3 0000000010
dbname
rpname
errors,service=login,dc=B,rack=0 value=6 0000000011
dbname
rpname
errors,service=cartA,dc=A,rack=1 value=8 0000000012
dbname
rpname
errors,service=cartA,dc=A,rack=0 value=8 0000000012
dbname
rpname
errors,service=front,dc=A,rack=0 value=9 0000000012
dbname
rpname
errors,service=front,dc=A,rack=1 value=9 0000000012
dbname
rpname
errors,service=login,dc=A,rack=1 value=5 0000000012
dbname
rpname
errors,service=login,dc=A,rack=0 value=5 0000000012
dbname
rpname
errors,service=cartA,dc=B,rack=0 value=8 0000000012
dbname
rpname
errors,service=cartA,dc=B,rack=1 value=8 0000000012
dbname
rpname
errors,service=front,dc=B,rack=1 value=9 0000000012
dbname
rpname
errors,service=front,dc=B,rack=0 value=9 0000000012
dbname
rpname
errors,service=login,dc=B,rack=0 value=5 0000000012
dbname
rpname
errors,service=login,dc=B,rack=1 value=5 0000000012
dbname
rpname
errors,service=login,dc=C,rack=1 value=5 0000000012
dbname
rpname
errors,service=login,dc=C,rack=0 value=5 0000000012
Loading

0 comments on commit 55bc51b

Please sign in to comment.