Skip to content

Commit

Permalink
Add JSON marshaling and unmarshaling to all pipeline nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
goller committed Oct 18, 2017
1 parent 36909d9 commit 8d6a481
Show file tree
Hide file tree
Showing 31 changed files with 1,014 additions and 567 deletions.
6 changes: 3 additions & 3 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,10 @@ func newAlertNode(wants EdgeType) *AlertNode {
func (n *AlertNode) MarshalJSON() ([]byte, error) {
type Alias AlertNodeData
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
TypeOf: &TypeOf{
TypeOf: TypeOf{
Type: "alert",
ID: n.ID(),
},
Expand All @@ -398,7 +398,7 @@ func (n *AlertNode) MarshalJSON() ([]byte, error) {
func (n *AlertNode) UnmarshalJSON(data []byte) error {
type Alias AlertNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
Alias: (*Alias)(n),
Expand Down
93 changes: 78 additions & 15 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"reflect"
"time"

"github.com/influxdata/influxdb/influxql"
)

// A node that handles creating several child QueryNodes.
Expand Down Expand Up @@ -46,10 +48,10 @@ func newBatchNode() *BatchNode {
func (n *BatchNode) MarshalJSON() ([]byte, error) {
type Alias BatchNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
TypeOf: &TypeOf{
TypeOf: TypeOf{
Type: "batch",
ID: n.ID(),
},
Expand All @@ -62,7 +64,7 @@ func (n *BatchNode) MarshalJSON() ([]byte, error) {
func (n *BatchNode) UnmarshalJSON(data []byte) error {
type Alias BatchNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
Alias: (*Alias)(n),
Expand Down Expand Up @@ -115,32 +117,32 @@ func (b *BatchNode) dot(buf *bytes.Buffer) {
// In the above example InfluxDB is queried every 20 seconds; the window of time returned
// spans 1 minute and is grouped into 10 second buckets.
type QueryNode struct {
chainnode
chainnode `json:"-"`

// The query text
//tick:ignore
QueryStr string
QueryStr string `json:"queryStr"`

// The period or length of time that will be queried from InfluxDB
Period time.Duration
Period time.Duration `json:"period"`

// How often to query InfluxDB.
//
// The Every property is mutually exclusive with the Cron property.
Every time.Duration
Every time.Duration `json:"every"`

// Align start and end times with the Every value
// Does not apply if Cron is used.
// tick:ignore
AlignFlag bool `tick:"Align"`
AlignFlag bool `tick:"Align" json:"align"`

// Define a schedule using a cron syntax.
//
// The specific cron implementation is documented here:
// https://github.com/gorhill/cronexpr#implementation
//
// The Cron property is mutually exclusive with the Every property.
Cron string
Cron string `json:"cron"`

// How far back in time to query from the current time
//
Expand All @@ -149,19 +151,19 @@ type QueryNode struct {
//
// This applies to Cron schedules as well. If the cron specifies to run every Sunday at
// 1 AM and the Offset is 1 hour. Then at 1 AM on Sunday the data from 12 AM will be queried.
Offset time.Duration
Offset time.Duration `json:"offset"`

// Align the group by time intervals with the start time of the query
// tick:ignore
AlignGroupFlag bool `tick:"AlignGroup"`
AlignGroupFlag bool `tick:"AlignGroup" json:"alignGroup"`

// The list of dimensions for the group-by clause.
//tick:ignore
Dimensions []interface{} `tick:"GroupBy"`
Dimensions []interface{} `tick:"GroupBy" json:"groupBy"`

// Whether to include the measurement in the group ID.
// tick:ignore
GroupByMeasurementFlag bool `tick:"GroupByMeasurement"`
GroupByMeasurementFlag bool `tick:"GroupByMeasurement" json:"groupByMeasurement"`

// Fill the data.
// Options are:
Expand All @@ -171,11 +173,11 @@ type QueryNode struct {
// - previous - reports the value of the previous window
// - none - suppresses timestamps and values where the value is null
// - linear - reports the results of linear interpolation
Fill interface{}
Fill interface{} `json:"fill"`

// The name of a configured InfluxDB cluster.
// If empty the default cluster will be used.
Cluster string
Cluster string `json:"cluster"`
}

func newQueryNode() *QueryNode {
Expand All @@ -185,6 +187,67 @@ func newQueryNode() *QueryNode {
return b
}

// MarshalJSON converts QueryNode to JSON
func (n *QueryNode) MarshalJSON() ([]byte, error) {
type Alias QueryNode
var raw = &struct {
TypeOf
*Alias
Period string `json:"period"`
Every string `json:"every"`
Offset string `json:"offset"`
}{
TypeOf: TypeOf{
Type: "query",
ID: n.ID(),
},
Alias: (*Alias)(n),
Period: influxql.FormatDuration(n.Period),
Every: influxql.FormatDuration(n.Every),
Offset: influxql.FormatDuration(n.Offset),
}
return json.Marshal(raw)
}

// UnmarshalJSON converts JSON to an QueryNode
func (n *QueryNode) UnmarshalJSON(data []byte) error {
type Alias QueryNode
var raw = &struct {
TypeOf
*Alias
Period string `json:"period"`
Every string `json:"every"`
Offset string `json:"offset"`
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "query" {
return fmt.Errorf("error unmarshaling node %d of type %s as QueryNode", raw.ID, raw.Type)
}

n.Period, err = influxql.ParseDuration(raw.Period)
if err != nil {
return err
}

n.Every, err = influxql.ParseDuration(raw.Every)
if err != nil {
return err
}

n.Offset, err = influxql.ParseDuration(raw.Offset)
if err != nil {
return err
}

n.setID(raw.ID)
return nil
}

//tick:ignore
func (n *QueryNode) ChainMethods() map[string]reflect.Value {
return map[string]reflect.Value{
Expand Down
6 changes: 3 additions & 3 deletions pipeline/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func newCombineNode(e EdgeType, lambdas []*ast.LambdaNode) *CombineNode {
func (n *CombineNode) MarshalJSON() ([]byte, error) {
type Alias CombineNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
Tolerance string `json:"tolerance"`
}{
TypeOf: &TypeOf{
TypeOf: TypeOf{
Type: "combine",
ID: n.ID(),
},
Expand All @@ -115,7 +115,7 @@ func (n *CombineNode) MarshalJSON() ([]byte, error) {
func (n *CombineNode) UnmarshalJSON(data []byte) error {
type Alias CombineNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
Tolerance string `json:"tolerance"`
}{
Expand Down
6 changes: 3 additions & 3 deletions pipeline/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func newDefaultNode(e EdgeType) *DefaultNode {
func (n *DefaultNode) MarshalJSON() ([]byte, error) {
type Alias DefaultNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
TypeOf: &TypeOf{
TypeOf: TypeOf{
Type: "default",
ID: n.ID(),
},
Expand All @@ -62,7 +62,7 @@ func (n *DefaultNode) MarshalJSON() ([]byte, error) {
func (n *DefaultNode) UnmarshalJSON(data []byte) error {
type Alias DefaultNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
Alias: (*Alias)(n),
Expand Down
6 changes: 3 additions & 3 deletions pipeline/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func newDeleteNode(e EdgeType) *DeleteNode {
func (n *DeleteNode) MarshalJSON() ([]byte, error) {
type Alias DeleteNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
TypeOf: &TypeOf{
TypeOf: TypeOf{
Type: "delete",
ID: n.ID(),
},
Expand All @@ -59,7 +59,7 @@ func (n *DeleteNode) MarshalJSON() ([]byte, error) {
func (n *DeleteNode) UnmarshalJSON(data []byte) error {
type Alias DeleteNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
Alias: (*Alias)(n),
Expand Down
62 changes: 47 additions & 15 deletions pipeline/derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"time"

"github.com/influxdata/influxdb/influxql"
)

// Compute the derivative of a stream or batch.
Expand All @@ -29,24 +31,24 @@ import (
// because of boundary conditions the first point is
// dropped.
type DerivativeNode struct {
chainnode
chainnode `json:"-"`

// The field to use when calculating the derivative
// tick:ignore
Field string
Field string `json:"field"`

// The new name of the derivative field.
// Default is the name of the field used
// when calculating the derivative.
As string
As string `json:"as"`

// The time unit of the resulting derivative value.
// Default: 1s
Unit time.Duration
Unit time.Duration `json:"unit"`

// Where negative values are acceptable.
// tick:ignore
NonNegativeFlag bool `tick:"NonNegative"`
NonNegativeFlag bool `tick:"NonNegative" json:"nonNegative"`
}

func newDerivativeNode(wants EdgeType, field string) *DerivativeNode {
Expand All @@ -58,17 +60,47 @@ func newDerivativeNode(wants EdgeType, field string) *DerivativeNode {
}
}

func (d *DerivativeNode) MarshalJSON() ([]byte, error) {
props := map[string]interface{}{
"type": "derivative",
"nodeID": fmt.Sprintf("%d", d.ID()),
"children": d.node,
"field": d.Field,
"as": d.As,
"unit": d.Unit,
"nonNegative": d.NonNegativeFlag,
// MarshalJSON converts DerivativeNode to JSON
func (n *DerivativeNode) MarshalJSON() ([]byte, error) {
type Alias DerivativeNode
var raw = &struct {
TypeOf
*Alias
Unit string `json:"unit"`
}{
TypeOf: TypeOf{
Type: "derivative",
ID: n.ID(),
},
Alias: (*Alias)(n),
Unit: influxql.FormatDuration(n.Unit),
}
return json.Marshal(raw)
}

// UnmarshalJSON converts JSON to an DerivativeNode
func (n *DerivativeNode) UnmarshalJSON(data []byte) error {
type Alias DerivativeNode
var raw = &struct {
TypeOf
*Alias
Unit string `json:"unit"`
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "derivative" {
return fmt.Errorf("error unmarshaling node %d of type %s as DerivativeNode", raw.ID, raw.Type)
}
n.Unit, err = influxql.ParseDuration(raw.Unit)
if err != nil {
return err
}
return json.Marshal(props)
n.setID(raw.ID)
return nil
}

// If called the derivative will skip negative results.
Expand Down
6 changes: 3 additions & 3 deletions pipeline/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func newEvalNode(e EdgeType, exprs []*ast.LambdaNode) *EvalNode {
func (n *EvalNode) MarshalJSON() ([]byte, error) {
type Alias EvalNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
TypeOf: &TypeOf{
TypeOf: TypeOf{
Type: "eval",
ID: n.ID(),
},
Expand All @@ -79,7 +79,7 @@ func (n *EvalNode) MarshalJSON() ([]byte, error) {
func (n *EvalNode) UnmarshalJSON(data []byte) error {
type Alias EvalNode
var raw = &struct {
*TypeOf
TypeOf
*Alias
}{
Alias: (*Alias)(n),
Expand Down
Loading

0 comments on commit 8d6a481

Please sign in to comment.