diff --git a/CHANGELOG.md b/CHANGELOG.md index 3322e3f0a..f0705e805 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ ### Features +- [#360](https://github.com/influxdata/kapacitor/pull/360): Forking tasks by measurement in order to improve performance + ### Bugfixes ## v0.12.0 [2016-04-04] diff --git a/services/replay/service.go b/services/replay/service.go index ec31c07d5..5760dfd4a 100644 --- a/services/replay/service.go +++ b/services/replay/service.go @@ -46,7 +46,7 @@ type Service struct { NewNamedClient(name string) (client.Client, error) } TaskMaster interface { - NewFork(name string, dbrps []kapacitor.DBRP) (*kapacitor.Edge, error) + NewFork(name string, dbrps []kapacitor.DBRP, measurements []string) (*kapacitor.Edge, error) DelFork(name string) New() *kapacitor.TaskMaster Stream(name string) (kapacitor.StreamCollector, error) @@ -265,7 +265,7 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { } doF = func() error { - err := r.doRecordStream(rid, dur, t.DBRPs, started) + err := r.doRecordStream(rid, dur, t.DBRPs, t.Measurements(), started) if err != nil { close(started) } @@ -606,8 +606,8 @@ func (s streamWriter) Close() error { } // Record the stream for a duration -func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapacitor.DBRP, started chan struct{}) error { - e, err := r.TaskMaster.NewFork(rid.String(), dbrps) +func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapacitor.DBRP, measurements []string, started chan struct{}) error { + e, err := r.TaskMaster.NewFork(rid.String(), dbrps, measurements) if err != nil { return err } diff --git a/task.go b/task.go index ef219638f..34e3b3d32 100644 --- a/task.go +++ b/task.go @@ -61,6 +61,21 @@ func (t *Task) Dot() []byte { return t.Pipeline.Dot(t.Name) } +// returns all the measurements from a StreamNode +func (t *Task) Measurements() []string { + measurements := make([]string, 0) + + t.Pipeline.Walk(func(node pipeline.Node) error { + switch streamNode := node.(type) { + case *pipeline.StreamNode: + measurements = append(measurements, streamNode.Measurement) + } + return nil + }) + + return measurements +} + // ---------------------------------- // ExecutingTask diff --git a/task_master.go b/task_master.go index a94a4fa68..c70572e16 100644 --- a/task_master.go +++ b/task_master.go @@ -101,8 +101,16 @@ type TaskMaster struct { // Incoming streams writePointsIn StreamCollector + // Forks of incoming streams - forks map[string]fork + // We are mapping from (db, rp, measurement) to map of task names to their edges + // The outer map (from dbrp&measurement) is for fast access on forkPoint + // While the inner map is for handling fork deletions better (see taskToForkKeys) + forks map[forkKey]map[string]*Edge + + // Task to fork keys is map to help in deletes, in deletes + // we have only the task name, and they are called after the task is deleted from TaskMaster.tasks + taskToForkKeys map[string][]forkKey // Set of incoming batches batches map[string][]BatchCollector @@ -118,22 +126,23 @@ type TaskMaster struct { wg sync.WaitGroup } -// A fork of the main data stream filtered by a set of DBRPs -type fork struct { - Edge *Edge - dbrps map[DBRP]bool +type forkKey struct { + Database string + RetentionPolicy string + Measurement string } // Create a new Executor with a given clock. func NewTaskMaster(l LogService) *TaskMaster { return &TaskMaster{ - forks: make(map[string]fork), - batches: make(map[string][]BatchCollector), - tasks: make(map[string]*ExecutingTask), - LogService: l, - logger: l.NewLogger("[task_master] ", log.LstdFlags), - closed: true, - TimingService: noOpTimingService{}, + forks: make(map[forkKey]map[string]*Edge), + taskToForkKeys: make(map[string][]forkKey), + batches: make(map[string][]BatchCollector), + tasks: make(map[string]*ExecutingTask), + LogService: l, + logger: l.NewLogger("[task_master] ", log.LstdFlags), + closed: true, + TimingService: noOpTimingService{}, } } @@ -202,7 +211,9 @@ func (tm *TaskMaster) Drain() { tm.waitForForks() tm.mu.Lock() defer tm.mu.Unlock() - for name := range tm.forks { + + // TODO(yosia): handle this thing ;) + for name, _ := range tm.taskToForkKeys { tm.delFork(name) } } @@ -295,7 +306,7 @@ func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) { var ins []*Edge switch et.Task.Type { case StreamTask: - e, err := tm.newFork(et.Task.Name, et.Task.DBRPs) + e, err := tm.newFork(et.Task.Name, et.Task.DBRPs, et.Task.Measurements()) if err != nil { return nil, err } @@ -416,14 +427,29 @@ func (tm *TaskMaster) runForking(in *Edge) { func (tm *TaskMaster) forkPoint(p models.Point) { tm.mu.RLock() defer tm.mu.RUnlock() - for _, fork := range tm.forks { - dbrp := DBRP{ - Database: p.Database, - RetentionPolicy: p.RetentionPolicy, - } - if fork.dbrps[dbrp] { - fork.Edge.CollectPoint(p) - } + + // Create the fork keys - which is (db, rp, measurement) + key := forkKey{ + Database: p.Database, + RetentionPolicy: p.RetentionPolicy, + Measurement: p.Name, + } + + // If we have empty measurement in this db,rp we need to send it all + // the points + emptyMeasurementKey := forkKey{ + Database: p.Database, + RetentionPolicy: p.RetentionPolicy, + Measurement: "", + } + + // Merge the results to the forks map + for _, edge := range tm.forks[key] { + edge.CollectPoint(p) + } + + for _, edge := range tm.forks[emptyMeasurementKey] { + edge.CollectPoint(p) } } @@ -449,22 +475,54 @@ func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyL return nil } -func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error) { +func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP, measurements []string) (*Edge, error) { tm.mu.Lock() defer tm.mu.Unlock() - return tm.newFork(taskName, dbrps) + return tm.newFork(taskName, dbrps, measurements) +} + +func forkKeys(dbrps []DBRP, measurements []string) []forkKey { + keys := make([]forkKey, 0) + + for _, dbrp := range dbrps { + for _, measurement := range measurements { + key := forkKey{ + RetentionPolicy: dbrp.RetentionPolicy, + Database: dbrp.Database, + Measurement: measurement, + } + + keys = append(keys, key) + } + } + + return keys } // internal newFork, must have acquired lock before calling. -func (tm *TaskMaster) newFork(taskName string, dbrps []DBRP) (*Edge, error) { +func (tm *TaskMaster) newFork(taskName string, dbrps []DBRP, measurements []string) (*Edge, error) { if tm.closed { return nil, ErrTaskMasterClosed } + e := newEdge(taskName, "stream", "srcstream0", pipeline.StreamEdge, defaultEdgeBufferSize, tm.LogService) - tm.forks[taskName] = fork{ - Edge: e, - dbrps: CreateDBRPMap(dbrps), + + for _, key := range forkKeys(dbrps, measurements) { + tm.taskToForkKeys[taskName] = append(tm.taskToForkKeys[taskName], key) + + // Add the task to the tasksMap if it dosen't exists + tasksMap, ok := tm.forks[key] + if !ok { + tasksMap = make(map[string]*Edge, 0) + } + + // Add the edge to task map + tasksMap[taskName] = e + + // update the task map in the forks + tm.forks[key] = tasksMap } + return e, nil } @@ -476,11 +534,31 @@ func (tm *TaskMaster) DelFork(name string) { // internal delFork function, must have lock to call func (tm *TaskMaster) delFork(name string) { - fork := tm.forks[name] - delete(tm.forks, name) - if fork.Edge != nil { - fork.Edge.Close() + + // mark if we already closed the edge because the edge is replicated + // by it's fork keys (db,rp,measurement) + isEdgeClosed := false + + // Find the fork keys + for _, key := range tm.taskToForkKeys[name] { + + // check if the edge exists + edge, ok := tm.forks[key][name] + if ok { + + // Only close the edge if we are already didn't closed it + if edge != nil && !isEdgeClosed { + isEdgeClosed = true + edge.Close() + } + + // remove the task in fork map + delete(tm.forks[key], name) + } } + + // remove mapping from task name to it's keys + delete(tm.taskToForkKeys, name) } func (tm *TaskMaster) SnapshotTask(name string) (*TaskSnapshot, error) {