Skip to content

Commit

Permalink
Scope reusing & smaller scopes (influxdata#496)
Browse files Browse the repository at this point in the history
For each expression we are creating "scope pool", which is object pool of scopes - with some extra magic.
By doing quick analysis on the node AST I know which tags and fields he requires. so we put only the required ones.
For example: "value" > 10, I fill only "value" from field or tag.

name                     old time/op    new time/op    delta
_T10_P500_AlertTask-4       139ms ± 2%     132ms ± 3%   -5.23%  (p=0.008
n=5+5)
_T10_P50000_AlertTask-4     14.6s ± 1%     13.3s ± 1%   -8.92%  (p=0.008
n=5+5)
_T1000_P500_AlertTask-4     13.8s ± 1%     13.1s ± 2%   -4.87%  (p=0.008
n=5+5)

name                     old alloc/op   new alloc/op   delta
_T10_P500_AlertTask-4      32.1MB ± 0%    25.9MB ± 0%  -19.52%  (p=0.008
n=5+5)
_T10_P50000_AlertTask-4    3.26GB ± 0%    2.62GB ± 0%  -19.78%  (p=0.008
n=5+5)
_T1000_P500_AlertTask-4    3.21GB ± 0%    2.61GB ± 0%  -18.71%  (p=0.008
n=5+5)

name                     old allocs/op  new allocs/op  delta
_T10_P500_AlertTask-4        406k ± 0%      333k ± 0%  -17.96%  (p=0.008
n=5+5)
_T10_P50000_AlertTask-4     41.4M ± 0%     34.0M ± 0%  -18.05%  (p=0.008
n=5+5)
_T1000_P500_AlertTask-4     40.2M ± 0%     33.1M ± 0%  -17.65%  (p=0.008
n=5+5)
  • Loading branch information
yosiat authored and Nathaniel Cook committed May 7, 2016
1 parent d3ae18d commit b90d70f
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 20 deletions.
8 changes: 7 additions & 1 deletion alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type AlertNode struct {
endpoint string
handlers []AlertHandler
levels []stateful.Expression
scopePools []stateful.ScopePool
states map[models.GroupID]*alertState
idTmpl *text.Template
messageTmpl *text.Template
Expand Down Expand Up @@ -274,13 +275,16 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *

// Parse level expressions
an.levels = make([]stateful.Expression, CritAlert+1)
an.scopePools = make([]stateful.ScopePool, CritAlert+1)

if n.Info != nil {
statefulExpression, expressionCompileError := stateful.NewExpression(n.Info)
if expressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for info: %s", expressionCompileError)
}

an.levels[InfoAlert] = statefulExpression
an.scopePools[InfoAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Info))
}

if n.Warn != nil {
Expand All @@ -289,6 +293,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for warn: %s", expressionCompileError)
}
an.levels[WarnAlert] = statefulExpression
an.scopePools[WarnAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Warn))
}

if n.Crit != nil {
Expand All @@ -297,6 +302,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for crit: %s", expressionCompileError)
}
an.levels[CritAlert] = statefulExpression
an.scopePools[CritAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Crit))
}

// Setup states
Expand Down Expand Up @@ -489,7 +495,7 @@ func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map
if se == nil {
continue
}
if pass, err := EvalPredicate(se, now, fields, tags); pass {
if pass, err := EvalPredicate(se, a.scopePools[l], now, fields, tags); pass {
level = AlertLevel(l)
} else if err != nil {
a.logger.Println("E! error evaluating expression:", err)
Expand Down
7 changes: 6 additions & 1 deletion eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type EvalNode struct {
node
e *pipeline.EvalNode
expressions []stateful.Expression
scopePool stateful.ScopePool
evalErrors *expvar.Int
}

Expand All @@ -43,6 +44,7 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
en.expressions[i] = statefulExpr
}

en.scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(n.Expressions...))
en.node.runF = en.runEval
return en, nil
}
Expand Down Expand Up @@ -82,7 +84,10 @@ func (e *EvalNode) runEval(snapshot []byte) error {
}

func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) models.Fields {
vars, err := mergeFieldsAndTags(now, fields, tags)
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
err := fillScope(vars, e.scopePool.ReferenceVariables(), now, fields, tags)

if err != nil {
e.logger.Println("E!", err)
return nil
Expand Down
42 changes: 29 additions & 13 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,45 @@ import (
)

// EvalPredicate - Evaluate a given expression as a boolean predicate against a set of fields and tags
func EvalPredicate(se stateful.Expression, now time.Time, fields models.Fields, tags models.Tags) (bool, error) {
vars, err := mergeFieldsAndTags(now, fields, tags)
func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, now time.Time, fields models.Fields, tags models.Tags) (bool, error) {
vars := scopePool.Get()
defer scopePool.Put(vars)
err := fillScope(vars, scopePool.ReferenceVariables(), now, fields, tags)
if err != nil {
return false, err
}

b, err := se.EvalBool(vars)
if err != nil {
return false, err
}
return b, nil
}

func mergeFieldsAndTags(now time.Time, fields models.Fields, tags models.Tags) (*tick.Scope, error) {
scope := tick.NewScope()
scope.Set("time", now.Local())
for k, v := range fields {
if _, ok := tags[k]; ok {
return nil, fmt.Errorf("cannot have field and tags with same name %q", k)
// fillScope - given a scope and reference variables, we fill the exact variables from the now, fields and tags.
func fillScope(vars *tick.Scope, referenceVariables []string, now time.Time, fields models.Fields, tags models.Tags) error {
for _, refVariableName := range referenceVariables {
if refVariableName == "time" {
vars.Set("time", now.Local())
continue
}

// Support the error with tags/fields collison
var fieldValue interface{}
var isFieldExists bool

if fieldValue, isFieldExists = fields[refVariableName]; isFieldExists {
vars.Set(refVariableName, fieldValue)
}

if tagValue, ok := tags[refVariableName]; ok {
if isFieldExists {
return fmt.Errorf("cannot have field and tags with same name %q", refVariableName)
}

vars.Set(refVariableName, tagValue)
}
scope.Set(k, v)
}
for k, v := range tags {
scope.Set(k, v)
}
return scope, nil

return nil
}
4 changes: 3 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type FromNode struct {
node
s *pipeline.FromNode
expression stateful.Expression
scopePool stateful.ScopePool
dimensions []string
allDimensions bool
db string
Expand All @@ -66,6 +67,7 @@ func newFromNode(et *ExecutingTask, n *pipeline.FromNode, l *log.Logger) (*FromN
}

sn.expression = expr
sn.scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(n.Expression))
}

return sn, nil
Expand Down Expand Up @@ -104,7 +106,7 @@ func (s *FromNode) matches(p models.Point) bool {
return false
}
if s.expression != nil {
if pass, err := EvalPredicate(s.expression, p.Time, p.Fields, p.Tags); err != nil {
if pass, err := EvalPredicate(s.expression, s.scopePool, p.Time, p.Fields, p.Tags); err != nil {
s.logger.Println("E! error while evaluating WHERE expression:", err)
return false
} else {
Expand Down
35 changes: 35 additions & 0 deletions tick/stateful/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,38 @@ func (se *expression) EvalNum(scope *tick.Scope) (interface{}, error) {
return nil, fmt.Errorf("expression returned unexpected type %s", returnType)
}
}

func FindReferenceVariables(nodes ...tick.Node) []string {

variablesSet := make(map[string]bool, 0)

for _, node := range nodes {
buildReferenceVariablesSet(node, variablesSet)
}

variables := make([]string, 0, len(variablesSet))

for variable := range variablesSet {
variables = append(variables, variable)
}

return variables
}

// util method for findReferenceVariables, we are passing the itemsSet and not returning it
// so we will won't to merge the maps
func buildReferenceVariablesSet(n tick.Node, itemsSet map[string]bool) {
switch node := n.(type) {
case *tick.ReferenceNode:
itemsSet[node.Reference] = true
case *tick.UnaryNode:
buildReferenceVariablesSet(node.Node, itemsSet)
case *tick.BinaryNode:
buildReferenceVariablesSet(node.Left, itemsSet)
buildReferenceVariablesSet(node.Right, itemsSet)
case *tick.FunctionNode:
for _, arg := range node.Args {
buildReferenceVariablesSet(arg, itemsSet)
}
}
}
57 changes: 57 additions & 0 deletions tick/stateful/scope_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package stateful

import (
"sync"

"github.com/influxdata/kapacitor/tick"
)

// ScopePool - pooling mechanism for tick.Scope
// The idea behind scope pool is to pool scopes and to put them only
// the needed variables for execution.
type ScopePool interface {
Get() *tick.Scope
Put(scope *tick.Scope)

ReferenceVariables() []string
}

type scopePool struct {
referenceVariables []string
pool sync.Pool
}

// NewScopePool - creates new ScopePool for the given Node
func NewScopePool(referenceVariables []string) ScopePool {
scopePool := &scopePool{
referenceVariables: referenceVariables,
}

scopePool.pool = sync.Pool{
New: func() interface{} {
scope := tick.NewScope()
for _, refVariable := range scopePool.referenceVariables {
scope.Set(refVariable, nil)
}

return scope
},
}

return scopePool
}

func (s *scopePool) ReferenceVariables() []string {
return s.referenceVariables
}

// Get - returns a scope from a pool with the needed reference variables
// (with nil values/old values) in the scope
func (s *scopePool) Get() *tick.Scope {
return s.pool.Get().(*tick.Scope)
}

// Put - put used scope back to the pool
func (s *scopePool) Put(scope *tick.Scope) {
s.pool.Put(scope)
}
56 changes: 56 additions & 0 deletions tick/stateful/scope_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package stateful_test

import (
"reflect"
"testing"

"github.com/influxdata/kapacitor/tick"
"github.com/influxdata/kapacitor/tick/stateful"
)

func TestScopePool_Sanity(t *testing.T) {
n := stateful.NewScopePool([]string{"value"})

// first
scope := n.Get()

_, existsErr := scope.Get("value")

if existsErr != nil {
t.Errorf("First: Expected \"value\" to exist in the scope, but go an error: %v", existsErr)
}

// second, after put
n.Put(scope)

scope = n.Get()
_, existsErr = scope.Get("value")

if existsErr != nil {
t.Errorf("Second: Expected \"value\" to exist in the scope, but go an error: %v", existsErr)
}
}

func TestExpression_RefernceVariables(t *testing.T) {

type expectation struct {
node tick.Node
refVariables []string
}

expectations := []expectation{
{node: &tick.NumberNode{IsFloat: true}, refVariables: make([]string, 0)},
{node: &tick.BoolNode{}, refVariables: make([]string, 0)},

{node: &tick.ReferenceNode{Reference: "yosi"}, refVariables: []string{"yosi"}},
{node: &tick.BinaryNode{Left: &tick.ReferenceNode{Reference: "value"}, Right: &tick.NumberNode{IsInt: true}}, refVariables: []string{"value"}},
}

for i, expect := range expectations {
refVariables := stateful.FindReferenceVariables(expect.node)
if !reflect.DeepEqual(refVariables, expect.refVariables) {
t.Errorf("[Iteration: %v, Node: %T] Got unexpected result:\ngot: %v\nexpected: %v", i+1, expect.node, refVariables, expect.refVariables)
}

}
}
21 changes: 17 additions & 4 deletions where.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (

type WhereNode struct {
node
w *pipeline.WhereNode
endpoint string
w *pipeline.WhereNode
endpoint string

expressions map[models.GroupID]stateful.Expression
scopePools map[models.GroupID]stateful.ScopePool
}

// Create a new WhereNode which filters down the batch or stream by a condition
Expand All @@ -23,6 +25,7 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn *
node: node{Node: n, et: et, logger: l},
w: n,
expressions: make(map[models.GroupID]stateful.Expression),
scopePools: make(map[models.GroupID]stateful.ScopePool),
}
wn.runF = wn.runWhere
if n.Expression == nil {
Expand All @@ -37,6 +40,8 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
w.timer.Start()
expr := w.expressions[p.Group]
scopePool := w.scopePools[p.Group]

if expr == nil {
compiledExpr, err := stateful.NewExpression(w.w.Expression)
if err != nil {
Expand All @@ -45,8 +50,11 @@ func (w *WhereNode) runWhere(snapshot []byte) error {

expr = compiledExpr
w.expressions[p.Group] = expr

scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Expression))
w.scopePools[p.Group] = scopePool
}
if pass, err := EvalPredicate(expr, p.Time, p.Fields, p.Tags); pass {
if pass, err := EvalPredicate(expr, scopePool, p.Time, p.Fields, p.Tags); pass {
w.timer.Pause()
for _, child := range w.outs {
err := child.CollectPoint(p)
Expand All @@ -64,6 +72,8 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
for b, ok := w.ins[0].NextBatch(); ok; b, ok = w.ins[0].NextBatch() {
w.timer.Start()
expr := w.expressions[b.Group]
scopePool := w.scopePools[b.Group]

if expr == nil {
compiledExpr, err := stateful.NewExpression(w.w.Expression)
if err != nil {
Expand All @@ -72,10 +82,13 @@ func (w *WhereNode) runWhere(snapshot []byte) error {

expr = compiledExpr
w.expressions[b.Group] = expr

scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Expression))
w.scopePools[b.Group] = scopePool
}
for i := 0; i < len(b.Points); {
p := b.Points[i]
if pass, err := EvalPredicate(expr, p.Time, p.Fields, p.Tags); !pass {
if pass, err := EvalPredicate(expr, scopePool, p.Time, p.Fields, p.Tags); !pass {
if err != nil {
w.logger.Println("E! error while evaluating WHERE expression:", err)
}
Expand Down

0 comments on commit b90d70f

Please sign in to comment.