Skip to content

Commit

Permalink
Merged pull request influxdata#1566 from sputnik13/window_force_emit
Browse files Browse the repository at this point in the history
Add BarrierNode to emit BarrierMessage periodically
  • Loading branch information
nathanielc committed Nov 28, 2017
2 parents a3fb201 + c0f7d08 commit a2a85bf
Show file tree
Hide file tree
Showing 13 changed files with 1,535 additions and 162 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## unreleased
## Unreleased
- [#1566](https://github.com/influxdata/kapacitor/pull/1566): Add BarrierNode to emit BarrierMessage periodically

### Features

Expand Down
277 changes: 277 additions & 0 deletions barrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package kapacitor

import (
"errors"
"time"

"sync"
"sync/atomic"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

type BarrierNode struct {
node
b *pipeline.BarrierNode
barrierStopper map[models.GroupID]func()
}

// Create a new BarrierNode, which emits a barrier if data traffic has been idle for the configured amount of time.
func newBarrierNode(et *ExecutingTask, n *pipeline.BarrierNode, d NodeDiagnostic) (*BarrierNode, error) {
if n.Idle == 0 && n.Period == 0 {
return nil, errors.New("barrier node must have either a non zero idle or a non zero period")
}
bn := &BarrierNode{
node: node{Node: n, et: et, diag: d},
b: n,
barrierStopper: map[models.GroupID]func(){},
}
bn.node.runF = bn.runBarrierEmitter
return bn, nil
}

func (n *BarrierNode) runBarrierEmitter([]byte) error {
defer n.stopBarrierEmitter()
consumer := edge.NewGroupedConsumer(n.ins[0], n)
n.statMap.Set(statCardinalityGauge, consumer.CardinalityVar())
return consumer.Consume()
}

func (n *BarrierNode) stopBarrierEmitter() {
for _, stopF := range n.barrierStopper {
stopF()
}
}

func (n *BarrierNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) {
r, stopF, err := n.newBarrier(group, first)
if err != nil {
return nil, err
}
n.barrierStopper[group.ID] = stopF
return edge.NewReceiverFromForwardReceiverWithStats(
n.outs,
edge.NewTimedForwardReceiver(n.timer, r),
), nil
}

func (n *BarrierNode) newBarrier(group edge.GroupInfo, first edge.PointMeta) (edge.ForwardReceiver, func(), error) {
switch {
case n.b.Idle != 0:
idleBarrier := newIdleBarrier(
first.Name(),
group,
n.b.Idle,
n.outs,
)
return idleBarrier, idleBarrier.Stop, nil
case n.b.Period != 0:
periodicBarrier := newPeriodicBarrier(
first.Name(),
group,
n.b.Period,
n.outs,
)
return periodicBarrier, periodicBarrier.Stop, nil
default:
return nil, nil, errors.New("unreachable code, barrier node should have non-zero idle or non-zero period")
}
}

type idleBarrier struct {
name string
group edge.GroupInfo

idle time.Duration
lastT atomic.Value
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
resetTimerC chan struct{}
}

func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier {
r := &idleBarrier{
name: name,
group: group,
idle: idle,
lastT: atomic.Value{},
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
resetTimerC: make(chan struct{}),
}

r.Init()

return r
}

func (n *idleBarrier) Init() {
n.lastT.Store(time.Time{})
n.wg.Add(1)

go n.idleHandler()
}

func (n *idleBarrier) Stop() {
close(n.stopC)
n.wg.Wait()
}

func (n *idleBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *idleBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
n.resetTimer()
return m, nil
}
return nil, nil
}
func (n *idleBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *idleBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
n.resetTimer()
return m, nil
}
return nil, nil
}
func (n *idleBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) {
if m.GroupID() == n.group.ID {
n.Stop()
}
return m, nil
}

func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
n.resetTimer()
return m, nil
}
return nil, nil
}

func (n *idleBarrier) resetTimer() {
n.resetTimerC <- struct{}{}
}

func (n *idleBarrier) emitBarrier() error {
nowT := time.Now().UTC()
n.lastT.Store(nowT)
return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT))
}

func (n *idleBarrier) idleHandler() {
defer n.wg.Done()
idleTimer := time.NewTimer(n.idle)
for {
select {
case <-n.resetTimerC:
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(n.idle)
case <-idleTimer.C:
n.emitBarrier()
idleTimer.Reset(n.idle)
case <-n.stopC:
idleTimer.Stop()
return
}
}
}

type periodicBarrier struct {
name string
group edge.GroupInfo

lastT atomic.Value
ticker *time.Ticker
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
}

func newPeriodicBarrier(name string, group edge.GroupInfo, period time.Duration, outs []edge.StatsEdge) *periodicBarrier {
r := &periodicBarrier{
name: name,
group: group,
lastT: atomic.Value{},
ticker: time.NewTicker(period),
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
}

r.Init()

return r
}

func (n *periodicBarrier) Init() {
n.lastT.Store(time.Time{})
n.wg.Add(1)

go n.periodicEmitter()
}

func (n *periodicBarrier) Stop() {
close(n.stopC)
n.ticker.Stop()
n.wg.Wait()
}

func (n *periodicBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *periodicBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
return m, nil
}
return nil, nil
}
func (n *periodicBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *periodicBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
return m, nil
}
return nil, nil
}
func (n *periodicBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) {
if m.GroupID() == n.group.ID {
n.Stop()
}
return m, nil
}

func (n *periodicBarrier) Point(m edge.PointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
return m, nil
}
return nil, nil
}

func (n *periodicBarrier) emitBarrier() error {
nowT := time.Now().UTC()
n.lastT.Store(nowT)
return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT))
}

func (n *periodicBarrier) periodicEmitter() {
defer n.wg.Done()
for {
select {
case <-n.ticker.C:
n.emitBarrier()
case <-n.stopC:
return
}
}
}
10 changes: 4 additions & 6 deletions edge/grouped.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,11 @@ func (c *groupedConsumer) Point(p PointMessage) error {
}

func (c *groupedConsumer) Barrier(b BarrierMessage) error {
// Barriers messages apply to all gorups
for _, r := range c.groups {
if err := r.Barrier(b); err != nil {
return err
}
r, err := c.getOrCreateGroup(b.GroupInfo(), b)
if err != nil {
return err
}
return nil
return r.Barrier(b)
}

func (c *groupedConsumer) DeleteGroup(d DeleteGroupMessage) error {
Expand Down
35 changes: 28 additions & 7 deletions edge/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,15 +876,21 @@ func (l BatchPointMessages) Swap(i int, j int) { l[i], l[j] = l[j], l[i] }
type BarrierMessage interface {
Message
ShallowCopy() BarrierMessage
TimeSetter
GroupInfoer
NameGetter
DimensionGetter
TagGetter
TimeGetter
}
type barrierMessage struct {
time time.Time
group GroupInfo
time time.Time
}

func NewBarrierMessage(time time.Time) BarrierMessage {
func NewBarrierMessage(group GroupInfo, time time.Time) BarrierMessage {
return &barrierMessage{
time: time,
group: group,
time: time,
}
}

Expand All @@ -894,15 +900,30 @@ func (b *barrierMessage) ShallowCopy() BarrierMessage {
return c
}

func (*barrierMessage) Name() string {
return "barrier"
}

func (*barrierMessage) Dimensions() models.Dimensions {
return models.Dimensions{}
}

func (*barrierMessage) Tags() models.Tags {
return models.Tags{}
}

func (*barrierMessage) Type() MessageType {
return Barrier
}
func (b *barrierMessage) GroupID() models.GroupID {
return b.group.ID
}
func (b *barrierMessage) GroupInfo() GroupInfo {
return b.group
}
func (b *barrierMessage) Time() time.Time {
return b.time
}
func (b *barrierMessage) SetTime(time time.Time) {
b.time = time
}

type DeleteGroupMessage interface {
Message
Expand Down
Loading

0 comments on commit a2a85bf

Please sign in to comment.