Skip to content

Commit

Permalink
fix issue with list tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 22, 2016
1 parent 7fdc137 commit 6bcaa35
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 55 deletions.
10 changes: 2 additions & 8 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,14 +812,8 @@ func doList(args []string) error {
defer r.Body.Close()
// Decode valid response
type resp struct {
Error string `json:"Error"`
Tasks []struct {
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Executing bool
} `json:"Tasks"`
Error string `json:"Error"`
Tasks []task_store.TaskSummaryInfo `json:"Tasks"`
}
d := json.NewDecoder(r.Body)
rp := resp{}
Expand Down
23 changes: 23 additions & 0 deletions cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,29 @@ func (s *Server) GetTask(name string) (ti task_store.TaskInfo, err error) {
return
}

func (s *Server) ListTasks() ([]task_store.TaskSummaryInfo, error) {
r, err := http.Get(s.URL() + "/tasks")
if err != nil {
return nil, err
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code got %d exp %d", r.StatusCode, http.StatusOK)
}
// Decode valid response
type resp struct {
Error string `json:"Error"`
Tasks []task_store.TaskSummaryInfo `json:"Tasks"`
}
d := json.NewDecoder(r.Body)
rp := resp{}
d.Decode(&rp)
if rp.Error != "" {
return nil, errors.New(rp.Error)
}
return rp.Tasks, nil
}

// MustReadAll reads r. Panic on error.
func MustReadAll(r io.Reader) []byte {
b, err := ioutil.ReadAll(r)
Expand Down
64 changes: 64 additions & 0 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,70 @@ func TestServer_DeleteTask(t *testing.T) {
}
}

func TestServer_ListTasks(t *testing.T) {
s := OpenDefaultServer()
defer s.Close()
count := 10

ttype := "stream"
tick := "stream.from().measurement('test')"
dbrps := []kapacitor.DBRP{
{
Database: "mydb",
RetentionPolicy: "myrp",
},
{
Database: "otherdb",
RetentionPolicy: "default",
},
}
for i := 0; i < count; i++ {
name := fmt.Sprintf("testTaskName%d", i)
r, err := s.DefineTask(name, ttype, tick, dbrps)
if err != nil {
t.Fatal(err)
}
if r != "" {
t.Fatal("unexpected result", r)
}

if i%2 == 0 {
r, err = s.EnableTask(name)
if err != nil {
t.Fatal(err)
}
if r != "" {
t.Fatal("unexpected result", r)
}
}
}
tasks, err := s.ListTasks()
if err != nil {
t.Fatal(err)
}
if exp, got := count, len(tasks); exp != got {
t.Fatalf("unexpected number of tasks: exp:%d got:%d", exp, got)
}
for i, task := range tasks {
if exp, got := fmt.Sprintf("testTaskName%d", i), task.Name; exp != got {
t.Errorf("unexpected task.Name i:%d exp:%s got:%s", i, exp, got)
}
if exp, got := kapacitor.StreamTask, task.Type; exp != got {
t.Errorf("unexpected task.Type i:%d exp:%v got:%v", i, exp, got)
}
if !reflect.DeepEqual(task.DBRPs, dbrps) {
t.Fatalf("unexpected dbrps i:%d exp:%s got:%s", i, dbrps, task.DBRPs)
}
if exp, got := i%2 == 0, task.Enabled; exp != got {
t.Errorf("unexpected task.Enabled i:%d exp:%v got:%v", i, exp, got)
}
if exp, got := i%2 == 0, task.Executing; exp != got {
t.Errorf("unexpected task.Executing i:%d exp:%v got:%v", i, exp, got)
}
}

}

func TestServer_StreamTask(t *testing.T) {
s := OpenDefaultServer()
defer s.Close()
Expand Down
7 changes: 6 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Node interface {

addParentEdge(*Edge)

init()

// start the node and its children
start(snapshot []byte)
stop()
Expand Down Expand Up @@ -88,7 +90,7 @@ func (n *node) abortParentEdges() {
}
}

func (n *node) start(snapshot []byte) {
func (n *node) init() {
tags := map[string]string{
"task": n.et.Task.Name,
"node": n.Name(),
Expand All @@ -100,6 +102,9 @@ func (n *node) start(snapshot []byte) {
n.statMap.Set(statAverageExecTime, avgExecVar)
n.timer = n.et.tm.TimingService.NewTimer(avgExecVar)
n.errCh = make(chan error, 1)
}

func (n *node) start(snapshot []byte) {
go func() {
var err error
defer func() {
Expand Down
48 changes: 26 additions & 22 deletions services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,15 @@ func (ts *Service) LoadSnapshot(name string) (*kapacitor.TaskSnapshot, error) {
}

type TaskInfo struct {
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
TICKscript string
Dot string
Enabled bool
Executing bool
Error string
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
TICKscript string
Dot string
Enabled bool
Executing bool
Error string
ExecutionStats kapacitor.ExecutionStats
}

func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -301,10 +302,12 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
executing := ts.TaskMaster.IsExecuting(name)
errMsg := raw.Error
dot := ""
stats := kapacitor.ExecutionStats{}
task, err := ts.Load(name)
if err == nil {
if executing {
dot = ts.TaskMaster.ExecutingDot(name, labels)
stats, _ = ts.TaskMaster.ExecutionStats(name)
} else {
dot = string(task.Dot())
}
Expand All @@ -313,14 +316,15 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
}

info := TaskInfo{
Name: name,
Type: raw.Type,
DBRPs: raw.DBRPs,
TICKscript: raw.TICKscript,
Dot: dot,
Enabled: ts.IsEnabled(name),
Executing: executing,
Error: errMsg,
Name: name,
Type: raw.Type,
DBRPs: raw.DBRPs,
TICKscript: raw.TICKscript,
Dot: dot,
Enabled: ts.IsEnabled(name),
Executing: executing,
Error: errMsg,
ExecutionStats: stats,
}

w.Write(httpd.MarshalJSON(info, true))
Expand All @@ -333,14 +337,14 @@ func (ts *Service) handleTasks(w http.ResponseWriter, r *http.Request) {
tasks = strings.Split(tasksStr, ",")
}

infos, err := ts.GetTaskInfo(tasks)
infos, err := ts.GetTaskSummaryInfo(tasks)
if err != nil {
httpd.HttpError(w, err.Error(), true, http.StatusNotFound)
return
}

type response struct {
Tasks []taskInfo `json:"Tasks"`
Tasks []TaskSummaryInfo `json:"Tasks"`
}

w.Write(httpd.MarshalJSON(response{infos}, true))
Expand Down Expand Up @@ -684,7 +688,7 @@ func (ts *Service) Disable(name string) error {
return ts.TaskMaster.StopTask(name)
}

type taskInfo struct {
type TaskSummaryInfo struct {
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Expand All @@ -702,8 +706,8 @@ func (ts *Service) IsEnabled(name string) (e bool) {
return
}

func (ts *Service) GetTaskInfo(tasks []string) ([]taskInfo, error) {
taskInfos := make([]taskInfo, 0)
func (ts *Service) GetTaskSummaryInfo(tasks []string) ([]TaskSummaryInfo, error) {
taskInfos := make([]TaskSummaryInfo, 0)

err := ts.db.View(func(tx *bolt.Tx) error {
tb := tx.Bucket([]byte(tasksBucket))
Expand All @@ -720,7 +724,7 @@ func (ts *Service) GetTaskInfo(tasks []string) ([]taskInfo, error) {

enabled := eb != nil && eb.Get(k) != nil

info := taskInfo{
info := TaskSummaryInfo{
Name: t.Name,
Type: t.Type,
DBRPs: t.DBRPs,
Expand Down
52 changes: 28 additions & 24 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,57 +407,61 @@ func (et *ExecutingTask) calcThroughput() {
}

// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (Node, error) {
func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (n Node, err error) {
switch t := p.(type) {
case *pipeline.StreamNode:
return newStreamNode(et, t, l)
n, err = newStreamNode(et, t, l)
case *pipeline.SourceStreamNode:
return newSourceStreamNode(et, t, l)
n, err = newSourceStreamNode(et, t, l)
case *pipeline.SourceBatchNode:
return newSourceBatchNode(et, t, l)
n, err = newSourceBatchNode(et, t, l)
case *pipeline.BatchNode:
return newBatchNode(et, t, l)
n, err = newBatchNode(et, t, l)
case *pipeline.WindowNode:
return newWindowNode(et, t, l)
n, err = newWindowNode(et, t, l)
case *pipeline.HTTPOutNode:
return newHTTPOutNode(et, t, l)
n, err = newHTTPOutNode(et, t, l)
case *pipeline.InfluxDBOutNode:
return newInfluxDBOutNode(et, t, l)
n, err = newInfluxDBOutNode(et, t, l)
case *pipeline.MapNode:
return newMapNode(et, t, l)
n, err = newMapNode(et, t, l)
case *pipeline.ReduceNode:
return newReduceNode(et, t, l)
n, err = newReduceNode(et, t, l)
case *pipeline.AlertNode:
return newAlertNode(et, t, l)
n, err = newAlertNode(et, t, l)
case *pipeline.GroupByNode:
return newGroupByNode(et, t, l)
n, err = newGroupByNode(et, t, l)
case *pipeline.UnionNode:
return newUnionNode(et, t, l)
n, err = newUnionNode(et, t, l)
case *pipeline.JoinNode:
return newJoinNode(et, t, l)
n, err = newJoinNode(et, t, l)
case *pipeline.EvalNode:
return newEvalNode(et, t, l)
n, err = newEvalNode(et, t, l)
case *pipeline.WhereNode:
return newWhereNode(et, t, l)
n, err = newWhereNode(et, t, l)
case *pipeline.SampleNode:
return newSampleNode(et, t, l)
n, err = newSampleNode(et, t, l)
case *pipeline.DerivativeNode:
return newDerivativeNode(et, t, l)
n, err = newDerivativeNode(et, t, l)
case *pipeline.UDFNode:
return newUDFNode(et, t, l)
n, err = newUDFNode(et, t, l)
case *pipeline.StatsNode:
return newStatsNode(et, t, l)
n, err = newStatsNode(et, t, l)
case *pipeline.ShiftNode:
return newShiftNode(et, t, l)
n, err = newShiftNode(et, t, l)
case *pipeline.NoOpNode:
return newNoOpNode(et, t, l)
n, err = newNoOpNode(et, t, l)
case *pipeline.InfluxQLNode:
return newInfluxQLNode(et, t, l)
n, err = newInfluxQLNode(et, t, l)
case *pipeline.LogNode:
return newLogNode(et, t, l)
n, err = newLogNode(et, t, l)
default:
return nil, fmt.Errorf("unknown pipeline node type %T", p)
}
if err == nil && n != nil {
n.init()
}
return n, err
}

type TaskSnapshot struct {
Expand Down

0 comments on commit 6bcaa35

Please sign in to comment.