Skip to content

Commit

Permalink
PR influxdata#443: Add support to a Validation phase to the task life…
Browse files Browse the repository at this point in the history
…cycle.

It is better if we can detect and report configuration errors of task
pipeline early, during task definition, rather than deferring such
checks to runtime (when this is possible).

To support such checks, we add a Validate() error method to
the pipeline.Node() interface which gives pipeline nodes a chance
to validate their configuration. Any pipeline containing nodes
that do not pass these test cannot be saved.

Signed-off-by: Jon Seymour <[email protected]>
  • Loading branch information
jonseymour committed Apr 13, 2016
1 parent c3cad29 commit 9b88fa3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Node interface {
// The type of output the node provides.
Provides() EdgeType

// Check that the definition of the node is consistent
validate() error

// Helper methods for walking DAG
tMark() bool
setTMark(b bool)
Expand Down Expand Up @@ -167,6 +170,10 @@ func (n *node) Provides() EdgeType {
return n.provides
}

func (n *node) validate() error {
return nil
}

func (n *node) dot(buf *bytes.Buffer) {
for _, c := range n.children {
buf.Write([]byte(fmt.Sprintf("%s -> %s;\n", n.Name(), c.Name())))
Expand Down
6 changes: 6 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadm
return nil, fmt.Errorf("source edge type must be either Stream or Batch not %s", sourceEdge)
}
}
if err = p.Walk(
func(n Node) error {
return n.validate()
}); err != nil {
return nil, err
}
return p, nil
}

Expand Down

0 comments on commit 9b88fa3

Please sign in to comment.