Skip to content

Commit

Permalink
add CRUD for recordings
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 5, 2015
1 parent caeec69 commit 3d62312
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 69 deletions.
2 changes: 1 addition & 1 deletion alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (a *AlertNode) check(p *models.Point) (bool, error) {
func (a *AlertNode) handlePost(pts []*models.Point) {
b, err := json.Marshal(pts)
if err != nil {
a.l.Println("failed to marshal points json")
a.l.Println("E@failed to marshal points json")
return
}
buf := bytes.NewBuffer(b)
Expand Down
12 changes: 7 additions & 5 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (b *BatchNode) Query(batch BatchCollector) {
b.query.Start(now.Add(-1 * b.b.Period))
b.query.Stop(now)

b.l.Println("D@starting next batch query:", b.query.String())

// Connect
con, err := client.NewClient(b.conf)
if err != nil {
b.l.Println(err)
b.l.Println("E@" + err.Error())
break
}
q := client.Query{
Expand All @@ -67,19 +69,19 @@ func (b *BatchNode) Query(batch BatchCollector) {
// Execute query
resp, err := con.Query(q)
if err != nil {
b.l.Println(err)
b.l.Println("E@" + err.Error())
return
}

if resp.Err != nil {
b.l.Println(resp.Err)
b.l.Println("E@" + resp.Err.Error())
return
}

// Collect batches
for _, res := range resp.Results {
if res.Err != nil {
b.l.Println(res.Err)
b.l.Println("E@" + res.Err.Error())
return
}
for _, series := range res.Series {
Expand All @@ -95,7 +97,7 @@ func (b *BatchNode) Query(batch BatchCollector) {
if c == "time" {
t, err = time.Parse(time.RFC3339, v[i].(string))
if err != nil {
b.l.Println(err)
b.l.Println("E@" + err.Error())
return
}
} else {
Expand Down
150 changes: 104 additions & 46 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ Commands:
enable enable and start running a task with live data.
disable stop running a task.
push publish a task definition to another Kapacitor instance.
delete delete a task.
list list information about running tasks.
delete delete a task or a recording.
list list information about tasks or recordings.
help get help for a command.
version displays the Kapacitor version info.
`
Expand Down Expand Up @@ -180,9 +180,11 @@ func recordUsage() {
Record the result of a InfluxDB query or a snapshot of the live data stream.
Prints the replay ID on exit.
Prints the recording ID on exit.
Replays have types like tasks. If recording a raw query you must specify the desired type.
Recordings have types like tasks. If recording a raw query you must specify the desired type.
See 'kapacitor help replay' for how to replay a recording.
Examples:
Expand All @@ -197,7 +199,7 @@ Examples:
$ kapacitor record query -addr 'http://localhost:8086' -query "select value from cpu_idle where time > now() - 1h and time < now()" -type streamer
This records the result of the query and stores it as a stream replay. Use -type batcher to store as batch replay.
This records the result of the query and stores it as a stream recording. Use -type batcher to store as batch recording.
Options:
`
Expand Down Expand Up @@ -231,16 +233,16 @@ func doRecord(args []string) error {

// Decode valid response
type resp struct {
ReplayID string `json:"ReplayID"`
Error string `json:"Error"`
RecordingID string `json:"RecordingID"`
Error string `json:"Error"`
}
d := json.NewDecoder(r.Body)
rp := resp{}
d.Decode(&rp)
if rp.Error != "" {
return errors.New(rp.Error)
}
l.Println(rp.ReplayID)
fmt.Println(rp.RecordingID)
return nil
}

Expand Down Expand Up @@ -303,7 +305,7 @@ func doDefine(args []string) error {
var (
replayFlags = flag.NewFlagSet("replay", flag.ExitOnError)
rtname = replayFlags.String("name", "", "the task name")
rid = replayFlags.String("id", "", "the replay id")
rid = replayFlags.String("id", "", "the recording ID")
rfast = replayFlags.Bool("fast", false, "whether to replay the data as fast as possible. If false, replay the data in real time")
)

Expand Down Expand Up @@ -429,71 +431,126 @@ func doDisable(args []string) error {
// List

func listUsage() {
var u = `Usage: kapacitor list [task...]
var u = `Usage: kapacitor list (tasks|recordings) [task name|recording ID]...
List tasks and their current state.
List tasks or recordings and their current state.
If no tasks are given then all tasks are listed.
If a set of task names is provided only those tasks will be listed.
If no tasks are given then all tasks are listed. Same for recordings.
If a set of task names or recordings IDs is provided only those entries will be listed.
`
fmt.Fprintln(os.Stderr, u)
}

func doList(args []string) error {

tasks := strings.Join(args, ",")
v := url.Values{}
v.Add("tasks", tasks)
r, err := http.Get("http://localhost:9092/tasks?" + v.Encode())
if err != nil {
return err
}
defer r.Body.Close()
// Decode valid response
type resp struct {
Error string `json:"Error"`
Tasks []struct {
Name string
Type kapacitor.TaskType
Enabled bool
} `json:"Tasks"`
}
d := json.NewDecoder(r.Body)
rp := resp{}
d.Decode(&rp)
if rp.Error != "" {
return errors.New(rp.Error)
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "Must specify 'tasks' or 'recordings'")
listUsage()
os.Exit(2)
}

outFmt := "%-30s%-10v%-10v\n"
fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled")
for _, t := range rp.Tasks {
fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled)
switch args[0] {
case "tasks":
tasks := strings.Join(args[1:], ",")
v := url.Values{}
v.Add("tasks", tasks)
r, err := http.Get("http://localhost:9092/tasks?" + v.Encode())
if err != nil {
return err
}
defer r.Body.Close()
// Decode valid response
type resp struct {
Error string `json:"Error"`
Tasks []struct {
Name string
Type kapacitor.TaskType
Enabled bool
} `json:"Tasks"`
}
d := json.NewDecoder(r.Body)
rp := resp{}
d.Decode(&rp)
if rp.Error != "" {
return errors.New(rp.Error)
}

outFmt := "%-30s%-10v%-10v\n"
fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled")
for _, t := range rp.Tasks {
fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled)
}
case "recordings":

rids := strings.Join(args[1:], ",")
v := url.Values{}
v.Add("rids", rids)
r, err := http.Get("http://localhost:9092/recordings?" + v.Encode())
if err != nil {
return err
}
defer r.Body.Close()
// Decode valid response
type resp struct {
Error string `json:"Error"`
Recordings []struct {
ID string
Type kapacitor.TaskType
Size int64
} `json:"Recordings"`
}
d := json.NewDecoder(r.Body)
rp := resp{}
d.Decode(&rp)
if rp.Error != "" {
return errors.New(rp.Error)
}

outFmt := "%-40s%-10v%15.2f\n"
fmt.Fprintf(os.Stdout, "%-40s%-10s%15s\n", "ID", "Type", "Size (MB)")
for _, r := range rp.Recordings {
fmt.Fprintf(os.Stdout, outFmt, r.ID, r.Type, float64(r.Size)/1024.0/1024.0)
}
}
return nil

}

// Delete
func deleteUsage() {
var u = `Usage: kapacitor delete [task name...]
var u = `Usage: kapacitor delete (task|recording) [task name|recording ID]...
Delete a task. If enabled it will be disabled and then deleted.
Delete a task or recording.
If a task is enabled it will be disabled and then deleted.
`
fmt.Fprintln(os.Stderr, u)
}

func doDelete(args []string) error {
if len(args) < 1 {
fmt.Fprintln(os.Stderr, "Must pass at least one task name")
if len(args) < 2 {
fmt.Fprintln(os.Stderr, "Must pass at least one task name or recording ID")
enableUsage()
os.Exit(2)
}

for _, name := range args {
var baseURL string
var paramName string
switch args[0] {
case "task":
baseURL = "http://localhost:9092/task"
paramName = "name"
case "recording":
baseURL = "http://localhost:9092/recording"
paramName = "rid"
}

l.Println(args)

for _, arg := range args[1:] {
v := url.Values{}
v.Add("name", name)
req, err := http.NewRequest("DELETE", "http://localhost:9092/task?"+v.Encode(), nil)
v.Add(paramName, arg)
req, err := http.NewRequest("DELETE", baseURL+v.Encode(), nil)
if err != nil {
return err
}
Expand All @@ -514,6 +571,7 @@ func doDelete(args []string) error {
return errors.New(rp.Error)
}
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"

"github.com/influxdb/kapacitor/log_writer"
"github.com/influxdb/kapacitor/pipeline"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func (n *node) closeParentEdges() {
}

func (n *node) start() {
n.l = log.New(os.Stderr, fmt.Sprintf("[%s] ", n.Name()), log.LstdFlags)
n.l = log_writer.New(os.Stderr, fmt.Sprintf("[%s] ", n.Name()), log.LstdFlags)
n.errCh = make(chan error, 1)
go func() {
var err error
Expand Down
18 changes: 18 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap
"write", // Data-ingest route.
"POST", "/write", true, true, h.serveWrite,
},
Route{
"routes", // Display current API routes
"GET", "/:routes", true, true, h.serveRoutes,
},
Route{
"404", // Catch all 404
"GET", "/", true, true, h.serve404,
Expand Down Expand Up @@ -187,6 +191,20 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

// serveRoutes returns a list of all routs and their methods
func (h *Handler) serveRoutes(w http.ResponseWriter, r *http.Request) {
routes := make(map[string][]string)

for method, mux := range h.methodMux {
patterns := mux.Patterns()
for _, p := range patterns {
routes[p] = append(routes[p], method)
}
}

w.Write(MarshalJSON(routes, true))
}

// serve404 returns an a formated 404 error
func (h *Handler) serve404(w http.ResponseWriter, r *http.Request) {
HttpError(w, "Not Found", true, http.StatusNotFound)
Expand Down
12 changes: 12 additions & 0 deletions services/httpd/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,15 @@ func (mux *ServeMux) Deregister(pattern string) {
defer mux.mu.Unlock()
delete(mux.m, pattern)
}

func (mux *ServeMux) Patterns() []string {
mux.mu.Lock()
defer mux.mu.Unlock()
patterns := make([]string, len(mux.m))
i := 0
for p, _ := range mux.m {
patterns[i] = p
i++
}
return patterns
}
Loading

0 comments on commit 3d62312

Please sign in to comment.