Skip to content

Commit

Permalink
Feat: handle Delete messages in join (influxdata#2562)
Browse files Browse the repository at this point in the history
* feat: handle delete messages in joinNode

* tests: add tests for join delete messages
  • Loading branch information
docmerlin authored Jun 3, 2021
1 parent fef0d30 commit 478a7d8
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 19 deletions.
4 changes: 2 additions & 2 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (n *idleBarrier) emitBarrier() error {
return err
}
if n.del {
return n.in.Collect(edge.NewDeleteGroupMessage(n.group.ID))
return n.in.Collect(edge.NewDeleteGroupMessage(&n.group))
}
return nil
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (n *periodicBarrier) emitBarrier() error {
}
if n.del {
// Send DeleteGroupMessage into self
return n.in.Collect(edge.NewDeleteGroupMessage(n.group.ID))
return n.in.Collect(edge.NewDeleteGroupMessage(&n.group))
}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions edge/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type MultiReceiver interface {
BufferedBatch(src int, batch BufferedBatchMessage) error
Point(src int, p PointMessage) error
Barrier(src int, b BarrierMessage) error
Delete(src int, d DeleteGroupMessage) error
Finish() error
}

Expand Down Expand Up @@ -174,6 +175,10 @@ LOOP:
if err := c.r.Barrier(m.Src, msg); err != nil {
return err
}
case DeleteGroupMessage:
if err := c.r.Delete(m.Src, msg); err != nil {
return err
}
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions edge/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,16 +927,16 @@ func (b *barrierMessage) Time() time.Time {

type DeleteGroupMessage interface {
Message
GroupIDGetter
GroupInfoer
}

type deleteGroupMessage struct {
groupID models.GroupID
info *GroupInfo
}

func NewDeleteGroupMessage(id models.GroupID) DeleteGroupMessage {
func NewDeleteGroupMessage(info *GroupInfo) *deleteGroupMessage {
return &deleteGroupMessage{
groupID: id,
info: info,
}
}

Expand All @@ -945,5 +945,9 @@ func (d *deleteGroupMessage) Type() MessageType {
}

func (d *deleteGroupMessage) GroupID() models.GroupID {
return d.groupID
return d.info.ID
}

func (d *deleteGroupMessage) GroupInfo() *GroupInfo {
return d.info
}
7 changes: 7 additions & 0 deletions group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,16 @@ func (n *GroupByNode) Barrier(b edge.BarrierMessage) error {
}
return edge.Forward(n.outs, b)
}

func (n *GroupByNode) DeleteGroup(d edge.DeleteGroupMessage) error {
n.timer.Start()
n.mu.Lock()
delete(n.groups, d.GroupID())
n.mu.Unlock()
n.timer.Stop()
return edge.Forward(n.outs, d)
}

func (n *GroupByNode) Done() {}

// emit sends all groups before time t to children nodes.
Expand Down
55 changes: 55 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4712,6 +4712,61 @@ errorCounts
testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, er, true, nil)
}

func TestStream_Delete_Join(t *testing.T) {
var script = `
var errorCounts = stream
|from()
.measurement('cpu')
.groupBy('host')
|window()
.period(10s)
.every(10s)
.align()
|sum('value')
|barrier()
.idle(1s)
.delete(TRUE)
var viewCounts = stream
|from()
.measurement('views')
.groupBy('host')
|window()
.period(10s)
.every(10s)
.align()
|sum('value')
|barrier()
.idle(1s)
.delete(TRUE)
errorCounts
|join(viewCounts)
.as('errors', 'views')
.streamName('error_view')
.tolerance(2s)
.deleteAll(TRUE)
|eval(lambda: "errors.sum" / "views.sum")
.as('error_percent')
.keep()
|httpOut('TestStream_Delete_Join')
`
er := models.Result{
Series: models.Rows{
{
Name: "error_view",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "error_percent", "errors.sum", "views.sum"},
Values: [][]interface{}{
{time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), 1.0, 18.0, 18.0},
},
},
},
}
testStreamerWithOutput(t, "TestStream_Delete_Join", script, 30*time.Second, er, true, nil)
}

func TestStream_Join_Delimiter(t *testing.T) {

var script = `
Expand Down
21 changes: 18 additions & 3 deletions integrations/testdata/TestBatch_Join.1.brpl
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
{"name":"cpu_usage_idle","points":[{"fields":{"mean":93.49999999999409},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":91.44444444443974},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":93.44897959187637},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":95.99999999995998},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":97.00970097012197},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","points":[{"fields":{"mean":95.98484848485191},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":92.098039215696},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":92.99999999998363},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":86.54015887023496},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":95.48979591840603},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","points":[{"fields":{"mean":96.49999999996908},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":93.46464646468584},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":95.00950095007724},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":92.99999999998636},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":90.99999999998545},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","points":[
{"fields":{"mean":93.49999999999409},"time":"2015-10-30T17:14:12Z"},
{"fields":{"mean":91.44444444443974},"time":"2015-10-30T17:14:14Z"},
{"fields":{"mean":93.44897959187637},"time":"2015-10-30T17:14:16Z"},
{"fields":{"mean":95.99999999995998},"time":"2015-10-30T17:14:18Z"},
{"fields":{"mean":97.00970097012197},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","points":[
{"fields":{"mean":95.98484848485191},"time":"2015-10-30T17:14:22Z"},
{"fields":{"mean":92.098039215696},"time":"2015-10-30T17:14:24Z"},
{"fields":{"mean":92.99999999998363},"time":"2015-10-30T17:14:26Z"},
{"fields":{"mean":86.54015887023496},"time":"2015-10-30T17:14:28Z"},
{"fields":{"mean":95.48979591840603},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","points":[
{"fields":{"mean":96.49999999996908},"time":"2015-10-30T17:14:32Z"},
{"fields":{"mean":93.46464646468584},"time":"2015-10-30T17:14:34Z"},
{"fields":{"mean":95.00950095007724},"time":"2015-10-30T17:14:36Z"},
{"fields":{"mean":92.99999999998636},"time":"2015-10-30T17:14:38Z"},
{"fields":{"mean":90.99999999998545},"time":"2015-10-30T17:14:40Z"}]}
24 changes: 24 additions & 0 deletions integrations/testdata/TestStream_Delete_Join.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
dbname
rpname
cpu,type=idle,host=serverA value=9,anothervalue=4.2 0000000001
dbname
rpname
cpu,type=idle,host=serverA value=9,anothervalue=4.2 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=9,anothervalue=4.2 0000000021
dbname
rpname
cpu,type=idle,host=serverA value=9,anothervalue=4.2 0000000022
dbname
rpname
views,type=idle,host=serverA value=9,anothervalue=4.2 0000000001
dbname
rpname
views,type=idle,host=serverA value=9,anothervalue=4.2 0000000002
dbname
rpname
views,type=idle,host=serverA value=9,anothervalue=4.2 0000000021
dbname
rpname
views,type=idle,host=serverA value=9,anothervalue=4.2 0000000022
35 changes: 29 additions & 6 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,31 @@ func (n *JoinNode) Point(src int, p edge.PointMessage) error {

func (n *JoinNode) Barrier(src int, b edge.BarrierMessage) error {
g := n.getOrCreateGroup(b.GroupID())
g.Barrier(src, b.Time())
if err := g.Barrier(src, b.Time()); err != nil {
return err
}
return edge.Forward(n.outs, b)
}

// Delete deletes the group from the JoinNode, and resets the Low Marks for from the group from that source.
// if deleteAll is set on the pipeline.Joinnode, then it any delete will delete
func (n *JoinNode) Delete(src int, d edge.DeleteGroupMessage) error {
groupID := d.GroupID()
n.groupsMu.Lock()
delete(n.groups, groupID)
delete(n.matchGroupsBuffer, groupID)
delete(n.specificGroupsBuffer, groupID)
if n.j.DeleteAll {
for x := range n.lowMarks {
delete(n.lowMarks, x)
}
} else {
delete(n.lowMarks, srcGroup{src: src, groupId: groupID})
}
n.groupsMu.Unlock()
return edge.Forward(n.outs, d)
}

func (n *JoinNode) Finish() error {
// No more points are coming signal all groups to finish up.
for _, group := range n.groups {
Expand All @@ -109,6 +130,7 @@ type messageMeta interface {
edge.Message
edge.PointMeta
}

type srcPoint struct {
Src int
Msg messageMeta
Expand All @@ -123,15 +145,17 @@ func (n *JoinNode) doMessage(src int, m messageMeta) error {
} else {
// Just send point on to group, we are not joining on specific dimensions.
group := n.getOrCreateGroup(m.GroupID())
group.Collect(src, m)
if err := group.Collect(src, m); err != nil {
return err
}
}
return nil
}

// The purpose of this method is to match more specific points
// with the less specific points as they arrive.
// The purpose of this method is to match more-specific points
// with the less-specific points as they arrive.
//
// Where 'more specific' means, that a point has more dimensions than the join.on dimensions.
// Where 'more-specific' means, that a point has more dimensions than the join.on dimensions.
func (n *JoinNode) matchPoints(p srcPoint) {
// Specific points may be sent to the joinset without a matching point, but not the other way around.
// This is because the specific points have the needed specific tag data.
Expand Down Expand Up @@ -232,7 +256,6 @@ func (n *JoinNode) matchPoints(p srcPoint) {
n.getOrCreateSpecificGroup(groupId).Enqueue(p)
}
}

} else {
// Cache match point.
n.getOrCreateMatchGroup(groupId).Enqueue(p)
Expand Down
1 change: 0 additions & 1 deletion models/point.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func ToGroupID(name string, tags map[string]string, dims Dimensions) GroupID {
buf.WriteString(d)
buf.WriteRune('=')
buf.WriteString(tags[d])

}
return GroupID(buf.String())
}
6 changes: 6 additions & 0 deletions pipeline/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
// .fill(0.0)
// // name the resulting stream
// .streamName('error_rate')
// // treat a delete from one side of the join as a delete to all sides
// .deleteAll(TRUE)
// // Both the "value" fields from each parent have been prefixed
// // with the respective names 'errors' and 'requests'.
// |eval(lambda: "errors.value" / "requests.value")
Expand All @@ -70,6 +72,10 @@ type JoinNode struct {
// Can be the empty string.
Delimiter string `json:"delimiter"`

// Deletes both sides of the join regardless what
// side receive the delete message.
DeleteAll bool `json:"deleteAll"`

// The name of this new joined data stream.
// If empty the name of the left parent is used.
StreamName string `json:"streamName"`
Expand Down
1 change: 1 addition & 0 deletions pipeline/tick/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (n *JoinNode) Build(j *pipeline.JoinNode) (ast.Node, error) {
Dot("delimiter", j.Delimiter).
Dot("streamName", j.StreamName).
Dot("tolerance", j.Tolerance).
Dot("deleteAll", j.DeleteAll).
DotNotNil("fill", j.Fill)
return n.prev, n.err
}
7 changes: 5 additions & 2 deletions union.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (n *UnionNode) BufferedBatch(src int, batch edge.BufferedBatchMessage) erro
return n.emitReady(false)
}

func (n *UnionNode) Delete(src int, d edge.DeleteGroupMessage) error {
return edge.Forward(n.outs, d)
}

func (n *UnionNode) Point(src int, p edge.PointMessage) error {
n.timer.Start()
defer n.timer.Stop()
Expand Down Expand Up @@ -135,6 +139,7 @@ func (n *UnionNode) emitReady(drain bool) error {
// Unless we are draining the buffer than we can continue.
return nil
}

// Emit all values that are at or below the mark.
for i = range n.sources {
l := n.sources[i].Len()
Expand All @@ -158,8 +163,6 @@ func (n *UnionNode) emitReady(drain bool) error {
return nil
}

var q = 0

func (n *UnionNode) emit(m edge.Message) error {
n.timer.Pause()
defer n.timer.Resume()
Expand Down

0 comments on commit 478a7d8

Please sign in to comment.