Skip to content

Commit

Permalink
add JSON marshalling support to barrier node
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Dec 1, 2017
1 parent a18dc67 commit b32962a
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
63 changes: 61 additions & 2 deletions pipeline/barrier.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package pipeline

import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/influxdata/influxdb/influxql"
)

// A BarrierNode will emit a barrier with the current time, according to the system
Expand All @@ -27,12 +31,12 @@ type BarrierNode struct {

// Emit barrier based on idle time since the last received message.
// Must be greater than zero.
Idle time.Duration
Idle time.Duration `json:"idle"`

// Emit barrier based on periodic timer. The timer is based on system
// clock rather than message time.
// Must be greater than zero.
Period time.Duration
Period time.Duration `json:"period"`
}

func newBarrierNode(wants EdgeType) *BarrierNode {
Expand All @@ -55,3 +59,58 @@ func (b *BarrierNode) validate() error {

return nil
}

// MarshalJSON converts BarrierNode to JSON
// tick:ignore
func (n *BarrierNode) MarshalJSON() ([]byte, error) {
type Alias BarrierNode
var raw = &struct {
TypeOf
*Alias
Period string `json:"period"`
Idle string `json:"idle"`
}{
TypeOf: TypeOf{
Type: "barrier",
ID: n.ID(),
},
Alias: (*Alias)(n),
Period: influxql.FormatDuration(n.Period),
Idle: influxql.FormatDuration(n.Idle),
}
return json.Marshal(raw)
}

// UnmarshalJSON converts JSON to an BarrierNode
// tick:ignore
func (n *BarrierNode) UnmarshalJSON(data []byte) error {
type Alias BarrierNode
var raw = &struct {
TypeOf
*Alias
Period string `json:"period"`
Idle string `json:"idle"`
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "barrier" {
return fmt.Errorf("error unmarshaling node %d of type %s as BarrierNode", raw.ID, raw.Type)
}

n.Period, err = influxql.ParseDuration(raw.Period)
if err != nil {
return err
}

n.Idle, err = influxql.ParseDuration(raw.Idle)
if err != nil {
return err
}

n.setID(raw.ID)
return nil
}
43 changes: 43 additions & 0 deletions pipeline/barrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pipeline

import (
"testing"
"time"
)

func TestBarrierNode_MarshalJSON(t *testing.T) {
type fields struct {
Period time.Duration
Idle time.Duration
}
tests := []struct {
name string
fields fields
want string
wantErr bool
}{
{
name: "all fields set",
fields: fields{
Period: time.Hour,
Idle: time.Minute,
},
want: `{"typeOf":"barrier","id":"0","period":"1h","idle":"1m"}`,
},
{
name: "only period ",
fields: fields{
Period: time.Hour,
},
want: `{"typeOf":"barrier","id":"0","period":"1h","idle":"0s"}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := newBarrierNode(StreamEdge)
b.Period = tt.fields.Period
b.Idle = tt.fields.Idle
MarshalTestHelper(t, b, tt.wantErr, tt.want)
})
}
}

0 comments on commit b32962a

Please sign in to comment.