Skip to content

Commit

Permalink
Forking tasks by measurement in order to improve performance
Browse files Browse the repository at this point in the history
* Add Measurements() method to tasks, for fetching StreamNode's measurements
* Forking by measurement

benchmark                              old ns/op        new ns/op
delta
Benchmark_T1000_P5000_Matches-4        10894368013      11337096195
+4.06%
Benchmark_T1000_P5000_NoMatches-4      11772957621      34196738
-99.71%
Benchmark_T100_P5000_Matches-4         1149160968       763011845
-33.60%
Benchmark_T100_P5000_NoMatches-4       1193448589       27129779
-97.73%
Benchmark_T10_P5000_Matches-4          189739122        146005592
-23.05%
Benchmark_T10_P5000_NoMatches-4        155396278        25019129
-83.90%
Benchmark_T10_P500_CountTask-4         20287106         21344565
+5.21%
Benchmark_T10_P50000_CountTask-4       1832485835       1809250661
-1.27%
Benchmark_T1000_P500-4                 2034353343       2270492406
+11.61%
Benchmark_T1000_P50000_CountTask-4     197003231081     211087893829
+7.15%

benchmark                              old allocs     new allocs
delta
Benchmark_T1000_P5000_Matches-4        69144          69120
-0.03%
Benchmark_T1000_P5000_NoMatches-4      69146          65077
-5.88%
Benchmark_T100_P5000_Matches-4         56523          56498
-0.04%
Benchmark_T100_P5000_NoMatches-4       56548          56077
-0.83%
Benchmark_T10_P5000_Matches-4          55226          55213
-0.02%
Benchmark_T10_P5000_NoMatches-4        55219          55164
-0.10%
Benchmark_T10_P500_CountTask-4         10940          10936
-0.04%
Benchmark_T10_P50000_CountTask-4       1154692        1154686
-0.00%
Benchmark_T1000_P500-4                 542557         544128
+0.29%
Benchmark_T1000_P50000_CountTask-4     61019034       61026894
+0.01%

benchmark                              old bytes      new bytes
delta
Benchmark_T1000_P5000_Matches-4        4477632        4476032
-0.04%
Benchmark_T1000_P5000_NoMatches-4      4476112        3959755
-11.54%
Benchmark_T100_P5000_Matches-4         3668616        3666976
-0.04%
Benchmark_T100_P5000_NoMatches-4       3671944        3614211
-1.57%
Benchmark_T10_P5000_Matches-4          3585190        3584455
-0.02%
Benchmark_T10_P5000_NoMatches-4        3584604        3578619
-0.17%
Benchmark_T10_P500_CountTask-4         1624756        1624478
-0.02%
Benchmark_T10_P50000_CountTask-4       114281120      114280576
-0.00%
Benchmark_T1000_P500-4                 128125336      128225272
+0.08%
Benchmark_T1000_P50000_CountTask-4     7625275048     7625793936
+0.01%
  • Loading branch information
yosiat committed Apr 5, 2016
1 parent 32d4d1b commit 2543c54
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

### Features

- [#360](https://github.com/influxdata/kapacitor/pull/360): Forking tasks by measurement in order to improve performance

### Bugfixes

## v0.12.0 [2016-04-04]
Expand Down
8 changes: 4 additions & 4 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Service struct {
NewNamedClient(name string) (client.Client, error)
}
TaskMaster interface {
NewFork(name string, dbrps []kapacitor.DBRP) (*kapacitor.Edge, error)
NewFork(name string, dbrps []kapacitor.DBRP, measurements []string) (*kapacitor.Edge, error)
DelFork(name string)
New() *kapacitor.TaskMaster
Stream(name string) (kapacitor.StreamCollector, error)
Expand Down Expand Up @@ -265,7 +265,7 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) {
}

doF = func() error {
err := r.doRecordStream(rid, dur, t.DBRPs, started)
err := r.doRecordStream(rid, dur, t.DBRPs, t.Measurements(), started)
if err != nil {
close(started)
}
Expand Down Expand Up @@ -606,8 +606,8 @@ func (s streamWriter) Close() error {
}

// Record the stream for a duration
func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapacitor.DBRP, started chan struct{}) error {
e, err := r.TaskMaster.NewFork(rid.String(), dbrps)
func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapacitor.DBRP, measurements []string, started chan struct{}) error {
e, err := r.TaskMaster.NewFork(rid.String(), dbrps, measurements)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ func (t *Task) Dot() []byte {
return t.Pipeline.Dot(t.Name)
}

// Measurements returns every measurement referenced by a StreamNode
// in the task's pipeline. The result is an empty (non-nil) slice when
// the pipeline contains no StreamNode.
func (t *Task) Measurements() []string {
	measurements := []string{}

	t.Pipeline.Walk(func(node pipeline.Node) error {
		if streamNode, ok := node.(*pipeline.StreamNode); ok {
			measurements = append(measurements, streamNode.Measurement)
		}
		return nil
	})

	return measurements
}

// ----------------------------------
// ExecutingTask

Expand Down
142 changes: 110 additions & 32 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,16 @@ type TaskMaster struct {

// Incoming streams
writePointsIn StreamCollector

// Forks of incoming streams
forks map[string]fork
// We map from a (db, rp, measurement) fork key to a map of task names to their edges.
// The outer map (keyed by db, rp and measurement) gives fast lookups in forkPoint,
// while the inner map (keyed by task name) simplifies fork deletion (see taskToForkKeys).
forks map[forkKey]map[string]*Edge

// taskToForkKeys maps a task name to the fork keys it is registered under, to support deletes:
// DelFork receives only the task name, and is called after the task has already been removed from TaskMaster.tasks.
taskToForkKeys map[string][]forkKey

// Set of incoming batches
batches map[string][]BatchCollector
Expand All @@ -118,22 +126,23 @@ type TaskMaster struct {
wg sync.WaitGroup
}

// A fork of the main data stream filtered by a set of DBRPs
type fork struct {
Edge *Edge
dbrps map[DBRP]bool
// forkKey identifies a single routing bucket for forked stream data:
// points are routed to tasks by their (database, retention policy,
// measurement) triple. An empty Measurement subscribes a task to every
// measurement in the db/rp (see forkPoint).
type forkKey struct {
	Database        string
	RetentionPolicy string
	Measurement     string
}

// NewTaskMaster creates a new TaskMaster using the given log service.
// (NOTE(review): the previous comment mentioned "a given clock", but no
// clock parameter exists — confirm against callers.)
func NewTaskMaster(l LogService) *TaskMaster {
	tm := &TaskMaster{
		forks:          make(map[forkKey]map[string]*Edge),
		taskToForkKeys: make(map[string][]forkKey),
		batches:        make(map[string][]BatchCollector),
		tasks:          make(map[string]*ExecutingTask),
		LogService:     l,
		logger:         l.NewLogger("[task_master] ", log.LstdFlags),
		closed:         true,
		TimingService:  noOpTimingService{},
	}
	return tm
}

Expand Down Expand Up @@ -202,7 +211,9 @@ func (tm *TaskMaster) Drain() {
tm.waitForForks()
tm.mu.Lock()
defer tm.mu.Unlock()
for name := range tm.forks {

// TODO(yosia): revisit fork cleanup during drain — iterating taskToForkKeys while delFork mutates it warrants a closer look
for name, _ := range tm.taskToForkKeys {
tm.delFork(name)
}
}
Expand Down Expand Up @@ -295,7 +306,7 @@ func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) {
var ins []*Edge
switch et.Task.Type {
case StreamTask:
e, err := tm.newFork(et.Task.Name, et.Task.DBRPs)
e, err := tm.newFork(et.Task.Name, et.Task.DBRPs, et.Task.Measurements())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -416,14 +427,29 @@ func (tm *TaskMaster) runForking(in *Edge) {
func (tm *TaskMaster) forkPoint(p models.Point) {
tm.mu.RLock()
defer tm.mu.RUnlock()
for _, fork := range tm.forks {
dbrp := DBRP{
Database: p.Database,
RetentionPolicy: p.RetentionPolicy,
}
if fork.dbrps[dbrp] {
fork.Edge.CollectPoint(p)
}

// Create the fork keys - which is (db, rp, measurement)
key := forkKey{
Database: p.Database,
RetentionPolicy: p.RetentionPolicy,
Measurement: p.Name,
}

// If we have empty measurement in this db,rp we need to send it all
// the points
emptyMeasurementKey := forkKey{
Database: p.Database,
RetentionPolicy: p.RetentionPolicy,
Measurement: "",
}

// Merge the results to the forks map
for _, edge := range tm.forks[key] {
edge.CollectPoint(p)
}

for _, edge := range tm.forks[emptyMeasurementKey] {
edge.CollectPoint(p)
}
}

Expand All @@ -449,22 +475,54 @@ func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyL
return nil
}

func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error) {
// NewFork creates a fork edge for taskName, registered under every
// (db, rp, measurement) combination of dbrps and measurements.
// It is safe for concurrent use; the actual work happens in newFork,
// which requires the lock to already be held.
func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP, measurements []string) (*Edge, error) {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	return tm.newFork(taskName, dbrps, measurements)
}

// forkKeys returns the cross product of dbrps and measurements as
// forkKeys, i.e. one key per (db, rp, measurement) combination.
func forkKeys(dbrps []DBRP, measurements []string) []forkKey {
	// Pre-size: the result length is exactly len(dbrps)*len(measurements).
	keys := make([]forkKey, 0, len(dbrps)*len(measurements))

	for _, dbrp := range dbrps {
		for _, measurement := range measurements {
			keys = append(keys, forkKey{
				Database:        dbrp.Database,
				RetentionPolicy: dbrp.RetentionPolicy,
				Measurement:     measurement,
			})
		}
	}

	return keys
}

// internal newFork, must have acquired lock before calling.
func (tm *TaskMaster) newFork(taskName string, dbrps []DBRP) (*Edge, error) {
func (tm *TaskMaster) newFork(taskName string, dbrps []DBRP, measurements []string) (*Edge, error) {
if tm.closed {
return nil, ErrTaskMasterClosed
}

e := newEdge(taskName, "stream", "srcstream0", pipeline.StreamEdge, defaultEdgeBufferSize, tm.LogService)
tm.forks[taskName] = fork{
Edge: e,
dbrps: CreateDBRPMap(dbrps),

for _, key := range forkKeys(dbrps, measurements) {
tm.taskToForkKeys[taskName] = append(tm.taskToForkKeys[taskName], key)

// Add the task to the tasksMap if it dosen't exists
tasksMap, ok := tm.forks[key]
if !ok {
tasksMap = make(map[string]*Edge, 0)
}

// Add the edge to task map
tasksMap[taskName] = e

// update the task map in the forks
tm.forks[key] = tasksMap
}

return e, nil
}

Expand All @@ -476,11 +534,31 @@ func (tm *TaskMaster) DelFork(name string) {

// internal delFork function, must have lock to call
func (tm *TaskMaster) delFork(name string) {
fork := tm.forks[name]
delete(tm.forks, name)
if fork.Edge != nil {
fork.Edge.Close()

// mark if we already closed the edge because the edge is replicated
// by it's fork keys (db,rp,measurement)
isEdgeClosed := false

// Find the fork keys
for _, key := range tm.taskToForkKeys[name] {

// check if the edge exists
edge, ok := tm.forks[key][name]
if ok {

// Only close the edge if we are already didn't closed it
if edge != nil && !isEdgeClosed {
isEdgeClosed = true
edge.Close()
}

// remove the task in fork map
delete(tm.forks[key], name)
}
}

// remove mapping from task name to it's keys
delete(tm.taskToForkKeys, name)
}

func (tm *TaskMaster) SnapshotTask(name string) (*TaskSnapshot, error) {
Expand Down

0 comments on commit 2543c54

Please sign in to comment.