Skip to content

Commit

Permalink
restructure edges in task master, abort vs close edge
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Dec 11, 2015
1 parent 18b2061 commit 080f0a2
Show file tree
Hide file tree
Showing 19 changed files with 648 additions and 1,029 deletions.
74 changes: 43 additions & 31 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func (s *SourceBatchNode) Count() int {
return len(s.children)
}

func (s *SourceBatchNode) Start(collectors []*Edge) {
for i, b := range s.children {
b.(*BatchNode).Start(collectors[i])
func (s *SourceBatchNode) Start() {
for _, b := range s.children {
b.(*BatchNode).Start()
}
}

Expand All @@ -88,19 +88,18 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

type BatchNode struct {
node
b *pipeline.BatchNode
query *Query
ticker ticker
wg sync.WaitGroup
errC chan error
closing chan struct{}
b *pipeline.BatchNode
query *Query
ticker ticker
queryMu sync.Mutex
queryErr chan error
closing chan struct{}
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
bn := &BatchNode{
node: node{Node: n, et: et},
b: n,
errC: make(chan error, 1),
closing: make(chan struct{}),
}
bn.node.runF = bn.runBatch
Expand Down Expand Up @@ -176,10 +175,12 @@ func (b *BatchNode) DBRPs() ([]DBRP, error) {
return b.query.DBRPs()
}

func (b *BatchNode) Start(batch BatchCollector) {
b.wg.Add(1)
func (b *BatchNode) Start() {
b.queryMu.Lock()
defer b.queryMu.Unlock()
b.queryErr = make(chan error, 1)
go func() {
b.errC <- b.doQuery(batch)
b.queryErr <- b.doQuery()
}()
}

Expand Down Expand Up @@ -209,9 +210,8 @@ func (b *BatchNode) Queries(start, stop time.Time) []string {
}

// Query InfluxDB and collect batches on batch collector.
func (b *BatchNode) doQuery(batch BatchCollector) error {
defer batch.Close()
defer b.wg.Done()
func (b *BatchNode) doQuery() error {
defer b.ins[0].Close()

if b.et.tm.InfluxDBService == nil {
return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
Expand Down Expand Up @@ -257,24 +257,22 @@ func (b *BatchNode) doQuery(batch BatchCollector) error {
return err
}
for _, bch := range batches {
batch.CollectBatch(bch)
b.ins[0].CollectBatch(bch)
}
}
}
}
}

func (b *BatchNode) stopBatch() {
if b.ticker != nil {
b.ticker.Stop()
}
close(b.closing)
b.wg.Wait()
}

func (b *BatchNode) runBatch() error {
errC := make(chan error, 1)
go func() {
defer func() {
err := recover()
if err != nil {
errC <- fmt.Errorf("%v", err)
}
}()
for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
for _, child := range b.outs {
err := child.CollectBatch(bt)
Expand All @@ -286,13 +284,27 @@ func (b *BatchNode) runBatch() error {
}
errC <- nil
}()
// Wait for errors
select {
case err := <-b.errC:
return err
case err := <-errC:
return err
var queryErr error
b.queryMu.Lock()
if b.queryErr != nil {
b.queryMu.Unlock()
queryErr = <-b.queryErr
} else {
b.queryMu.Unlock()
}

err := <-errC
if queryErr != nil {
return queryErr
}
return err
}

func (b *BatchNode) stopBatch() {
if b.ticker != nil {
b.ticker.Stop()
}
close(b.closing)
}

type ticker interface {
Expand Down
4 changes: 2 additions & 2 deletions clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestClockUntilSleepFirst(t *testing.T) {
c.Set(zero.Add(10 * time.Microsecond))
select {
case <-done:
case <-time.After(10 * time.Millisecond):
case <-time.After(20 * time.Millisecond):
t.Fatal("expected return from c.Until")
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestClockUntilMultipleGos(t *testing.T) {
c.Set(now)
select {
case <-done:
case <-time.After(10 * time.Millisecond):
case <-time.After(20 * time.Millisecond):
t.Fatalf("expected return from c.Until i: %d", i)
}
}
Expand Down
18 changes: 10 additions & 8 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ func doShow(args []string) error {
fmt.Println("Error:", ti.Error)
fmt.Println("Type:", ti.Type)
fmt.Println("Enabled:", ti.Enabled)
fmt.Println("Executing:", ti.Executing)
fmt.Println("Databases Retention Policies:", ti.DBRPs)
fmt.Printf("TICKscript:\n%s\n\n", ti.TICKscript)
fmt.Printf("DOT:\n%s\n", ti.Dot)
Expand Down Expand Up @@ -676,10 +677,11 @@ func doList(args []string) error {
type resp struct {
Error string `json:"Error"`
Tasks []struct {
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Executing bool
} `json:"Tasks"`
}
d := json.NewDecoder(r.Body)
Expand All @@ -689,10 +691,10 @@ func doList(args []string) error {
return errors.New(rp.Error)
}

outFmt := "%-30s%-10v%-10v%s\n"
fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled", "Databases and Retention Policies")
outFmt := "%-30s%-10v%-10v%-10v%s\n"
fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled", "Executing", "Databases and Retention Policies")
for _, t := range rp.Tasks {
fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled, t.DBRPs)
fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled, t.Executing, t.DBRPs)
}
case "recordings":

Expand Down Expand Up @@ -749,7 +751,7 @@ func deleteUsage() {
func doDelete(args []string) error {
if len(args) < 2 {
fmt.Fprintln(os.Stderr, "Must pass at least one task name or recording ID")
enableUsage()
deleteUsage()
os.Exit(2)
}

Expand Down
32 changes: 9 additions & 23 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/influxdb/kapacitor/services/slack"
"github.com/influxdb/kapacitor/services/smtp"
"github.com/influxdb/kapacitor/services/stats"
"github.com/influxdb/kapacitor/services/streamer"
"github.com/influxdb/kapacitor/services/task_store"
"github.com/influxdb/kapacitor/services/udp"
"github.com/influxdb/kapacitor/services/victorops"
Expand Down Expand Up @@ -75,14 +74,12 @@ type Server struct {
dataDir string
hostname string

err chan error
closing chan struct{}
err chan error

TaskMaster *kapacitor.TaskMaster

LogService logging.Interface
HTTPDService *httpd.Service
Streamer *streamer.Service
TaskStore *task_store.Service
ReplayService *replay.Service
InfluxDBService *influxdb.Service
Expand Down Expand Up @@ -110,7 +107,6 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
dataDir: c.DataDir,
hostname: c.Hostname,
err: make(chan error),
closing: make(chan struct{}),
LogService: logService,
MetaStore: &metastore{},
QueryExecutor: &queryexecutor{},
Expand All @@ -125,7 +121,6 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
}

// Append Kapacitor services.
s.appendStreamerService()
s.appendSMTPService(c.SMTP)
s.appendHTTPDService(c.HTTP)
s.appendInfluxDBService(c.InfluxDB, c.Hostname)
Expand Down Expand Up @@ -156,14 +151,6 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*

return s, nil
}
func (s *Server) appendStreamerService() {
l := s.LogService.NewLogger("[streamer] ", log.LstdFlags)
srv := streamer.NewService(l)
srv.StreamCollector = s.TaskMaster.Stream

s.Streamer = srv
s.Services = append(s.Services, srv)
}

func (s *Server) appendSMTPService(c smtp.Config) {
if c.Enabled {
Expand All @@ -179,7 +166,7 @@ func (s *Server) appendInfluxDBService(c influxdb.Config, hostname string) {
if c.Enabled {
l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
srv := influxdb.NewService(c, hostname, l)
srv.PointsWriter = s.Streamer
srv.PointsWriter = s.TaskMaster
srv.LogService = s.LogService

s.InfluxDBService = srv
Expand All @@ -193,7 +180,7 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv := httpd.NewService(c, l)

srv.Handler.MetaStore = s.MetaStore
srv.Handler.PointsWriter = s.Streamer
srv.Handler.PointsWriter = s.TaskMaster
srv.Handler.Version = s.buildInfo.Version

s.HTTPDService = srv
Expand Down Expand Up @@ -262,7 +249,7 @@ func (s *Server) appendCollectdService(c collectd.Config) {
srv := collectd.NewService(c)
srv.SetLogger(l)
srv.MetaStore = s.MetaStore
srv.PointsWriter = s.Streamer
srv.PointsWriter = s.TaskMaster
s.Services = append(s.Services, srv)
}

Expand All @@ -276,7 +263,7 @@ func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
return err
}
srv.SetLogger(l)
srv.PointsWriter = s.Streamer
srv.PointsWriter = s.TaskMaster
srv.MetaStore = s.MetaStore
s.Services = append(s.Services, srv)
return nil
Expand All @@ -293,7 +280,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error {
}
srv.SetLogger(l)

srv.PointsWriter = s.Streamer
srv.PointsWriter = s.TaskMaster
srv.MetaStore = s.MetaStore
s.Services = append(s.Services, srv)
return nil
Expand All @@ -305,15 +292,15 @@ func (s *Server) appendUDPService(c udp.Config) {
}
l := s.LogService.NewLogger("[udp] ", log.LstdFlags)
srv := udp.NewService(c, l)
srv.PointsWriter = s.Streamer
srv.PointsWriter = s.TaskMaster
s.Services = append(s.Services, srv)
}

func (s *Server) appendStatsService(c stats.Config) {
if c.Enabled {
l := s.LogService.NewLogger("[stats] ", log.LstdFlags)
srv := stats.NewService(c, l)
srv.StreamCollector = s.TaskMaster.Stream
srv.TaskMaster = s.TaskMaster

s.Services = append(s.Services, srv)
}
Expand Down Expand Up @@ -389,8 +376,7 @@ func (s *Server) Close() error {
s.Logger.Printf("D! closed service: %T", service)
}

close(s.closing)
return nil
return s.TaskMaster.Close()
}

func (s *Server) setupIDs() error {
Expand Down
5 changes: 4 additions & 1 deletion cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ func NewServer(c *run.Config) *Server {
Branch: "testBranch",
}
ls := &LogService{}
srv, _ := run.NewServer(c, buildInfo, ls)
srv, err := run.NewServer(c, buildInfo, ls)
if err != nil {
panic(err)
}
s := Server{
Server: srv,
Config: c,
Expand Down
Loading

0 comments on commit 080f0a2

Please sign in to comment.