Skip to content

Commit

Permalink
TaskStore changes:
Browse files Browse the repository at this point in the history
* Enabling and disabling tasks by glob, for example:

  `kapacitor enable window_*`

* Deleting tasks by glob

* Adding FindTasks method for finding tasks by predicate
  • Loading branch information
yosiat authored and Nathaniel Cook committed Apr 1, 2016
1 parent 4b1d37f commit bc3aa6d
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ format a TICKscript file according to a common standard.
- [#299](https://github.com/influxdata/kapacitor/issues/299): Changes TICKscript chaining method operators and adds `tickfmt` binary.
- [#389](https://github.com/influxdata/kapacitor/pull/389): Adds benchmarks to Kapacitor for basic use cases.
- [#390](https://github.com/influxdata/kapacitor/issues/390): BREAKING: Remove old `.mapReduce` functions.
- [#381](https://github.com/influxdata/kapacitor/pull/381): Adding enable/disable/delete/reload tasks by glob.


### Bugfixes
Expand Down
45 changes: 44 additions & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,16 @@ func enableUsage() {
var u = `Usage: kapacitor enable [task name...]
Enable and start a task running from the live data.
For example:
You can enable by specific task name.
$ kapacitor enable cpu_alert
Or, you can enable by glob:
$ kapacitor enable *_alert
`
fmt.Fprintln(os.Stderr, u)
}
Expand Down Expand Up @@ -678,6 +688,16 @@ func disableUsage() {
var u = `Usage: kapacitor disable [task name...]
Disable and stop a task running.
For example:
You can disable by specific task name.
$ kapacitor disable cpu_alert
Or, you can disable by glob:
$ kapacitor disable *_alert
`
fmt.Fprintln(os.Stderr, u)
}
Expand Down Expand Up @@ -717,6 +737,16 @@ func reloadUsage() {
var u = `Usage: kapacitor reload [task name...]
Disable then enable a running task.
For example:
You can reload by specific task name.
$ kapacitor reload cpu_alert
Or, you can reload by glob:
$ kapacitor reload *_alert
`
fmt.Fprintln(os.Stderr, u)
}
Expand Down Expand Up @@ -874,7 +904,20 @@ func deleteUsage() {
Delete a task or recording.
If a task is enabled it will be disabled and then deleted.
If a task is enabled it will be disabled and then deleted,
For example:
You can delete task:
$ kapacitor delete tasks my_task
Or you can delete tasks by glob:
$ kapacitor delete tasks *_alert
You can delete recordings:
$ kapacitor delete recordings b0a2ba8a-aeeb-45ec-bef9-1a2939963586
`
fmt.Fprintln(os.Stderr, u)
}
Expand Down
152 changes: 135 additions & 17 deletions services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -523,9 +524,9 @@ func (ts *Service) Save(task *rawTask) error {
return err
}

func (ts *Service) Delete(name string) error {

func (ts *Service) deleteTask(name string) error {
ts.TaskMaster.StopTask(name)

return ts.db.Update(func(tx *bolt.Tx) error {
tb := tx.Bucket(tasksBucket)
if tb != nil {
Expand All @@ -543,6 +544,30 @@ func (ts *Service) Delete(name string) error {
})
}

func (ts *Service) Delete(pattern string) error {
rawTasks, err := ts.FindTasks(func(taskName string) (bool, error) {
matched, err := filepath.Match(pattern, taskName)
if err != nil {
return false, err
}

return matched, nil
})

if err != nil {
return nil
}

for _, rawTask := range rawTasks {
err = ts.deleteTask(rawTask.Name)
if err != nil {
return err
}
}

return nil
}

func (ts *Service) LoadRaw(name string) (*rawTask, error) {
var data []byte
err := ts.db.View(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -577,6 +602,11 @@ func (ts *Service) Load(name string) (*kapacitor.Task, error) {
if err != nil {
return nil, err
}

return ts.CreateTaskFromRaw(task)
}

func (ts *Service) CreateTaskFromRaw(task *rawTask) (*kapacitor.Task, error) {
return ts.TaskMaster.NewTask(task.Name,
task.TICKscript,
task.Type,
Expand All @@ -585,22 +615,22 @@ func (ts *Service) Load(name string) (*kapacitor.Task, error) {
)
}

func (ts *Service) Enable(name string) error {
// Load the task
t, err := ts.Load(name)
func (ts *Service) enableRawTask(rawTask *rawTask) error {
t, err := ts.CreateTaskFromRaw(rawTask)
if err != nil {
return err
}

var enabled bool
// Save the enabled state
var enabled bool

err = ts.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(enabledBucket)
if err != nil {
return err
}
enabled = b.Get([]byte(name)) != nil
err = b.Put([]byte(name), []byte{})
enabled = b.Get([]byte(t.Name)) != nil
err = b.Put([]byte(t.Name), []byte{})
if err != nil {
return err
}
Expand All @@ -619,6 +649,31 @@ func (ts *Service) Enable(name string) error {
return nil
}

func (ts *Service) Enable(pattern string) error {
// Find the matching tasks
rawTasks, err := ts.FindTasks(func(taskName string) (bool, error) {
matched, err := filepath.Match(pattern, taskName)
if err != nil {
return false, err
}

return matched, nil
})

if err != nil {
return nil
}

for _, rawTask := range rawTasks {
err = ts.enableRawTask(rawTask)
if err != nil {
return nil
}
}

return nil
}

func (ts *Service) StartTask(t *kapacitor.Task) error {
// Starting task, remove last error
ts.SaveLastError(t.Name, "")
Expand Down Expand Up @@ -673,27 +728,52 @@ func (ts *Service) SaveLastError(name string, errStr string) error {
return nil
}

func (ts *Service) Disable(name string) error {
func (ts *Service) Disable(pattern string) error {
// Find the matching tasks
rawTasks, err := ts.FindTasks(func(taskName string) (bool, error) {
matched, err := filepath.Match(pattern, taskName)
if err != nil {
return false, err
}

return matched, nil
})

if err != nil {
return nil
}

// Delete the enabled state
err := ts.db.Update(func(tx *bolt.Tx) error {
err = ts.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(enabledBucket)
if err != nil {
return err
}
enabled := b.Get([]byte(name)) != nil
if enabled {
err = b.Delete([]byte(name))
if err != nil {
return err
for _, rawTask := range rawTasks {
enabled := b.Get([]byte(rawTask.Name)) != nil
if enabled {
err = b.Delete([]byte(rawTask.Name))
if err != nil {
return err
}
kapacitor.NumEnabledTasksVar.Add(-1)
}
kapacitor.NumEnabledTasksVar.Add(-1)
}
return nil
})

if err != nil {
return err
}
return ts.TaskMaster.StopTask(name)

for _, rawTask := range rawTasks {
err = ts.TaskMaster.StopTask(rawTask.Name)
if err != nil {
return err
}
}

return nil
}

type TaskSummaryInfo struct {
Expand All @@ -714,6 +794,44 @@ func (ts *Service) IsEnabled(name string) (e bool) {
return
}

// Returns all taskInfo of task name that matches the predicate
func (ts *Service) FindTasks(predicate func(string) (bool, error)) ([]*rawTask, error) {
rawTasks := make([]*rawTask, 0)

err := ts.db.View(func(tx *bolt.Tx) error {
tb := tx.Bucket([]byte(tasksBucket))
if tb == nil {
return nil
}

return tb.ForEach(func(k, v []byte) error {
taskName := string(k)
isMatched, err := predicate(taskName)
if err != nil {
return err
}
if !isMatched {
return nil
}

// Grab task info
t, err := ts.LoadRaw(taskName)
if err != nil {
return fmt.Errorf("found invalid task in db. name: %s, err: %s", string(k), err)
}

rawTasks = append(rawTasks, t)
return nil
})

})
if err != nil {
return nil, err
}

return rawTasks, nil
}

func (ts *Service) GetTaskSummaryInfo(tasks []string) ([]TaskSummaryInfo, error) {
taskInfos := make([]TaskSummaryInfo, 0)

Expand Down

0 comments on commit bc3aa6d

Please sign in to comment.