diff --git a/batch.go b/batch.go
index 34afd483a..6bdb78ec1 100644
--- a/batch.go
+++ b/batch.go
@@ -72,9 +72,9 @@ func (s *SourceBatchNode) Count() int {
 	return len(s.children)
 }
 
-func (s *SourceBatchNode) Start(collectors []*Edge) {
-	for i, b := range s.children {
-		b.(*BatchNode).Start(collectors[i])
+func (s *SourceBatchNode) Start() {
+	for _, b := range s.children {
+		b.(*BatchNode).Start()
 	}
 }
 
@@ -88,19 +88,18 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {
 
 type BatchNode struct {
 	node
-	b       *pipeline.BatchNode
-	query   *Query
-	ticker  ticker
-	wg      sync.WaitGroup
-	errC    chan error
-	closing chan struct{}
+	b        *pipeline.BatchNode
+	query    *Query
+	ticker   ticker
+	queryMu  sync.Mutex
+	queryErr chan error
+	closing  chan struct{}
 }
 
 func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
 	bn := &BatchNode{
 		node:    node{Node: n, et: et},
 		b:       n,
-		errC:    make(chan error, 1),
 		closing: make(chan struct{}),
 	}
 	bn.node.runF = bn.runBatch
@@ -176,10 +175,12 @@ func (b *BatchNode) DBRPs() ([]DBRP, error) {
 	return b.query.DBRPs()
 }
 
-func (b *BatchNode) Start(batch BatchCollector) {
-	b.wg.Add(1)
+func (b *BatchNode) Start() {
+	b.queryMu.Lock()
+	defer b.queryMu.Unlock()
+	b.queryErr = make(chan error, 1)
 	go func() {
-		b.errC <- b.doQuery(batch)
+		b.queryErr <- b.doQuery()
 	}()
 }
 
@@ -209,9 +210,8 @@ func (b *BatchNode) Queries(start, stop time.Time) []string {
 }
 
 // Query InfluxDB and collect batches on batch collector.
-func (b *BatchNode) doQuery(batch BatchCollector) error {
-	defer batch.Close()
-	defer b.wg.Done()
+func (b *BatchNode) doQuery() error {
+	defer b.ins[0].Close()
 
 	if b.et.tm.InfluxDBService == nil {
 		return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
@@ -257,24 +257,22 @@ func (b *BatchNode) doQuery(batch BatchCollector) error {
 				return err
 			}
 			for _, bch := range batches {
-				batch.CollectBatch(bch)
+				b.ins[0].CollectBatch(bch)
 			}
 		}
 	}
 }
 
-func (b *BatchNode) stopBatch() {
-	if b.ticker != nil {
-		b.ticker.Stop()
-	}
-	close(b.closing)
-	b.wg.Wait()
-}
-
 func (b *BatchNode) runBatch() error {
 	errC := make(chan error, 1)
 	go func() {
+		defer func() {
+			err := recover()
+			if err != nil {
+				errC <- fmt.Errorf("%v", err)
+			}
+		}()
 		for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
 			for _, child := range b.outs {
 				err := child.CollectBatch(bt)
@@ -286,13 +284,27 @@
 		}
 		errC <- nil
 	}()
-	// Wait for errors
-	select {
-	case err := <-b.errC:
-		return err
-	case err := <-errC:
-		return err
+	var queryErr error
+	b.queryMu.Lock()
+	if b.queryErr != nil {
+		b.queryMu.Unlock()
+		queryErr = <-b.queryErr
+	} else {
+		b.queryMu.Unlock()
+	}
+
+	err := <-errC
+	if queryErr != nil {
+		return queryErr
 	}
+	return err
+}
+
+func (b *BatchNode) stopBatch() {
+	if b.ticker != nil {
+		b.ticker.Stop()
+	}
+	close(b.closing)
 }
 
 type ticker interface {
diff --git a/clock/clock_test.go b/clock/clock_test.go
index bd3128f7a..b90280b55 100644
--- a/clock/clock_test.go
+++ b/clock/clock_test.go
@@ -38,7 +38,7 @@ func TestClockUntilSleepFirst(t *testing.T) {
 	c.Set(zero.Add(10 * time.Microsecond))
 	select {
 	case <-done:
-	case <-time.After(10 * time.Millisecond):
+	case <-time.After(20 * time.Millisecond):
 		t.Fatal("expected return from c.Until")
 	}
 }
@@ -139,7 +139,7 @@ func TestClockUntilMultipleGos(t *testing.T) {
 		c.Set(now)
 		select {
 		case <-done:
-		case <-time.After(10 * time.Millisecond):
+		case <-time.After(20 * time.Millisecond):
 			t.Fatalf("expected return from c.Until i: %d", i)
 		}
 	}
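The batch.go change above replaces the WaitGroup/errC pair with a per-start queryErr channel, and runBatch now prefers the query goroutine's error over the consumer's. The underlying shape is a small error-channel pattern; the following is a standalone sketch of it with hypothetical names, not Kapacitor's actual types:

package main

import (
	"errors"
	"fmt"
)

// produceThenConsume starts a producer goroutine whose error is delivered
// on a buffered channel (so the send never blocks), runs the consumer
// inline, and prefers the producer's error when both fail — the same
// precedence runBatch gives queryErr over the in-process error.
func produceThenConsume(produce, consume func() error) error {
	prodErr := make(chan error, 1) // buffered: producer can exit before anyone reads
	go func() {
		prodErr <- produce()
	}()
	consErr := consume()
	if err := <-prodErr; err != nil {
		return err
	}
	return consErr
}

func main() {
	err := produceThenConsume(
		func() error { return errors.New("query failed") },
		func() error { return nil },
	)
	fmt.Println(err) // query failed
}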
diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go
index 746e65613..23cab03fe 100644
--- a/cmd/kapacitor/main.go
+++ b/cmd/kapacitor/main.go
@@ -635,6 +635,7 @@ func doShow(args []string) error {
 	fmt.Println("Error:", ti.Error)
 	fmt.Println("Type:", ti.Type)
 	fmt.Println("Enabled:", ti.Enabled)
+	fmt.Println("Executing:", ti.Executing)
 	fmt.Println("Databases Retention Policies:", ti.DBRPs)
 	fmt.Printf("TICKscript:\n%s\n\n", ti.TICKscript)
 	fmt.Printf("DOT:\n%s\n", ti.Dot)
@@ -676,10 +677,11 @@ func doList(args []string) error {
 		type resp struct {
 			Error string `json:"Error"`
 			Tasks []struct {
-				Name    string
-				Type    kapacitor.TaskType
-				DBRPs   []kapacitor.DBRP
-				Enabled bool
+				Name      string
+				Type      kapacitor.TaskType
+				DBRPs     []kapacitor.DBRP
+				Enabled   bool
+				Executing bool
 			} `json:"Tasks"`
 		}
 		d := json.NewDecoder(r.Body)
@@ -689,10 +691,10 @@ func doList(args []string) error {
 			return errors.New(rp.Error)
 		}
 
-		outFmt := "%-30s%-10v%-10v%s\n"
-		fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled", "Databases and Retention Policies")
+		outFmt := "%-30s%-10v%-10v%-10v%s\n"
+		fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled", "Executing", "Databases and Retention Policies")
 		for _, t := range rp.Tasks {
-			fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled, t.DBRPs)
+			fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled, t.Executing, t.DBRPs)
 		}
 
 	case "recordings":
@@ -749,7 +751,7 @@ func deleteUsage() {
 func doDelete(args []string) error {
 	if len(args) < 2 {
 		fmt.Fprintln(os.Stderr, "Must pass at least one task name or recording ID")
-		enableUsage()
+		deleteUsage()
 		os.Exit(2)
 	}
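The doList change keeps header and rows aligned by reusing one format string: %-30s left-justifies the name in a 30-character column and each %-10v does the same for the typed columns, so adding the Executing column is just one more %-10v plus one more argument per call. A runnable sketch of that approach (sample values are illustrative):

package main

import (
	"fmt"
	"os"
)

func main() {
	outFmt := "%-30s%-10v%-10v%-10v%s\n"
	fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled", "Executing", "Databases and Retention Policies")
	fmt.Fprintf(os.Stdout, outFmt, "cpu_alert", "stream", true, true, "[mydb.myrp]")
}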
diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go
index a0bcb54f5..b84f9d3a0 100644
--- a/cmd/kapacitord/run/server.go
+++ b/cmd/kapacitord/run/server.go
@@ -28,7 +28,6 @@ import (
 	"github.com/influxdb/kapacitor/services/slack"
 	"github.com/influxdb/kapacitor/services/smtp"
 	"github.com/influxdb/kapacitor/services/stats"
-	"github.com/influxdb/kapacitor/services/streamer"
 	"github.com/influxdb/kapacitor/services/task_store"
 	"github.com/influxdb/kapacitor/services/udp"
 	"github.com/influxdb/kapacitor/services/victorops"
@@ -75,14 +74,12 @@ type Server struct {
 	dataDir  string
 	hostname string
 
-	err     chan error
-	closing chan struct{}
+	err chan error
 
 	TaskMaster *kapacitor.TaskMaster
 
 	LogService      logging.Interface
 	HTTPDService    *httpd.Service
-	Streamer        *streamer.Service
 	TaskStore       *task_store.Service
 	ReplayService   *replay.Service
 	InfluxDBService *influxdb.Service
@@ -110,7 +107,6 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
 		dataDir:  c.DataDir,
 		hostname: c.Hostname,
 		err:      make(chan error),
-		closing:  make(chan struct{}),
 		LogService:    logService,
 		MetaStore:     &metastore{},
 		QueryExecutor: &queryexecutor{},
@@ -125,7 +121,6 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
 	}
 
 	// Append Kapacitor services.
-	s.appendStreamerService()
 	s.appendSMTPService(c.SMTP)
 	s.appendHTTPDService(c.HTTP)
 	s.appendInfluxDBService(c.InfluxDB, c.Hostname)
@@ -156,14 +151,6 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
 	return s, nil
 }
 
-func (s *Server) appendStreamerService() {
-	l := s.LogService.NewLogger("[streamer] ", log.LstdFlags)
-	srv := streamer.NewService(l)
-	srv.StreamCollector = s.TaskMaster.Stream
-
-	s.Streamer = srv
-	s.Services = append(s.Services, srv)
-}
 
 func (s *Server) appendSMTPService(c smtp.Config) {
 	if c.Enabled {
@@ -179,7 +166,7 @@ func (s *Server) appendInfluxDBService(c influxdb.Config, hostname string) {
 	if c.Enabled {
 		l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
 		srv := influxdb.NewService(c, hostname, l)
-		srv.PointsWriter = s.Streamer
+		srv.PointsWriter = s.TaskMaster
 		srv.LogService = s.LogService
 
 		s.InfluxDBService = srv
@@ -193,7 +180,7 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
 	srv := httpd.NewService(c, l)
 
 	srv.Handler.MetaStore = s.MetaStore
-	srv.Handler.PointsWriter = s.Streamer
+	srv.Handler.PointsWriter = s.TaskMaster
 	srv.Handler.Version = s.buildInfo.Version
 
 	s.HTTPDService = srv
@@ -262,7 +249,7 @@ func (s *Server) appendCollectdService(c collectd.Config) {
 	srv := collectd.NewService(c)
 	srv.SetLogger(l)
 	srv.MetaStore = s.MetaStore
-	srv.PointsWriter = s.Streamer
+	srv.PointsWriter = s.TaskMaster
 	s.Services = append(s.Services, srv)
 }
 
@@ -276,7 +263,7 @@ func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
 		return err
 	}
 	srv.SetLogger(l)
-	srv.PointsWriter = s.Streamer
+	srv.PointsWriter = s.TaskMaster
 	srv.MetaStore = s.MetaStore
 	s.Services = append(s.Services, srv)
 	return nil
@@ -293,7 +280,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error {
 	}
 
 	srv.SetLogger(l)
-	srv.PointsWriter = s.Streamer
+	srv.PointsWriter = s.TaskMaster
 	srv.MetaStore = s.MetaStore
 	s.Services = append(s.Services, srv)
 	return nil
@@ -305,7 +292,7 @@ func (s *Server) appendUDPService(c udp.Config) {
 	}
 	l := s.LogService.NewLogger("[udp] ", log.LstdFlags)
 	srv := udp.NewService(c, l)
-	srv.PointsWriter = s.Streamer
+	srv.PointsWriter = s.TaskMaster
 	s.Services = append(s.Services, srv)
 }
 
@@ -313,7 +300,7 @@ func (s *Server) appendStatsService(c stats.Config) {
 	if c.Enabled {
 		l := s.LogService.NewLogger("[stats] ", log.LstdFlags)
 		srv := stats.NewService(c, l)
-		srv.StreamCollector = s.TaskMaster.Stream
+		srv.TaskMaster = s.TaskMaster
 
 		s.Services = append(s.Services, srv)
 	}
@@ -389,8 +376,7 @@ func (s *Server) Close() error {
 		s.Logger.Printf("D! closed service: %T", service)
 	}
 
-	close(s.closing)
-	return nil
+	return s.TaskMaster.Close()
 }
 
 func (s *Server) setupIDs() error {
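Every input service here depends only on a small PointsWriter field, which is why deleting the streamer shim and pointing the services at the TaskMaster is a one-line change per service. A simplified sketch of that consumer-defined-interface wiring, with stand-in types rather than the real influxdb/cluster request:

package main

import "fmt"

// Hypothetical, simplified stand-ins for the real point/request types.
type Point struct{ Name string }
type WriteRequest struct{ Points []Point }

// Each input service (UDP, graphite, collectd, ...) declares only what it
// needs, so any type with a matching WritePoints method can be plugged in.
type PointsWriter interface {
	WritePoints(req *WriteRequest) error
}

// taskMaster satisfies PointsWriter directly, removing the need for a
// forwarding service in between.
type taskMaster struct{}

func (tm *taskMaster) WritePoints(req *WriteRequest) error {
	for _, p := range req.Points {
		fmt.Println("collect:", p.Name) // would collect into the stream edge
	}
	return nil
}

func main() {
	var w PointsWriter = &taskMaster{}
	w.WritePoints(&WriteRequest{Points: []Point{{Name: "cpu"}}})
}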
closed service: %T", service) } - close(s.closing) - return nil + return s.TaskMaster.Close() } func (s *Server) setupIDs() error { diff --git a/cmd/kapacitord/run/server_helper_test.go b/cmd/kapacitord/run/server_helper_test.go index 5a04f73ee..c3518eb8b 100644 --- a/cmd/kapacitord/run/server_helper_test.go +++ b/cmd/kapacitord/run/server_helper_test.go @@ -39,7 +39,10 @@ func NewServer(c *run.Config) *Server { Branch: "testBranch", } ls := &LogService{} - srv, _ := run.NewServer(c, buildInfo, ls) + srv, err := run.NewServer(c, buildInfo, ls) + if err != nil { + panic(err) + } s := Server{ Server: srv, Config: c, diff --git a/cmd/kapacitord/run/server_test.go b/cmd/kapacitord/run/server_test.go index 17efa4e44..7eb556d67 100644 --- a/cmd/kapacitord/run/server_test.go +++ b/cmd/kapacitord/run/server_test.go @@ -14,7 +14,6 @@ import ( ) func TestServer_Ping(t *testing.T) { - t.Parallel() s := OpenDefaultServer() defer s.Close() r, err := s.HTTPGet(s.URL() + "/api/v1/ping") @@ -27,7 +26,6 @@ func TestServer_Ping(t *testing.T) { } func TestServer_Version(t *testing.T) { - t.Parallel() s := OpenDefaultServer() defer s.Close() resp, err := http.Get(s.URL() + "/api/v1/ping") @@ -42,7 +40,6 @@ func TestServer_Version(t *testing.T) { } func TestServer_DefineTask(t *testing.T) { - t.Parallel() s := OpenDefaultServer() defer s.Close() @@ -97,7 +94,6 @@ func TestServer_DefineTask(t *testing.T) { } func TestServer_EnableTask(t *testing.T) { - t.Parallel() s := OpenDefaultServer() defer s.Close() @@ -160,7 +156,6 @@ func TestServer_EnableTask(t *testing.T) { } func TestServer_DisableTask(t *testing.T) { - t.Parallel() s := OpenDefaultServer() defer s.Close() @@ -231,7 +226,6 @@ func TestServer_DisableTask(t *testing.T) { } func TestServer_DeleteTask(t *testing.T) { - t.Parallel() s := OpenDefaultServer() defer s.Close() @@ -268,12 +262,10 @@ func TestServer_DeleteTask(t *testing.T) { } func TestServer_StreamTask(t *testing.T) { - t.Skip() // Need to figure out why its taking so long to enable the task. - t.Parallel() s := OpenDefaultServer() defer s.Close() - name := "testTaskName" + name := "testStreamTask" ttype := "stream" dbrps := []kapacitor.DBRP{{ Database: "mydb", @@ -344,8 +336,6 @@ test value=1 0000000011 } func TestServer_BatchTask(t *testing.T) { - t.Skip() // Need to figure out why its taking so long to enable the task. 
diff --git a/cmd/kapacitord/run/server_test.go b/cmd/kapacitord/run/server_test.go
index 17efa4e44..7eb556d67 100644
--- a/cmd/kapacitord/run/server_test.go
+++ b/cmd/kapacitord/run/server_test.go
@@ -14,7 +14,6 @@ import (
 )
 
 func TestServer_Ping(t *testing.T) {
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 	r, err := s.HTTPGet(s.URL() + "/api/v1/ping")
@@ -27,7 +26,6 @@ func TestServer_Ping(t *testing.T) {
 }
 
 func TestServer_Version(t *testing.T) {
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 	resp, err := http.Get(s.URL() + "/api/v1/ping")
@@ -42,7 +40,6 @@ func TestServer_Version(t *testing.T) {
 }
 
 func TestServer_DefineTask(t *testing.T) {
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 
@@ -97,7 +94,6 @@ func TestServer_DefineTask(t *testing.T) {
 }
 
 func TestServer_EnableTask(t *testing.T) {
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 
@@ -160,7 +156,6 @@ func TestServer_EnableTask(t *testing.T) {
 }
 
 func TestServer_DisableTask(t *testing.T) {
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 
@@ -231,7 +226,6 @@ func TestServer_DisableTask(t *testing.T) {
 }
 
 func TestServer_DeleteTask(t *testing.T) {
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 
@@ -268,12 +262,10 @@ func TestServer_DeleteTask(t *testing.T) {
 }
 
 func TestServer_StreamTask(t *testing.T) {
-	t.Skip() // Need to figure out why its taking so long to enable the task.
-	t.Parallel()
 	s := OpenDefaultServer()
 	defer s.Close()
 
-	name := "testTaskName"
+	name := "testStreamTask"
 	ttype := "stream"
 	dbrps := []kapacitor.DBRP{{
 		Database:        "mydb",
@@ -344,8 +336,6 @@ test value=1 0000000011
 }
 
 func TestServer_BatchTask(t *testing.T) {
-	t.Skip() // Need to figure out why its taking so long to enable the task.
-	t.Parallel()
 	c := NewConfig()
 	c.InfluxDB.Enabled = true
 	count := 0
@@ -377,7 +367,7 @@ func TestServer_BatchTask(t *testing.T) {
 	s := OpenServer(c)
 	defer s.Close()
 
-	name := "testTaskName"
+	name := "testBatchTask"
 	ttype := "batch"
 	dbrps := []kapacitor.DBRP{{
 		Database:        "mydb",
@@ -416,7 +406,6 @@ batch
 	if err != nil {
 		t.Error(err)
 	}
-
 	r, err = s.DisableTask(name)
 	if err != nil {
 		t.Fatal(err)
aborting c: %s e: %s\n", + e.statMap.Get(statCollected), + e.statMap.Get(statEmitted), + ) +} + func (e *Edge) Next() (p models.PointInterface, ok bool) { if e.stream != nil { return e.NextPoint() @@ -101,9 +108,12 @@ func (e *Edge) NextPoint() (p models.Point, ok bool) { e.statMap.Get(statEmitted), ) } - p, ok = <-e.stream - if ok { - e.statMap.Add(statEmitted, 1) + select { + case <-e.aborted: + case p, ok = <-e.stream: + if ok { + e.statMap.Add(statEmitted, 1) + } } return } @@ -117,9 +127,12 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) { e.statMap.Get(statEmitted), ) } - b, ok = <-e.batch - if ok { - e.statMap.Add(statEmitted, 1) + select { + case <-e.aborted: + case b, ok = <-e.batch: + if ok { + e.statMap.Add(statEmitted, 1) + } } return } @@ -133,28 +146,17 @@ func (e *Edge) NextMaps() (m *MapResult, ok bool) { e.statMap.Get(statEmitted), ) } - m, ok = <-e.reduce - if ok { - e.statMap.Add(statEmitted, 1) - } - return -} - -func (e *Edge) recover(errp *error) { - if r := recover(); r != nil { - msg := fmt.Sprintf("%s", r) - if msg == "send on closed channel" { - *errp = errors.New(msg) - } else { - panic(r) + select { + case <-e.aborted: + case m, ok = <-e.reduce: + if ok { + e.statMap.Add(statEmitted, 1) } } + return } -func (e *Edge) CollectPoint(p models.Point) (err error) { - e.mu.Lock() - defer e.mu.Unlock() - defer e.recover(&err) +func (e *Edge) CollectPoint(p models.Point) error { if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( @@ -164,14 +166,15 @@ func (e *Edge) CollectPoint(p models.Point) (err error) { ) } e.statMap.Add(statCollected, 1) - e.stream <- p - return + select { + case <-e.aborted: + return ErrAborted + case e.stream <- p: + return nil + } } -func (e *Edge) CollectBatch(b models.Batch) (err error) { - e.mu.Lock() - defer e.mu.Unlock() - defer e.recover(&err) +func (e *Edge) CollectBatch(b models.Batch) error { if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( @@ -181,14 +184,15 @@ func (e *Edge) CollectBatch(b models.Batch) (err error) { ) } e.statMap.Add(statCollected, 1) - e.batch <- b - return + select { + case <-e.aborted: + return ErrAborted + case e.batch <- b: + return nil + } } -func (e *Edge) CollectMaps(m *MapResult) (err error) { - e.mu.Lock() - defer e.mu.Unlock() - defer e.recover(&err) +func (e *Edge) CollectMaps(m *MapResult) error { if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( @@ -198,6 +202,10 @@ func (e *Edge) CollectMaps(m *MapResult) (err error) { ) } e.statMap.Add(statCollected, 1) - e.reduce <- m - return + select { + case <-e.aborted: + return ErrAborted + case e.reduce <- m: + return nil + } } diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index b0a29cbbe..eadd43aa7 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -58,36 +58,7 @@ batch }, } - clock, et, errCh, tm := testBatcher(t, "TestBatch_Derivative", script) - defer tm.Close() - - // Move time forward - clock.Set(clock.Zero().Add(21 * time.Second)) - // Wait till the replay has finished - if e := <-errCh; e != nil { - t.Error(e) - } - // Wait till the task is finished - if e := et.Err(); e != nil { - t.Error(e) - } - - // Get the result - output, err := et.GetOutput("TestBatch_Derivative") - if err != nil { - t.Fatal(err) - } - - resp, err := http.Get(output.Endpoint()) - if err != nil { - t.Fatal(err) - } 
diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go
index b0a29cbbe..eadd43aa7 100644
--- a/integrations/batcher_test.go
+++ b/integrations/batcher_test.go
@@ -58,36 +58,7 @@ batch
 		},
 	}
 
-	clock, et, errCh, tm := testBatcher(t, "TestBatch_Derivative", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(21 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestBatch_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testBatcherWithOutput(t, "TestBatch_Derivative", script, 21*time.Second, er)
 }
 
 func TestBatch_DerivativeUnit(t *testing.T) {
@@ -134,36 +105,7 @@ batch
 		},
 	}
 
-	clock, et, errCh, tm := testBatcher(t, "TestBatch_Derivative", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(21 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestBatch_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testBatcherWithOutput(t, "TestBatch_Derivative", script, 21*time.Second, er)
 }
 
 func TestBatch_DerivativeN(t *testing.T) {
@@ -178,7 +120,7 @@ batch
 		.every(10s)
 		.groupBy(time(2s))
 		.derivative('value')
-		.httpOut('TestBatch_Derivative')
+		.httpOut('TestBatch_DerivativeNN')
 `
 
 	er := kapacitor.Result{
@@ -209,37 +151,9 @@ batch
 		},
 	}
 
-	clock, et, errCh, tm := testBatcher(t, "TestBatch_DerivativeNN", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(21 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestBatch_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testBatcherWithOutput(t, "TestBatch_DerivativeNN", script, 21*time.Second, er)
 }
+
 func TestBatch_DerivativeNN(t *testing.T) {
@@ -253,7 +167,7 @@ batch
 		.groupBy(time(2s))
 		.derivative('value')
 		.nonNegative()
-		.httpOut('TestBatch_Derivative')
+		.httpOut('TestBatch_DerivativeNN')
 `
 
 	er := kapacitor.Result{
@@ -280,36 +194,7 @@ batch
 		},
 	}
 
-	clock, et, errCh, tm := testBatcher(t, "TestBatch_DerivativeNN", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(21 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestBatch_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testBatcherWithOutput(t, "TestBatch_DerivativeNN", script, 21*time.Second, er)
 }
 
 func TestBatch_SimpleMR(t *testing.T) {
@@ -364,36 +249,7 @@ batch
 		},
 	}
 
-	clock, et, errCh, tm := testBatcher(t, "TestBatch_SimpleMR", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(30 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestBatch_SimpleMR")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
 }
 
 func TestBatch_Join(t *testing.T) {
@@ -442,36 +298,7 @@ cpu0.join(cpu1)
 		},
 	}
 
-	clock, et, errCh, tm := testBatcher(t, "TestBatch_Join", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(30 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestBatch_Join")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, er)
 }
 
 // Helper test function for batcher
@@ -520,8 +347,42 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
 	// Replay test data to executor
 	batches := tm.BatchCollectors(name)
-	errCh := r.ReplayBatch(allData, batches, false)
+	replayErr := r.ReplayBatch(allData, batches, false)
 
 	t.Log(string(et.Task.Dot()))
-	return r.Setter, et, errCh, tm
+	return r.Setter, et, replayErr, tm
+}
+
+func testBatcherWithOutput(
+	t *testing.T,
+	name,
+	script string,
+	duration time.Duration,
+	er kapacitor.Result,
+	ignoreOrder ...bool,
+) {
+	clock, et, replayErr, tm := testBatcher(t, name, script)
+	defer tm.Close()
+
+	err := fastForwardTask(clock, et, replayErr, tm, duration)
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Get the result
+	output, err := et.GetOutput(name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resp, err := http.Get(output.Endpoint())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Assert we got the expected result
+	result := kapacitor.ResultFromJSON(resp.Body)
+	if eq, msg := compareResults(er, result); !eq {
+		t.Error(msg)
+	}
 }
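The new testBatcherWithOutput helper takes a trailing `ignoreOrder ...bool`, a common Go trick for an optional flag: existing call sites stay unchanged and only the callers that need it pass an explicit true (the testStreamerWithOutput helper later in this patch actually consumes the flag). A small self-contained sketch of the idiom, with hypothetical names:

package example

import (
	"reflect"
	"sort"
	"testing"
)

// checkResult compares got and want, optionally ignoring element order.
// The variadic bool emulates an optional parameter without changing the
// signature for the many callers that do not care about ordering.
func checkResult(t *testing.T, want, got []string, ignoreOrder ...bool) {
	if len(ignoreOrder) > 0 && ignoreOrder[0] {
		sort.Strings(want)
		sort.Strings(got)
	}
	if !reflect.DeepEqual(want, got) {
		t.Errorf("got %v want %v", got, want)
	}
}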
diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go
index 96ca4affd..2dbbc693c 100644
--- a/integrations/streamer_test.go
+++ b/integrations/streamer_test.go
@@ -72,36 +72,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Derivative", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestStream_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_Derivative", script, 15*time.Second, er)
 }
 
 func TestStream_DerivativeUnit(t *testing.T) {
@@ -131,36 +102,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Derivative", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestStream_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_Derivative", script, 15*time.Second, er)
 }
 
 func TestStream_DerivativeNN(t *testing.T) {
@@ -174,7 +116,7 @@ stream
 		.period(10s)
 		.every(10s)
 		.mapReduce(influxql.mean('value'))
-		.httpOut('TestStream_Derivative')
+		.httpOut('TestStream_DerivativeNN')
 `
 	er := kapacitor.Result{
 		Series: imodels.Rows{
@@ -190,36 +132,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_DerivativeNN", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestStream_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_DerivativeNN", script, 15*time.Second, er)
 }
 
 func TestStream_DerivativeN(t *testing.T) {
@@ -232,7 +145,7 @@ stream
 		.period(10s)
 		.every(10s)
 		.mapReduce(influxql.mean('value'))
-		.httpOut('TestStream_Derivative')
+		.httpOut('TestStream_DerivativeNN')
 `
 	er := kapacitor.Result{
 		Series: imodels.Rows{
@@ -248,36 +161,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_DerivativeNN", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestStream_Derivative")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_DerivativeNN", script, 15*time.Second, er)
 }
 
 func TestStream_Window(t *testing.T) {
@@ -329,36 +213,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Window", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestStream_Window")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er)
 }
 
 func TestStream_SimpleMR(t *testing.T) {
@@ -387,36 +242,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_SimpleMR", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("TestStream_SimpleMR")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er)
 }
 
 func TestStream_GroupBy(t *testing.T) {
@@ -429,7 +255,7 @@ stream
 		.period(10s)
 		.every(10s)
 		.mapReduce(influxql.sum('value'))
-		.httpOut('error_count')
+		.httpOut('TestStream_GroupBy')
 `
 
 	er := kapacitor.Result{
@@ -464,36 +290,7 @@ stream
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_GroupBy", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("error_count")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_GroupBy", script, 13*time.Second, er)
 }
 
 func TestStream_Join(t *testing.T) {
@@ -523,7 +320,7 @@ errorCounts.join(viewCounts)
 	.eval(lambda: "errors.sum" / "views.sum")
 		.as('error_percent')
 		.keep()
-	.httpOut('error_rate')
+	.httpOut('TestStream_Join')
 `
 
 	er := kapacitor.Result{
@@ -564,36 +361,7 @@ errorCounts.join(viewCounts)
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Join", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("error_rate")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResultsIgnoreSeriesOrder(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, er, true)
 }
 
 func TestStream_JoinTolerance(t *testing.T) {
@@ -618,7 +386,7 @@ errorCounts.join(viewCounts)
 		.every(10s)
 		.mapReduce(influxql.mean('error_percent'))
 		.as('error_percent')
-	.httpOut('error_rate')
+	.httpOut('TestStream_JoinTolerance')
 `
 
 	er := kapacitor.Result{
@@ -653,36 +421,7 @@ errorCounts.join(viewCounts)
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_JoinTolerance", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("error_rate")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResultsIgnoreSeriesOrder(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_JoinTolerance", script, 13*time.Second, er, true)
 }
 
 func TestStream_JoinFill(t *testing.T) {
@@ -705,7 +444,7 @@ errorCounts.join(viewCounts)
 		.period(10s)
 		.every(10s)
 		.mapReduce(influxql.count('error_percent'))
-	.httpOut('error_rate')
+	.httpOut('TestStream_JoinFill')
 `
 
 	er := kapacitor.Result{
@@ -740,36 +479,7 @@ errorCounts.join(viewCounts)
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_JoinFill", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("error_rate")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResultsIgnoreSeriesOrder(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_JoinFill", script, 13*time.Second, er, true)
 }
 
 func TestStream_JoinN(t *testing.T) {
@@ -793,7 +503,7 @@ cpu.join(mem, disk)
 		.period(10s)
 		.every(10s)
 		.mapReduce(influxql.count('cpu.value'))
-	.httpOut('all')
+	.httpOut('TestStream_JoinN')
 `
 
 	er := kapacitor.Result{
@@ -810,36 +520,7 @@ cpu.join(mem, disk)
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_JoinN", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("all")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_JoinN", script, 15*time.Second, er)
 }
 
 func TestStream_Union(t *testing.T) {
@@ -861,7 +542,7 @@ cpu.union(mem, disk)
 		.period(10s)
 		.every(10s)
 		.mapReduce(influxql.count('value'))
-	.httpOut('all')
+	.httpOut('TestStream_Union')
 `
 
 	er := kapacitor.Result{
@@ -878,36 +559,7 @@ cpu.union(mem, disk)
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Union", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
-
-	// Get the result
-	output, err := et.GetOutput("all")
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	resp, err := http.Get(output.Endpoint())
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Assert we got the expected result
-	result := kapacitor.ResultFromJSON(resp.Body)
-	if eq, msg := compareResults(er, result); !eq {
-		t.Error(msg)
-	}
+	testStreamerWithOutput(t, "TestStream_Union", script, 15*time.Second, er)
 }
 
 func TestStream_Aggregations(t *testing.T) {
@@ -928,7 +580,7 @@ stream
 		.every(10s)
 		.mapReduce({{ .Method }}({{ .Args }}))
 		{{ if .UsePointTimes }}.usePointTimes(){{ end }}
-	.httpOut('{{ .Method }}')
+	.httpOut('TestStream_Aggregations')
 `
 	endTime := time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC)
 	testCases := []testCase{
@@ -1351,42 +1003,13 @@ stream
 			tc.Args = "'value'"
 		}
 		tmpl.Execute(&script, tc)
-		clock, et, errCh, tm := testStreamer(
+		testStreamerWithOutput(
 			t,
 			"TestStream_Aggregations",
 			string(script.Bytes()),
+			13*time.Second,
+			tc.ER,
 		)
-		defer tm.Close()
-
-		// Move time forward
-		clock.Set(clock.Zero().Add(13 * time.Second))
-		// Wait till the replay has finished
-		if e := <-errCh; e != nil {
-			t.Error(e)
-		}
-		// Wait till the task is finished
-		if e := et.Err(); e != nil {
-			t.Error(e)
-		}
-
-		// Get the result
-		output, err := et.GetOutput(tc.Method)
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		resp, err := http.Get(output.Endpoint())
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		// Assert we got the expected result
-		result := kapacitor.ResultFromJSON(resp.Body)
-		if eq, msg := compareResults(tc.ER, result); !eq {
-			t.Error(tc.Method + ": " + msg)
-		}
-
-		tm.StopTask(et.Task.Name)
 	}
 }
@@ -1463,19 +1086,7 @@ stream
 			.post('` + ts.URL + `')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Alert", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
+	testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second)
 
 	if requestCount != 1 {
 		t.Errorf("got %v exp %v", requestCount, 1)
@@ -1545,23 +1156,18 @@ stream
 			.channel('#alerts')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Alert", script)
+	clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script)
 	defer tm.Close()
+
 	c := slack.NewConfig()
 	c.URL = ts.URL + "/test/slack/url"
 	c.Channel = "#channel"
 	sl := slack.NewService(c, logService.NewLogger("[test_slack] ", log.LstdFlags))
 	tm.SlackService = sl
 
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
+	err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
+	if err != nil {
+		t.Error(err)
 	}
 
 	if requestCount != 1 {
@@ -1623,7 +1229,7 @@ stream
 			.pagerDuty()
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Alert", script)
+	clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script)
 	defer tm.Close()
 	c := pagerduty.NewConfig()
 	c.URL = ts.URL
@@ -1632,15 +1238,9 @@ stream
 	pd.HTTPDService = tm.HTTPDService
 	tm.PagerDutyService = pd
 
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
+	err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
+	if err != nil {
+		t.Error(err)
 	}
 
 	if requestCount != 1 {
@@ -1705,7 +1305,7 @@ stream
 			.routingKey('test_key')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_Alert", script)
+	clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script)
 	defer tm.Close()
 	c := victorops.NewConfig()
 	c.URL = ts.URL
@@ -1714,15 +1314,9 @@ stream
 	vo := victorops.NewService(c, logService.NewLogger("[test_vo] ", log.LstdFlags))
 	tm.VictorOpsService = vo
 
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
+	err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
+	if err != nil {
+		t.Error(err)
 	}
 
 	if requestCount != 1 {
@@ -1766,15 +1360,17 @@ stream
 			.post('` + ts.URL + `')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_AlertSigma", script)
+	clock, et, replayErr, tm := testStreamer(t, "TestStream_AlertSigma", script)
 	defer tm.Close()
 
 	// Move time forward
 	clock.Set(clock.Zero().Add(13 * time.Second))
 	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
+	if e := <-replayErr; e != nil {
 		t.Error(e)
 	}
+	// We don't want anymore data for the task
+	tm.DelFork(et.Task.Name)
 	// Wait till the task is finished
 	if e := et.Err(); e != nil {
 		t.Error(e)
 	}
@@ -1810,19 +1406,7 @@ stream
 			.post('` + ts.URL + `')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_AlertComplexWhere", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
+	testStreamerNoOutput(t, "TestStream_AlertComplexWhere", script, 13*time.Second)
 
 	if requestCount != 1 {
 		t.Errorf("got %v exp %v", requestCount, 1)
@@ -1845,19 +1429,7 @@ stream
 			.post('` + ts.URL + `')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_AlertStateChangesOnly", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
+	testStreamerNoOutput(t, "TestStream_AlertStateChangesOnly", script, 13*time.Second)
 
 	// Only 4 points below 93 so 8 state changes.
 	if requestCount != 8 {
@@ -1885,19 +1457,7 @@ stream
 			.post('` + ts.URL + `')
 `
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_AlertFlapping", script)
-	defer tm.Close()
-
-	// Move time forward
-	clock.Set(clock.Zero().Add(13 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
-	}
+	testStreamerNoOutput(t, "TestStream_AlertFlapping", script, 13*time.Second)
 
 	// Flapping detection should drop the last alerts.
 	if requestCount != 9 {
@@ -1947,20 +1507,15 @@ stream
 		done <- err
 	}))
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_InfluxDBOut", script)
+	clock, et, replayErr, tm := testStreamer(t, "TestStream_InfluxDBOut", script)
 	tm.InfluxDBService = influxdb
 	defer tm.Close()
 
-	// Move time forward
-	clock.Set(clock.Zero().Add(15 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
+	err := fastForwardTask(clock, et, replayErr, tm, 15*time.Second)
+	if err != nil {
+		t.Error(err)
 	}
+
 	// Wait till we received a request
 	if e := <-done; e != nil {
 		t.Error(e)
@@ -2075,18 +1630,12 @@ topScores.sample(4s)
 		},
 	}
 
-	clock, et, errCh, tm := testStreamer(t, "TestStream_TopSelector", script)
+	clock, et, replayErr, tm := testStreamer(t, "TestStream_TopSelector", script)
 	defer tm.Close()
 
-	// Move time forward
-	clock.Set(clock.Zero().Add(10 * time.Second))
-	// Wait till the replay has finished
-	if e := <-errCh; e != nil {
-		t.Error(e)
-	}
-	// Wait till the task is finished
-	if e := et.Err(); e != nil {
-		t.Error(e)
+	err := fastForwardTask(clock, et, replayErr, tm, 10*time.Second)
+	if err != nil {
+		t.Error(err)
 	}
 
 	// Get the result
@@ -2125,7 +1674,16 @@ topScores.sample(4s)
 }
 
 // Helper test function for streamer
-func testStreamer(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
+func testStreamer(
+	t *testing.T,
+	name,
+	script string,
+) (
+	clock.Setter,
+	*kapacitor.ExecutingTask,
+	<-chan error,
+	*kapacitor.TaskMaster,
+) {
 	if testing.Verbose() {
 		wlog.SetLevel(wlog.DEBUG)
 	} else {
@@ -2163,8 +1721,86 @@ func testStreamer(t *testing.T, name, script string) (clock.Setter, *kapacitor.E
 	}
 
 	// Replay test data to executor
-	errCh := r.ReplayStream(data, tm.Stream, false, "s")
+	stream, err := tm.Stream(name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	replayErr := r.ReplayStream(data, stream, false, "s")
 
 	t.Log(string(et.Task.Dot()))
-	return r.Setter, et, errCh, tm
+	return r.Setter, et, replayErr, tm
+}
+
+func fastForwardTask(
+	clock clock.Setter,
+	et *kapacitor.ExecutingTask,
+	replayErr <-chan error,
+	tm *kapacitor.TaskMaster,
+	duration time.Duration,
+) error {
+	// Move time forward
+	clock.Set(clock.Zero().Add(duration))
+	// Wait till the replay has finished
+	if err := <-replayErr; err != nil {
+		return err
+	}
+	tm.Drain()
+	// Wait till the task is finished
+	if err := et.Err(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func testStreamerNoOutput(
+	t *testing.T,
+	name,
+	script string,
+	duration time.Duration,
+) {
+	clock, et, replayErr, tm := testStreamer(t, name, script)
+	err := fastForwardTask(clock, et, replayErr, tm, duration)
+	if err != nil {
+		t.Error(err)
+	}
+	defer tm.Close()
+}
+
+func testStreamerWithOutput(
+	t *testing.T,
+	name,
+	script string,
+	duration time.Duration,
+	er kapacitor.Result,
+	ignoreOrder ...bool,
+) {
+	clock, et, replayErr, tm := testStreamer(t, name, script)
+	err := fastForwardTask(clock, et, replayErr, tm, duration)
+	if err != nil {
+		t.Error(err)
+	}
+	defer tm.Close()
+
+	// Get the result
+	output, err := et.GetOutput(name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resp, err := http.Get(output.Endpoint())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Assert we got the expected result
+	result := kapacitor.ResultFromJSON(resp.Body)
+	if len(ignoreOrder) > 0 && ignoreOrder[0] {
+		if eq, msg := compareResultsIgnoreSeriesOrder(er, result); !eq {
+			t.Error(msg)
+		}
+	} else {
+		if eq, msg := compareResults(er, result); !eq {
+			t.Error(msg)
+		}
+	}
 }
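The aggregation tests that now funnel through testStreamerWithOutput generate each TICKscript from a text/template, varying only the method and arguments per case. A runnable sketch of that generation step, with a reduced testCase struct (the real one carries more fields, such as UsePointTimes and ER):

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

type testCase struct {
	Method string
	Args   string
}

// One template yields every script; each case substitutes its own
// method and argument list.
var tmpl = template.Must(template.New("script").Parse(
	".mapReduce({{ .Method }}({{ .Args }}))\n"))

func main() {
	var script bytes.Buffer
	tc := testCase{Method: "influxql.mean", Args: "'value'"}
	if err := tmpl.Execute(&script, tc); err != nil {
		panic(err)
	}
	fmt.Print(script.String()) // .mapReduce(influxql.mean('value'))
}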
diff --git a/map_reduce.go b/map_reduce.go
index 57d50ada8..724d932e5 100644
--- a/map_reduce.go
+++ b/map_reduce.go
@@ -205,6 +205,5 @@ func (r *ReduceNode) runReduce() error {
 		}
 	}
 
-	r.closeChildEdges()
 	return nil
 }
diff --git a/node.go b/node.go
index 263aaecd3..f64add747 100644
--- a/node.go
+++ b/node.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"runtime"
+	"sync"
 
 	"github.com/influxdb/kapacitor/pipeline"
 )
@@ -30,30 +31,34 @@ type Node interface {
 	// close children edges
 	closeChildEdges()
 
-	closeParentEdges()
+	// abort parent edges
+	abortParentEdges()
 }
 
 //implementation of Node
 type node struct {
 	pipeline.Node
-	et       *ExecutingTask
-	parents  []Node
-	children []Node
-	runF     func() error
-	stopF    func()
-	errCh    chan error
-	ins      []*Edge
-	outs     []*Edge
-	logger   *log.Logger
+	et         *ExecutingTask
+	parents    []Node
+	children   []Node
+	runF       func() error
+	stopF      func()
+	errCh      chan error
+	err        error
+	finishedMu sync.Mutex
+	finished   bool
+	ins        []*Edge
+	outs       []*Edge
+	logger     *log.Logger
 }
 
 func (n *node) addParentEdge(e *Edge) {
 	n.ins = append(n.ins, e)
}
 
-func (n *node) closeParentEdges() {
+func (n *node) abortParentEdges() {
 	for _, in := range n.ins {
-		in.Close()
+		in.Abort()
 	}
 }
@@ -68,16 +73,16 @@ func (n *node) start() {
 	defer func() {
 		// Always close children edges
 		n.closeChildEdges()
-		// Handle panic in runF
-		r := recover()
-		if r != nil {
-			trace := make([]byte, 512)
-			n := runtime.Stack(trace, false)
-			err = fmt.Errorf("%s: Trace:%s", r, string(trace[:n]))
-		}
 		// Propogate error up
 		if err != nil {
-			n.closeParentEdges()
+			// Handle panic in runF
+			r := recover()
+			if r != nil {
+				trace := make([]byte, 512)
+				n := runtime.Stack(trace, false)
+				err = fmt.Errorf("%s: Trace:%s", r, string(trace[:n]))
+			}
+			n.abortParentEdges()
 			n.logger.Println("E!", err)
 		}
 		n.errCh <- err
@@ -91,11 +96,17 @@ func (n *node) stop() {
 	if n.stopF != nil {
 		n.stopF()
 	}
-	n.closeChildEdges()
+
 }
 
 func (n *node) Err() error {
-	return <-n.errCh
+	n.finishedMu.Lock()
+	defer n.finishedMu.Unlock()
+	if !n.finished {
+		n.finished = true
+		n.err = <-n.errCh
+	}
+	return n.err
 }
 
 func (n *node) addChild(c Node) (*Edge, error) {
diff --git a/replay.go b/replay.go
index f687055dc..ce03aa5fd 100644
--- a/replay.go
+++ b/replay.go
@@ -29,7 +29,7 @@ func NewReplay(c clock.Clock) *Replay {
 // Replay a data set against an executor.
 func (r *Replay) ReplayStream(data io.ReadCloser, stream StreamCollector, recTime bool, precision string) <-chan error {
 	src := newReplayStreamSource(data, r.clck)
-	src.replayStream(stream, recTime, precision)
+	go src.replayStream(stream, recTime, precision)
 	return src.Err()
 }
@@ -54,62 +54,60 @@ func (r *replayStreamSource) Err() <-chan error {
 }
 
 func (r *replayStreamSource) replayStream(stream StreamCollector, recTime bool, precision string) {
-	go func() {
-		defer stream.Close()
-		defer r.data.Close()
-		start := time.Time{}
-		var diff time.Duration
-		zero := r.clck.Zero()
-		for r.in.Scan() {
-			db := r.in.Text()
-			if !r.in.Scan() {
-				r.err <- fmt.Errorf("invalid replay file format, expected another line")
-				return
-			}
-			rp := r.in.Text()
-			if !r.in.Scan() {
-				r.err <- fmt.Errorf("invalid replay file format, expected another line")
-				return
-			}
-			points, err := dbmodels.ParsePointsWithPrecision(
-				r.in.Bytes(),
-				zero,
-				precision,
-			)
-			if err != nil {
-				r.err <- err
-				return
-			}
-			if start.IsZero() {
-				start = points[0].Time()
-				diff = zero.Sub(start)
-			}
-			var t time.Time
-			waitTime := points[0].Time().Add(diff).UTC()
-			if !recTime {
-				t = waitTime
-			} else {
-				t = points[0].Time().UTC()
-			}
-			mp := points[0]
-			p := models.Point{
-				Database:        db,
-				RetentionPolicy: rp,
-				Name:            mp.Name(),
-				Group:           models.NilGroup,
-				Tags:            models.Tags(mp.Tags()),
-				Fields:          models.Fields(mp.Fields()),
-				Time:            t,
-			}
-			r.clck.Until(waitTime)
-			err = stream.CollectPoint(p)
-			if err != nil {
-				r.err <- err
-				return
-			}
-		}
-		r.err <- r.in.Err()
-	}()
+	defer stream.Close()
+	defer r.data.Close()
+	start := time.Time{}
+	var diff time.Duration
+	zero := r.clck.Zero()
+	for r.in.Scan() {
+		db := r.in.Text()
+		if !r.in.Scan() {
+			r.err <- fmt.Errorf("invalid replay file format, expected another line")
+			return
+		}
+		rp := r.in.Text()
+		if !r.in.Scan() {
+			r.err <- fmt.Errorf("invalid replay file format, expected another line")
+			return
+		}
+		points, err := dbmodels.ParsePointsWithPrecision(
+			r.in.Bytes(),
+			zero,
+			precision,
+		)
+		if err != nil {
+			r.err <- err
+			return
+		}
+		if start.IsZero() {
+			start = points[0].Time()
+			diff = zero.Sub(start)
+		}
+		var t time.Time
+		waitTime := points[0].Time().Add(diff).UTC()
+		if !recTime {
+			t = waitTime
+		} else {
+			t = points[0].Time().UTC()
+		}
+		mp := points[0]
+		p := models.Point{
+			Database:        db,
+			RetentionPolicy: rp,
+			Name:            mp.Name(),
+			Group:           models.NilGroup,
+			Tags:            models.Tags(mp.Tags()),
+			Fields:          models.Fields(mp.Fields()),
+			Time:            t,
+		}
+		r.clck.Until(waitTime)
+		err = stream.CollectPoint(p)
+		if err != nil {
+			r.err <- err
+			return
+		}
+	}
+	r.err <- r.in.Err()
 }
 
 // Replay a data set against an executor.
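The node.go change makes Err idempotent: the first caller drains errCh and caches the result, so later callers (the new task.stop walks every node) get the same value instead of blocking on an empty channel. The memoization in isolation, as a sketch with a stubbed-down node type:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type node struct {
	errCh      chan error
	finishedMu sync.Mutex
	finished   bool
	err        error
}

// Err receives from errCh exactly once and caches the result; every
// subsequent call returns the cached error without touching the channel.
func (n *node) Err() error {
	n.finishedMu.Lock()
	defer n.finishedMu.Unlock()
	if !n.finished {
		n.finished = true
		n.err = <-n.errCh
	}
	return n.err
}

func main() {
	n := &node{errCh: make(chan error, 1)}
	n.errCh <- errors.New("boom")
	fmt.Println(n.Err()) // boom (drains the channel)
	fmt.Println(n.Err()) // boom (from cache; no second receive)
}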
diff --git a/services/replay/service.go b/services/replay/service.go
index 0934f8298..c67fb8183 100644
--- a/services/replay/service.go
+++ b/services/replay/service.go
@@ -44,9 +44,10 @@ type Service struct {
 		NewClient() (*client.Client, error)
 	}
 	TaskMaster interface {
-		NewFork(name string, dbrps []kapacitor.DBRP) *kapacitor.Edge
+		NewFork(name string, dbrps []kapacitor.DBRP) (*kapacitor.Edge, error)
 		DelFork(name string)
 		New() *kapacitor.TaskMaster
+		Stream(name string) (kapacitor.StreamCollector, error)
 	}
 
 	logger *log.Logger
@@ -182,7 +183,12 @@ func (r *Service) handleReplay(w http.ResponseWriter, req *http.Request) {
 			httpd.HttpError(w, "replay find: "+err.Error(), true, http.StatusNotFound)
 			return
 		}
-		replayC = replay.ReplayStream(f, tm.Stream, recTime, precision)
+		stream, err := tm.Stream(id)
+		if err != nil {
+			httpd.HttpError(w, "stream start: "+err.Error(), true, http.StatusInternalServerError)
+			return
+		}
+		replayC = replay.ReplayStream(f, stream, recTime, precision)
 	case kapacitor.BatchTask:
 		fs, err := r.FindBatchRecording(id)
 		if err != nil {
@@ -507,7 +513,10 @@ func (s streamWriter) Close() error {
 
 // Record the stream for a duration
 func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapacitor.DBRP) error {
-	e := r.TaskMaster.NewFork(rid.String(), dbrps)
+	e, err := r.TaskMaster.NewFork(rid.String(), dbrps)
+	if err != nil {
+		return err
+	}
 	sw, err := r.newStreamWriter(rid)
 	if err != nil {
 		return err
@@ -522,7 +531,6 @@ func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapac
 	}()
 	time.Sleep(dur)
 	done = true
-	e.Close()
 	r.TaskMaster.DelFork(rid.String())
 	return nil
 }
closed service") return nil } @@ -116,6 +123,6 @@ func (s *Service) reportStats() { Time: now, Fields: models.Fields(stat.Values), } - s.StreamCollector.CollectPoint(p) + s.stream.CollectPoint(p) } } diff --git a/services/streamer/service.go b/services/streamer/service.go deleted file mode 100644 index d15a72556..000000000 --- a/services/streamer/service.go +++ /dev/null @@ -1,49 +0,0 @@ -package streamer - -import ( - "log" - - "github.com/influxdb/influxdb/cluster" - "github.com/influxdb/kapacitor/models" -) - -type Service struct { - StreamCollector interface { - CollectPoint(models.Point) error - } - logger *log.Logger -} - -func NewService(l *log.Logger) *Service { - return &Service{ - logger: l, - } -} - -func (s *Service) Open() error { - return nil -} - -func (s *Service) Close() error { - return nil -} - -func (s *Service) WritePoints(pts *cluster.WritePointsRequest) (err error) { - for _, mp := range pts.Points { - p := models.Point{ - Database: pts.Database, - RetentionPolicy: pts.RetentionPolicy, - Name: mp.Name(), - Group: models.NilGroup, - Tags: models.Tags(mp.Tags()), - Fields: models.Fields(mp.Fields()), - Time: mp.Time(), - } - err = s.StreamCollector.CollectPoint(p) - if err != nil { - s.logger.Println("E!", err) - return - } - } - return -} diff --git a/services/task_store/service.go b/services/task_store/service.go index 39a20198b..17277b56d 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -35,7 +35,8 @@ type Service struct { } TaskMaster interface { StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error) - StopTask(name string) + StopTask(name string) error + IsExecuting(name string) bool } logger *log.Logger } @@ -187,6 +188,7 @@ type TaskInfo struct { TICKscript string Dot string Enabled bool + Executing bool Error string } @@ -203,7 +205,7 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) { return } - errMsg := "" + errMsg := raw.Error dot := "" task, err := ts.Load(name) if err == nil { @@ -219,6 +221,7 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) { TICKscript: raw.TICKscript, Dot: dot, Enabled: ts.IsEnabled(name), + Executing: ts.TaskMaster.IsExecuting(name), Error: errMsg, } @@ -246,10 +249,16 @@ func (ts *Service) handleTasks(w http.ResponseWriter, r *http.Request) { } type rawTask struct { - Name string + // The name of the task. + Name string + // The TICKscript for the task. TICKscript string - Type kapacitor.TaskType - DBRPs []kapacitor.DBRP + // Last error the task had either while defining or executing. + Error string + // The task type (stream|batch). + Type kapacitor.TaskType + // The DBs and RPs the task is allowed to access. + DBRPs []kapacitor.DBRP } func (ts *Service) handleSave(w http.ResponseWriter, r *http.Request) { @@ -469,9 +478,13 @@ func (ts *Service) Enable(name string) error { } func (ts *Service) StartTask(t *kapacitor.Task) error { + // Starting task, remove last error + ts.SaveLastError(t.Name, "") + // Start the task et, err := ts.TaskMaster.StartTask(t) if err != nil { + ts.SaveLastError(et.Task.Name, err.Error()) return err } @@ -479,9 +492,41 @@ func (ts *Service) StartTask(t *kapacitor.Task) error { if t.Type == kapacitor.BatchTask { err := et.StartBatching() if err != nil { + ts.SaveLastError(et.Task.Name, err.Error()) return err } } + + go func() { + // Wait for task to finish + err := et.Err() + // Stop task + ts.TaskMaster.StopTask(et.Task.Name) + + if err != nil { + ts.logger.Printf("E! 
diff --git a/services/task_store/service.go b/services/task_store/service.go
index 39a20198b..17277b56d 100644
--- a/services/task_store/service.go
+++ b/services/task_store/service.go
@@ -35,7 +35,8 @@ type Service struct {
 	}
 	TaskMaster interface {
 		StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error)
-		StopTask(name string)
+		StopTask(name string) error
+		IsExecuting(name string) bool
 	}
 	logger *log.Logger
 }
@@ -187,6 +188,7 @@ type TaskInfo struct {
 	TICKscript string
 	Dot        string
 	Enabled    bool
+	Executing  bool
 	Error      string
 }
 
@@ -203,7 +205,7 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	errMsg := ""
+	errMsg := raw.Error
 	dot := ""
 	task, err := ts.Load(name)
 	if err == nil {
@@ -219,6 +221,7 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
 		TICKscript: raw.TICKscript,
 		Dot:        dot,
 		Enabled:    ts.IsEnabled(name),
+		Executing:  ts.TaskMaster.IsExecuting(name),
 		Error:      errMsg,
 	}
 
@@ -246,10 +249,16 @@ func (ts *Service) handleTasks(w http.ResponseWriter, r *http.Request) {
 }
 
 type rawTask struct {
-	Name       string
+	// The name of the task.
+	Name string
+	// The TICKscript for the task.
 	TICKscript string
-	Type       kapacitor.TaskType
-	DBRPs      []kapacitor.DBRP
+	// Last error the task had either while defining or executing.
+	Error string
+	// The task type (stream|batch).
+	Type kapacitor.TaskType
+	// The DBs and RPs the task is allowed to access.
+	DBRPs []kapacitor.DBRP
 }
 
 func (ts *Service) handleSave(w http.ResponseWriter, r *http.Request) {
@@ -469,9 +478,13 @@ func (ts *Service) Enable(name string) error {
 }
 
 func (ts *Service) StartTask(t *kapacitor.Task) error {
+	// Starting task, remove last error
+	ts.SaveLastError(t.Name, "")
+
 	// Start the task
 	et, err := ts.TaskMaster.StartTask(t)
 	if err != nil {
+		ts.SaveLastError(et.Task.Name, err.Error())
 		return err
 	}
 
@@ -479,9 +492,41 @@ func (ts *Service) StartTask(t *kapacitor.Task) error {
 	if t.Type == kapacitor.BatchTask {
 		err := et.StartBatching()
 		if err != nil {
+			ts.SaveLastError(et.Task.Name, err.Error())
 			return err
 		}
 	}
+
+	go func() {
+		// Wait for task to finish
+		err := et.Err()
+		// Stop task
+		ts.TaskMaster.StopTask(et.Task.Name)
+
+		if err != nil {
+			ts.logger.Printf("E! task %s finished with error: %s", et.Task.Name, err)
+			// Save last error from task.
+			err = ts.SaveLastError(et.Task.Name, err.Error())
+			if err != nil {
+				ts.logger.Println("E! failed to save last error for task", et.Task.Name)
+			}
+		}
+	}()
 	return nil
 }
+
+// Save last error from task.
+func (ts *Service) SaveLastError(name string, errStr string) error {
+
+	raw, err := ts.LoadRaw(name)
+	if err != nil {
+		return err
+	}
+	raw.Error = errStr
+	err = ts.Save(raw)
+	if err != nil {
+		return err
+	}
+	return nil
+}
 
 func (ts *Service) Disable(name string) error {
@@ -502,15 +547,18 @@ func (ts *Service) Disable(name string) error {
 		}
 		return nil
 	})
-	ts.TaskMaster.StopTask(name)
-	return err
+	if err != nil {
+		return err
+	}
+	return ts.TaskMaster.StopTask(name)
 }
 
 type taskInfo struct {
-	Name    string
-	Type    kapacitor.TaskType
-	DBRPs   []kapacitor.DBRP
-	Enabled bool
+	Name      string
+	Type      kapacitor.TaskType
+	DBRPs     []kapacitor.DBRP
+	Enabled   bool
+	Executing bool
 }
 
 func (ts *Service) IsEnabled(name string) (e bool) {
@@ -541,10 +589,11 @@ func (ts *Service) GetTaskInfo(tasks []string) ([]taskInfo, error) {
 		enabled := eb != nil && eb.Get(k) != nil
 
 		info := taskInfo{
-			Name:    t.Name,
-			Type:    t.Type,
-			DBRPs:   t.DBRPs,
-			Enabled: enabled,
+			Name:      t.Name,
+			Type:      t.Type,
+			DBRPs:     t.DBRPs,
+			Enabled:   enabled,
+			Executing: ts.TaskMaster.IsExecuting(t.Name),
 		}
 
 		taskInfos = append(taskInfos, info)
 		return nil
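SaveLastError is a plain load-modify-save round trip over the persisted rawTask record. A sketch of the same shape with the store reduced to a map (the real service persists rawTask records to disk):

package main

import "fmt"

type rawTask struct {
	Name  string
	Error string
}

type store struct{ tasks map[string]rawTask }

// SaveLastError loads the raw record, mutates only the Error field, and
// writes the record back, mirroring LoadRaw/Save in the task store.
func (s *store) SaveLastError(name, errStr string) error {
	raw, ok := s.tasks[name] // LoadRaw
	if !ok {
		return fmt.Errorf("unknown task %q", name)
	}
	raw.Error = errStr  // set last error
	s.tasks[name] = raw // Save
	return nil
}

func main() {
	s := &store{tasks: map[string]rawTask{"cpu_alert": {Name: "cpu_alert"}}}
	s.SaveLastError("cpu_alert", "query timed out")
	fmt.Println(s.tasks["cpu_alert"].Error) // query timed out
}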
diff --git a/stream.go b/stream.go
index 5cf501e5d..d00850351 100644
--- a/stream.go
+++ b/stream.go
@@ -1,9 +1,6 @@
 package kapacitor

 import (
-	"fmt"
-
-	"github.com/influxdb/influxdb/influxql"
 	"github.com/influxdb/kapacitor/models"
 	"github.com/influxdb/kapacitor/pipeline"
 	"github.com/influxdb/kapacitor/tick"
@@ -35,21 +32,6 @@ func newStreamNode(et *ExecutingTask, n *pipeline.StreamNode) (*StreamNode, erro
 	return sn, nil
 }

-func parseFromClause(from string) (db, rp, mm string, err error) {
-	//create fake but complete query for parsing
-	query := "select v from " + from
-	s, err := influxql.ParseStatement(query)
-	if err != nil {
-		return "", "", "", err
-	}
-	if slct, ok := s.(*influxql.SelectStatement); ok && len(slct.Sources) == 1 {
-		if m, ok := slct.Sources[0].(*influxql.Measurement); ok {
-			return m.Database, m.RetentionPolicy, m.Name, nil
-		}
-	}
-	return "", "", "", fmt.Errorf("invalid from condition: %q", from)
-}
-
 func (s *StreamNode) runStream() error {
 	for pt, ok := s.ins[0].NextPoint(); ok; pt, ok = s.ins[0].NextPoint() {
diff --git a/task.go b/task.go
index b0233b091..bc0f94595 100644
--- a/task.go
+++ b/task.go
@@ -102,7 +102,6 @@ func NewBatcher(name, script string, dbrps []DBRP) (*Task, error) {
 type ExecutingTask struct {
 	tm      *TaskMaster
 	Task    *Task
-	ins     []*Edge
 	source  Node
 	outputs map[string]Output
 	// node lookup from pipeline.ID -> Node
@@ -181,9 +180,8 @@ func (et *ExecutingTask) link() error {

 // Start the task.
 func (et *ExecutingTask) start(ins []*Edge) error {
-	et.ins = ins
-	for _, in := range et.ins {
+	for _, in := range ins {
 		et.source.addParentEdge(in)
 	}
@@ -193,14 +191,16 @@ func (et *ExecutingTask) start(ins []*Edge) error {
 	})
 }

-func (et *ExecutingTask) stop() {
-	et.rwalk(func(n Node) error {
+func (et *ExecutingTask) stop() (err error) {
+	et.walk(func(n Node) error {
 		n.stop()
+		e := n.Err()
+		if e != nil {
+			err = e
+		}
 		return nil
 	})
-	for _, in := range et.ins {
-		in.Close()
-	}
+	return
 }

 var ErrWrongTaskType = errors.New("wrong task type")
@@ -218,7 +218,7 @@ func (et *ExecutingTask) StartBatching() error {
 		return err
 	}

-	batcher.Start(et.ins)
+	batcher.Start()
 	return nil
 }
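ExecutingTask.stop no longer closes saved input edges; it walks the node graph, stops each node, and surfaces the last node error so the task_store goroutine above has something to persist. A compact sketch of that error-folding walk (the node interface and fakeNode are stand-ins, not the package's real Node):

```go
package main

import (
	"errors"
	"fmt"
)

// node is a stand-in for the package's Node interface; only the two
// methods stop() uses are sketched here.
type node interface {
	stop()
	Err() error
}

type fakeNode struct{ err error }

func (f *fakeNode) stop()      {}
func (f *fakeNode) Err() error { return f.err }

// stopAll mirrors the new ExecutingTask.stop: stop every node, collect
// each node's final error, and return the last non-nil one so the caller
// can persist it.
func stopAll(nodes []node) (err error) {
	for _, n := range nodes {
		n.stop()
		if e := n.Err(); e != nil {
			err = e // keep the last error seen
		}
	}
	return
}

func main() {
	nodes := []node{&fakeNode{}, &fakeNode{err: errors.New("node failed")}}
	fmt.Println(stopAll(nodes)) // node failed
}
```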
closed") return nil } +func (tm *TaskMaster) Drain() { + tm.waitForForks() + tm.mu.Lock() + defer tm.mu.Unlock() + for name := range tm.forks { + tm.delFork(name) + } +} + +func (tm *TaskMaster) waitForForks() { + if tm.drained { + return + } + tm.drained = true + tm.writePointsIn.Close() + tm.wg.Wait() +} + func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) { + tm.mu.Lock() + defer tm.mu.Unlock() + if tm.closed { + return nil, errors.New("task master is closed cannot start a task") + } tm.logger.Println("D! Starting task:", t.Name) et, err := NewExecutingTask(tm, t) if err != nil { @@ -113,7 +160,11 @@ func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) { var ins []*Edge switch et.Task.Type { case StreamTask: - ins = []*Edge{tm.NewFork(et.Task.Name, et.Task.DBRPs)} + e, err := tm.newFork(et.Task.Name, et.Task.DBRPs) + if err != nil { + return nil, err + } + ins = []*Edge{e} case BatchTask: count, err := et.BatchCount() if err != nil { @@ -125,7 +176,6 @@ func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) { ins[i] = in tm.batches[t.Name] = append(tm.batches[t.Name], in) } - } err = et.start(ins) @@ -144,59 +194,126 @@ func (tm *TaskMaster) BatchCollectors(name string) []BatchCollector { return tm.batches[name] } -func (tm *TaskMaster) StopTask(name string) { +func (tm *TaskMaster) StopTask(name string) error { + tm.mu.Lock() + defer tm.mu.Unlock() + return tm.stopTask(name) +} + +// internal stopTask function. The caller must have acquired +// the lock in order to call this function +func (tm *TaskMaster) stopTask(name string) (err error) { if et, ok := tm.tasks[name]; ok { delete(tm.tasks, name) if et.Task.Type == StreamTask { - tm.DelFork(et.Task.Name) + tm.delFork(name) + } + err = et.stop() + if err != nil { + tm.logger.Println("E! Stopped task:", name, err) + } else { + tm.logger.Println("I! Stopped task:", name) } - et.stop() - tm.logger.Println("I! 
Stopped task:", name) } + return } -func (tm *TaskMaster) runForking() { - for p, ok := tm.in.NextPoint(); ok; p, ok = tm.in.NextPoint() { - tm.forkPoint(p) - } +func (tm *TaskMaster) IsExecuting(name string) bool { tm.mu.RLock() defer tm.mu.RUnlock() - for _, fork := range tm.forks { - fork.Edge.Close() + _, executing := tm.tasks[name] + return executing +} + +func (tm *TaskMaster) Stream(name string) (StreamCollector, error) { + tm.mu.Lock() + defer tm.mu.Unlock() + return tm.stream(name) +} + +func (tm *TaskMaster) stream(name string) (StreamCollector, error) { + if tm.closed { + return nil, ErrTaskMasterClosed + } + in := newEdge("TASK_MASTER", name, "stream", pipeline.StreamEdge, tm.LogService) + tm.drained = false + tm.wg.Add(1) + go tm.runForking(in) + return in, nil +} + +func (tm *TaskMaster) runForking(in *Edge) { + defer tm.wg.Done() + for p, ok := in.NextPoint(); ok; p, ok = in.NextPoint() { + tm.forkPoint(p) } } func (tm *TaskMaster) forkPoint(p models.Point) { tm.mu.RLock() defer tm.mu.RUnlock() - for name, fork := range tm.forks { + for _, fork := range tm.forks { dbrp := DBRP{ Database: p.Database, RetentionPolicy: p.RetentionPolicy, } if fork.dbrps[dbrp] { - err := fork.Edge.CollectPoint(p) - if err != nil { - tm.StopTask(name) - } + fork.Edge.CollectPoint(p) } } } -func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) *Edge { +func (tm *TaskMaster) WritePoints(pts *cluster.WritePointsRequest) error { + tm.mu.RLock() + defer tm.mu.RUnlock() + if tm.closed { + return ErrTaskMasterClosed + } + for _, mp := range pts.Points { + p := models.Point{ + Database: pts.Database, + RetentionPolicy: pts.RetentionPolicy, + Name: mp.Name(), + Group: models.NilGroup, + Tags: models.Tags(mp.Tags()), + Fields: models.Fields(mp.Fields()), + Time: mp.Time(), + } + err := tm.writePointsIn.CollectPoint(p) + if err != nil { + return err + } + } + return nil +} + +func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error) { tm.mu.Lock() defer tm.mu.Unlock() + return tm.newFork(taskName, dbrps) +} + +// internal newFork, must have acquired lock before calling. +func (tm *TaskMaster) newFork(taskName string, dbrps []DBRP) (*Edge, error) { + if tm.closed { + return nil, ErrTaskMasterClosed + } e := newEdge(taskName, "stream", "stream0", pipeline.StreamEdge, tm.LogService) tm.forks[taskName] = fork{ Edge: e, dbrps: CreateDBRPMap(dbrps), } - return e + return e, nil } func (tm *TaskMaster) DelFork(name string) { tm.mu.Lock() defer tm.mu.Unlock() + tm.delFork(name) +} + +// internal delFork function, must have lock to call +func (tm *TaskMaster) delFork(name string) { fork := tm.forks[name] delete(tm.forks, name) if fork.Edge != nil {