Skip to content

Commit

Permalink
Implement currentLevel for determineLevel function
Browse files Browse the repository at this point in the history
  • Loading branch information
Minh Danh committed Aug 5, 2016
1 parent a5f74c1 commit 28290f2
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ type AlertNode struct {

bufPool sync.Pool

levelResets []stateful.Expression
levelResets []stateful.Expression
lrScopePools []stateful.ScopePool
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
Expand Down Expand Up @@ -313,6 +314,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.scopePools = make([]stateful.ScopePool, CritAlert+1)

an.levelResets = make([]stateful.Expression, CritAlert+1)
an.lrScopePools = make([]stateful.ScopePool, CritAlert+1)

if n.Info != nil {
statefulExpression, expressionCompileError := stateful.NewExpression(n.Info.Expression)
Expand All @@ -328,6 +330,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for infoReset: %s", lexpressionCompileError)
}
an.levelResets[InfoAlert] = lstatefulExpression
an.lrScopePools[InfoAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.InfoReset.Expression))
}
}

Expand All @@ -344,6 +347,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for warnReset: %s", lexpressionCompileError)
}
an.levelResets[WarnAlert] = lstatefulExpression
an.lrScopePools[WarnAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.WarnReset.Expression))
}
}

Expand All @@ -360,6 +364,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for critReset: %s", lexpressionCompileError)
}
an.levelResets[CritAlert] = lstatefulExpression
an.lrScopePools[CritAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.CritReset.Expression))
}
}

Expand Down Expand Up @@ -399,7 +404,11 @@ func (a *AlertNode) runAlert([]byte) error {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
a.timer.Start()
l := a.determineLevel(p.Time, p.Fields, p.Tags)
var currentLevel AlertLevel
if _, ok := a.states[p.Group]; ok {
currentLevel = a.states[p.Group].currentLevel()
}
l := a.determineLevel(p.Time, p.Fields, p.Tags, currentLevel)
state := a.updateState(p.Time, l, p.Group)
if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed && !state.expired) {
a.timer.Stop()
Expand Down Expand Up @@ -467,7 +476,11 @@ func (a *AlertNode) runAlert([]byte) error {
var highestPoint *models.BatchPoint

for i, p := range b.Points {
l := a.determineLevel(p.Time, p.Fields, p.Tags)
var currentLevel AlertLevel
if _, ok := a.states[b.Group]; ok {
currentLevel = a.states[b.Group].currentLevel()
}
l := a.determineLevel(p.Time, p.Fields, p.Tags, currentLevel)
if l < lowestLevel {
lowestLevel = l
}
Expand Down Expand Up @@ -577,7 +590,29 @@ func (a *AlertNode) handleAlert(ad *AlertData) {
}
}

func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string) AlertLevel {
func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string, currentLevel AlertLevel) AlertLevel {
// Evaluate reset expression of current level if it exists
if cse := a.levelResets[currentLevel]; cse != nil {
if pass, err := EvalPredicate(cse, a.lrScopePools[currentLevel], now, fields, tags); err != nil {
a.logger.Printf("E! error evaluating reset expression for current level %v: %s", currentLevel, err)
} else if !pass {
for l := len(a.levels) - 1; l >= 0; l-- {
se := a.levels[l]
if se == nil {
continue
}
if pass, err := EvalPredicate(se, a.scopePools[l], now, fields, tags); err != nil {
a.logger.Printf("E! error evaluating expression for level %v: %s", AlertLevel(l), err)
continue
} else if pass {
if currentLevel < AlertLevel(l) {
return AlertLevel(l)
}
}
}
return currentLevel
}
}
for l := len(a.levels) - 1; l >= 0; l-- {
se := a.levels[l]
if se == nil {
Expand All @@ -588,18 +623,6 @@ func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map
continue
} else if pass {
return AlertLevel(l)
} else {
lrse := a.levelResets[l]
if lrse != nil {
if pass, err := EvalPredicate(lrse, a.scopePools[l], now, fields, tags); err != nil {
a.logger.Printf("E! error evaluating expression for level %v reset : %s", AlertLevel(l), err)
continue
} else if pass {
continue
} else {
return AlertLevel(l)
}
}
}
}
return OKAlert
Expand Down Expand Up @@ -683,6 +706,11 @@ func (a *alertState) addEvent(level AlertLevel) {
a.history[a.idx] = level
}

// Return current level of this state
func (a *alertState) currentLevel() AlertLevel {
return a.history[a.idx]
}

// Compute the percentage change in the alert history.
func (a *alertState) percentChange() float64 {
l := len(a.history)
Expand Down

0 comments on commit 28290f2

Please sign in to comment.