Skip to content

Commit

Permalink
Lazily JSON parse bloblang context
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Sep 10, 2020
1 parent a53c197 commit 6252367
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 67 deletions.
32 changes: 22 additions & 10 deletions internal/bloblang/mapping/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ func (e *Executor) Maps() map[string]query.Function {
func (e *Executor) QueryPart(index int, msg Message) (bool, error) {
var valuePtr *interface{}
var parseErr error
if jObj, err := msg.Get(index).JSON(); err == nil {
valuePtr = &jObj
} else {
parseErr = err

lazyValue := func() *interface{} {
if valuePtr == nil && parseErr == nil {
if jObj, err := msg.Get(index).JSON(); err == nil {
valuePtr = &jObj
} else {
parseErr = err
}
}
return valuePtr
}

var newValue interface{} = query.Nothing(nil)
Expand All @@ -100,7 +106,7 @@ func (e *Executor) QueryPart(index int, msg Message) (bool, error) {
for _, stmt := range e.statements {
res, err := stmt.query.Exec(query.FunctionContext{
Maps: e.maps,
Value: valuePtr,
Value: lazyValue,
Vars: vars,
Index: index,
MsgBatch: msg,
Expand Down Expand Up @@ -162,10 +168,16 @@ func (e *Executor) MapOnto(part types.Part, index int, msg Message) (types.Part,
func (e *Executor) mapPart(appendTo types.Part, index int, reference Message) (types.Part, error) {
var valuePtr *interface{}
var parseErr error
if jObj, err := reference.Get(index).JSON(); err == nil {
valuePtr = &jObj
} else {
parseErr = err

lazyValue := func() *interface{} {
if valuePtr == nil && parseErr == nil {
if jObj, err := reference.Get(index).JSON(); err == nil {
valuePtr = &jObj
} else {
parseErr = err
}
}
return valuePtr
}

var newPart types.Part
Expand All @@ -187,7 +199,7 @@ func (e *Executor) mapPart(appendTo types.Part, index int, reference Message) (t
for _, stmt := range e.statements {
res, err := stmt.query.Exec(query.FunctionContext{
Maps: e.maps,
Value: valuePtr,
Value: lazyValue,
Vars: vars,
Index: index,
MsgBatch: reference,
Expand Down
9 changes: 3 additions & 6 deletions internal/bloblang/mapping/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,23 +335,20 @@ func TestExec(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
res, err := test.mapping.Exec(query.FunctionContext{
Value: &test.input,
MsgBatch: message.New(nil),
})
}.WithValue(test.input))
if len(test.err) > 0 {
require.EqualError(t, err, test.err)
} else {
assert.Equal(t, test.output, res)
}
resString := test.mapping.ToString(query.FunctionContext{
Value: &test.input,
MsgBatch: message.New(nil),
})
}.WithValue(test.input))
assert.Equal(t, test.outputString, resString)
resBytes := test.mapping.ToBytes(query.FunctionContext{
Value: &test.input,
MsgBatch: message.New(nil),
})
}.WithValue(test.input))
assert.Equal(t, test.outputString, string(resBytes))
})
}
Expand Down
3 changes: 1 addition & 2 deletions internal/bloblang/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,9 @@ func TestMappings(t *testing.T) {
}))

res, err := m.Exec(query.FunctionContext{
Value: &test.input,
MsgBatch: message.New(nil),
Vars: map[string]interface{}{},
})
}.WithValue(test.input))
require.NoError(t, err)
assert.Equal(t, test.output, res)
})
Expand Down
5 changes: 3 additions & 2 deletions internal/bloblang/parser/query_expression_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func matchCaseParser() Type {
case query.Function:
if lit, isLiteral := t.(*query.Literal); isLiteral {
caseFn = query.ClosureFunction(func(ctx query.FunctionContext) (interface{}, error) {
if ctx.Value == nil {
v := ctx.Value()
if v == nil {
return false, nil
}
return *ctx.Value == lit.Value, nil
return *v == lit.Value, nil
}, nil)
} else {
caseFn = t
Expand Down
4 changes: 2 additions & 2 deletions internal/bloblang/parser/query_expression_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ func TestExpressionsParser(t *testing.T) {

res := query.ExecToString(e, query.FunctionContext{
Index: test.index, MsgBatch: msg,
Value: test.value,
Value: func() *interface{} { return test.value },
})
assert.Equal(t, test.output, res)
res = string(query.ExecToBytes(e, query.FunctionContext{
Index: test.index, MsgBatch: msg,
Value: test.value,
Value: func() *interface{} { return test.value },
}))
assert.Equal(t, test.output, res)
})
Expand Down
4 changes: 2 additions & 2 deletions internal/bloblang/parser/query_function_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,12 +677,12 @@ bar""")`,

res := query.ExecToString(e, query.FunctionContext{
Index: test.index, MsgBatch: msg,
Value: test.value,
Value: func() *interface{} { return test.value },
})
assert.Equal(t, test.output, res)
res = string(query.ExecToBytes(e, query.FunctionContext{
Index: test.index, MsgBatch: msg,
Value: test.value,
Value: func() *interface{} { return test.value },
}))
assert.Equal(t, test.output, res)
})
Expand Down
2 changes: 1 addition & 1 deletion internal/bloblang/parser/query_literal_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestLiteralParser(t *testing.T) {

result, err := q.Exec(query.FunctionContext{
Index: 0, MsgBatch: message.New(nil),
Value: test.value,
Value: func() *interface{} { return test.value },
})
if len(test.err) > 0 {
assert.EqualError(t, err, test.err)
Expand Down
6 changes: 3 additions & 3 deletions internal/bloblang/query/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func NewMatchFunction(contextFn Function, cases ...MatchCase) Function {
if contextFn == nil {
contextFn = ClosureFunction(func(ctx FunctionContext) (interface{}, error) {
var value interface{}
if ctx.Value != nil {
value = *ctx.Value
if v := ctx.Value(); v != nil {
value = *v
}
return value, nil
}, nil)
Expand All @@ -36,7 +36,7 @@ func NewMatchFunction(contextFn Function, cases ...MatchCase) Function {
if err != nil {
return nil, err
}
ctx.Value = &ctxVal
ctx = ctx.WithValue(ctxVal)
for i, c := range cases {
var caseVal interface{}
if caseVal, err = c.caseFn.Exec(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/bloblang/query/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestExpressions(t *testing.T) {

for i := 0; i < 10; i++ {
res, err := test.input.Exec(FunctionContext{
Value: test.value,
Value: func() *interface{} { return test.value },
Maps: map[string]Function{},
Index: test.index,
MsgBatch: msg,
Expand Down
7 changes: 4 additions & 3 deletions internal/bloblang/query/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ type fieldFunction struct {
}

func (f *fieldFunction) Exec(ctx FunctionContext) (interface{}, error) {
if ctx.Value == nil {
v := ctx.Value()
if v == nil {
return nil, &ErrRecoverable{
Recovered: nil,
Err: ErrNoContext,
}
}
if len(f.path) == 0 {
return *ctx.Value, nil
return *v, nil
}
return gabs.Wrap(*ctx.Value).S(f.path...).Data(), nil
return gabs.Wrap(*v).S(f.path...).Data(), nil
}

func (f *fieldFunction) QueryTargets(ctx TargetsContext) []TargetPath {
Expand Down
5 changes: 2 additions & 3 deletions internal/bloblang/query/literals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ func TestLiterals(t *testing.T) {
res := test.input
if fn, ok := test.input.(Function); ok {
res, err = fn.Exec(FunctionContext{
Value: &test.value,
Maps: map[string]Function{},
})
Maps: map[string]Function{},
}.WithValue(test.value))
targets = fn.QueryTargets(TargetsContext{
Maps: map[string]Function{},
})
Expand Down
34 changes: 11 additions & 23 deletions internal/bloblang/query/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func allMethod(target Function, args ...interface{}) (Function, error) {
}

for i, v := range arr {
vCtx := ctx
vCtx.Value = &v
vCtx := ctx.WithValue(v)
res, err := queryFn.Exec(vCtx)
if err != nil {
return nil, fmt.Errorf("element %v: %w", i, err)
Expand Down Expand Up @@ -104,8 +103,7 @@ func anyMethod(target Function, args ...interface{}) (Function, error) {
}

for i, v := range arr {
vCtx := ctx
vCtx.Value = &v
vCtx := ctx.WithValue(v)
res, err := queryFn.Exec(vCtx)
if err != nil {
return nil, fmt.Errorf("element %v: %w", i, err)
Expand Down Expand Up @@ -183,7 +181,7 @@ func applyMethod(target Function, args ...interface{}) (Function, error) {
if err != nil {
return nil, err
}
ctx.Value = &res
ctx = ctx.WithValue(res)

if ctx.Maps == nil {
return nil, &ErrRecoverable{
Expand Down Expand Up @@ -551,8 +549,7 @@ func filterMethod(target Function, args ...interface{}) (Function, error) {
case []interface{}:
newSlice := make([]interface{}, 0, len(t))
for _, v := range t {
ctx.Value = &v
f, err := mapFn.Exec(ctx)
f, err := mapFn.Exec(ctx.WithValue(v))
if err != nil {
return nil, err
}
Expand All @@ -568,8 +565,7 @@ func filterMethod(target Function, args ...interface{}) (Function, error) {
"key": k,
"value": v,
}
ctx.Value = &ctxMap
f, err := mapFn.Exec(ctx)
f, err := mapFn.Exec(ctx.WithValue(ctxMap))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -687,10 +683,7 @@ func foldMethod(target Function, args ...interface{}) (Function, error) {
tmpObj["tally"] = tally
tmpObj["value"] = v

var tmpVal interface{} = tmpObj
ctx.Value = &tmpVal

newV, mapErr := foldFn.Exec(ctx)
newV, mapErr := foldFn.Exec(ctx.WithValue(tmpObj))
if mapErr != nil {
return nil, mapErr
}
Expand Down Expand Up @@ -1055,8 +1048,7 @@ func mapMethod(target Function, args ...interface{}) (Function, error) {
if err != nil {
return nil, err
}
ctx.Value = &res
return mapFn.Exec(ctx)
return mapFn.Exec(ctx.WithValue(res))
}, func(ctx TargetsContext) []TargetPath {
return expandTargetPaths(target.QueryTargets(ctx), mapFn.QueryTargets(ctx))
}), nil
Expand Down Expand Up @@ -1108,8 +1100,7 @@ func mapEachMethod(target Function, args ...interface{}) (Function, error) {
case []interface{}:
newSlice := make([]interface{}, 0, len(t))
for i, v := range t {
ctx.Value = &v
newV, mapErr := mapFn.Exec(ctx)
newV, mapErr := mapFn.Exec(ctx.WithValue(v))
if mapErr != nil {
if recover, ok := mapErr.(*ErrRecoverable); ok {
newV = recover.Recovered
Expand All @@ -1134,8 +1125,7 @@ func mapEachMethod(target Function, args ...interface{}) (Function, error) {
"key": k,
"value": v,
}
ctx.Value = &ctxMap
newV, mapErr := mapFn.Exec(ctx)
newV, mapErr := mapFn.Exec(ctx.WithValue(ctxMap))
if mapErr != nil {
if recover, ok := mapErr.(*ErrRecoverable); ok {
newV = recover.Recovered
Expand Down Expand Up @@ -1438,8 +1428,7 @@ func sortMethod(target Function, args ...interface{}) (Function, error) {
"left": values[i],
"right": values[j],
}
ctx.Value = &ctxValue
v, err := mapFn.Exec(ctx)
v, err := mapFn.Exec(ctx.WithValue(ctxValue))
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1728,9 +1717,8 @@ func uniqueMethod(target Function, args ...interface{}) (Function, error) {
for i, v := range slice {
check := v
if emitFn != nil {
ctx.Value = &v
var err error
if check, err = emitFn.Exec(ctx); err != nil {
if check, err = emitFn.Exec(ctx.WithValue(v)); err != nil {
return nil, fmt.Errorf("index %v: %w", i, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/bloblang/query/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,7 +1763,7 @@ func TestMethods(t *testing.T) {

for i := 0; i < 10; i++ {
res, err := test.input.Exec(FunctionContext{
Value: test.value,
Value: func() *interface{} { return test.value },
Maps: map[string]Function{},
Index: test.index,
MsgBatch: msg,
Expand Down
10 changes: 9 additions & 1 deletion internal/bloblang/query/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,22 @@ type MessageBatch interface {
// FunctionContext provides access to a range of query targets for functions to
// reference.
type FunctionContext struct {
Value *interface{}
Value func() *interface{}
Maps map[string]Function
Vars map[string]interface{}
Index int
MsgBatch MessageBatch
Legacy bool
}

// WithValue returns a function context with a new value.
func (ctx FunctionContext) WithValue(v interface{}) FunctionContext {
ctx.Value = func() *interface{} {
return &v
}
return ctx
}

// TargetsContext provides access to a range of query targets for functions to
// reference when determining their targets.
type TargetsContext struct {
Expand Down
15 changes: 12 additions & 3 deletions lib/condition/bloblang.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,21 @@ func (c *Bloblang) Check(msg types.Message) bool {
c.mCount.Incr(1)

var valuePtr *interface{}
if jObj, err := msg.Get(0).JSON(); err == nil {
valuePtr = &jObj
var parseErr error

lazyValue := func() *interface{} {
if valuePtr == nil && parseErr == nil {
if jObj, err := msg.Get(0).JSON(); err == nil {
valuePtr = &jObj
} else {
parseErr = err
}
}
return valuePtr
}

result, err := c.fn.Exec(query.FunctionContext{
Value: valuePtr,
Value: lazyValue,
Maps: map[string]query.Function{},
Vars: map[string]interface{}{},
MsgBatch: msg,
Expand Down
3 changes: 1 addition & 2 deletions lib/metrics/path_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ func (m *pathMapping) mapPath(path string) string {
}
var input interface{} = path
v, err := m.m.Exec(query.FunctionContext{
Value: &input,
Maps: map[string]query.Function{},
Vars: map[string]interface{}{},
MsgBatch: message.New(nil),
})
}.WithValue(input))
if err != nil {
m.logger.Errorf("Failed to apply path mapping on '%v': %v\n", path, err)
return path
Expand Down
Loading

0 comments on commit 6252367

Please sign in to comment.