add live replays (influxdata#535)
Nathaniel Cook committed May 18, 2016
1 parent 83956df commit ca64974
Showing 12 changed files with 1,672 additions and 366 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -4,8 +4,22 @@

### Release Notes

With this release you can now replay data from InfluxDB directly against a task, without first creating a recording.

For example, replay the queries defined in the batch task `cpu_alert` for the past 10 hours:
```sh
kapacitor replay-live batch -task cpu_alert -past 10h
```

Or, for a stream task, use a query directly:

```sh
kapacitor replay-live query -task cpu_alert -query 'SELECT usage_idle FROM telegraf."default".cpu WHERE time > now() - 10h'
```


### Features

- [#283](https://github.com/influxdata/kapacitor/issues/283): Add live replays.
- [#82](https://github.com/influxdata/kapacitor/issues/82): Multiple services for PagerDuty alert
- [#558](https://github.com/influxdata/kapacitor/pull/558): Preserve fields as well as tags on selector InfluxQL functions.

100 changes: 98 additions & 2 deletions client/API.md
@@ -544,7 +544,7 @@ POST /kapacitor/v1/recordings/query
Create a recording using the `query` method specifying a `batch` type.

```
POST /kapacitor/v1/recording/query
POST /kapacitor/v1/recordings/query
{
"query" : "SELECT mean(usage_idle) FROM cpu WHERE time > now() - 1h GROUP BY time(10m)",
"type" : "batch"
@@ -554,7 +554,7 @@
Create a recording with a custom ID.

```
POST /kapacitor/v1/recording/query
POST /kapacitor/v1/recordings/query
{
"id" : "MY_RECORDING_ID",
"query" : "SELECT mean(usage_idle) FROM cpu WHERE time > now() - 1h GROUP BY time(10m)",
@@ -802,6 +802,102 @@ The request returns once the replay is started and provides a replay ID and link
| ---- | ------- |
| 201 | Success, replay has started. |

### Replay data without a Recording

It is also possible to replay data directly against a task without recording it first.
This is done by issuing a request similar to either a `batch` or `query` recording,
but instead of being stored, the data is immediately replayed against a task.
Using a `stream` recording for immediate replay against a task is equivalent to enabling the task
and so is not supported.

| Method | Description |
| ------ | ----------- |
| batch | Replay the results of the queries in a batch task. |
| query | Replay the results of an explicit query. |


##### Batch

| Parameter | Default | Purpose |
| --------- | ------- | ------- |
| id | random | Unique identifier for the replay. If empty, a random ID is chosen. |
| task | | ID of a task. The queries defined in the task are replayed against the task. |
| start | | Earliest date for which data will be replayed. RFC3339Nano formatted. |
| stop | now | Latest date for which data will be replayed. If not specified, the current time is used. RFC3339Nano formatted. |
| cluster | | Name of a configured InfluxDB cluster. If empty, the default cluster is used. |
| recording-time | false | If true, use the times in the data; otherwise adjust times relative to the current time. |
| clock | fast | One of `fast` or `real`. If `real`, wait for real time to pass corresponding with the time in the data. If `fast`, replay the data without delay. For example, with a `real` clock, replaying 5m of data takes 5m. |

##### Query

| Parameter | Default | Purpose |
| --------- | ------- | ------- |
| id | random | Unique identifier for the replay. If empty, a random ID is chosen. |
| task | | ID of a task. The results of the query are replayed against the task. |
| query | | Query to execute. |
| cluster | | Name of a configured InfluxDB cluster. If empty, the default cluster is used. |
| recording-time | false | If true, use the times in the data; otherwise adjust times relative to the current time. |
| clock | fast | One of `fast` or `real`. If `real`, wait for real time to pass corresponding with the time in the data. If `fast`, replay the data without delay. For example, with a `real` clock, replaying 5m of data takes 5m. |

#### Example

Perform a replay using the `batch` method specifying a start time.

```
POST /kapacitor/v1/replays/batch
{
"task" : "TASK_ID",
"start" : "2006-01-02T15:04:05Z07:00"
}
```

Replay the results of the query against the task.

```
POST /kapacitor/v1/replays/query
{
"task" : "TASK_ID",
"query" : "SELECT mean(usage_idle) FROM cpu WHERE time > now() - 1h GROUP BY time(10m)",
}
```

Create a replay with a custom ID.

```
POST /kapacitor/v1/replays/query
{
"id" : "MY_REPLAY_ID",
"task" : "TASK_ID",
"query" : "SELECT mean(usage_idle) FROM cpu WHERE time > now() - 1h GROUP BY time(10m)",
}
```

#### Response

All replays are assigned an ID, which is returned along with a link in the following format.

```
{
"link" : {"rel": "self", "href": "/kapacitor/v1/replays/e24db07d-1646-4bb3-a445-828f5049bea0"},
"id" : "e24db07d-1646-4bb3-a445-828f5049bea0",
"task" : "TASK_ID",
"recording" : "",
"clock" : "fast",
"recording-time" : false,
"status" : "running",
"progress" : 0.57,
"error" : ""
}
```

>NOTE: For a replay created in this manner, the `recording` ID will be empty since no recording was used or created.

| Code | Meaning |
| ---- | ------- |
| 201 | Success, the replay has started. |


### Waiting for a Replay

Like recordings you make a GET request to the `/kapacitor/v1/replays/REPLAY_ID` endpoint to get the status of the replay.
73 changes: 73 additions & 0 deletions client/v1/client.go
@@ -34,6 +34,8 @@ const recordStreamPath = basePath + "/recordings/stream"
const recordBatchPath = basePath + "/recordings/batch"
const recordQueryPath = basePath + "/recordings/query"
const replaysPath = basePath + "/replays"
const replayBatchPath = basePath + "/replays/batch"
const replayQueryPath = basePath + "/replays/query"

// HTTP configuration for connecting to Kapacitor
type Config struct {
@@ -879,6 +881,77 @@ func (c *Client) CreateReplay(opt CreateReplayOptions) (Replay, error) {
return r, nil
}

type ReplayBatchOptions struct {
ID string `json:"id,omitempty"`
Task string `json:"task"`
Start time.Time `json:"start"`
Stop time.Time `json:"stop"`
Cluster string `json:"cluster,omitempty"`
RecordingTime bool `json:"recording-time"`
Clock Clock `json:"clock"`
}

// Replay the batch queries defined in a task against the task.
func (c *Client) ReplayBatch(opt ReplayBatchOptions) (Replay, error) {
r := Replay{}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
err := enc.Encode(opt)
if err != nil {
return r, err
}

u := *c.url
u.Path = replayBatchPath

req, err := http.NewRequest("POST", u.String(), &buf)
if err != nil {
return r, err
}

_, err = c.do(req, &r, http.StatusCreated)
if err != nil {
return r, err
}
return r, nil
}

type ReplayQueryOptions struct {
ID string `json:"id,omitempty"`
Task string `json:"task"`
Query string `json:"query"`
Cluster string `json:"cluster,omitempty"`
RecordingTime bool `json:"recording-time"`
Clock Clock `json:"clock"`
}

// Replay a query against a task.
func (c *Client) ReplayQuery(opt ReplayQueryOptions) (Replay, error) {
r := Replay{}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
err := enc.Encode(opt)
if err != nil {
return r, err
}

u := *c.url
u.Path = replayQueryPath

req, err := http.NewRequest("POST", u.String(), &buf)
if err != nil {
return r, err
}

_, err = c.do(req, &r, http.StatusCreated)
if err != nil {
return r, err
}
return r, nil
}

// Return the replay information
func (c *Client) Replay(link Link) (Replay, error) {
r := Replay{}
96 changes: 94 additions & 2 deletions client/v1/client_test.go
@@ -128,6 +128,20 @@ func Test_ReportsErrors(t *testing.T) {
return err
},
},
{
name: "ReplayBatch",
fnc: func(c *client.Client) error {
_, err := c.ReplayBatch(client.ReplayBatchOptions{})
return err
},
},
{
name: "ReplayQuery",
fnc: func(c *client.Client) error {
_, err := c.ReplayQuery(client.ReplayQueryOptions{})
return err
},
},
{
name: "DeleteReplay",
fnc: func(c *client.Client) error {
@@ -839,7 +853,7 @@ func Test_RecordQuery(t *testing.T) {
body, _ := ioutil.ReadAll(r.Body)
json.Unmarshal(body, &opts)
if r.URL.Path == "/kapacitor/v1/recordings/query" && r.Method == "POST" &&
opts.Query == "SELECT * FROM allthetings" &&
opts.Query == "SELECT * FROM allthethings" &&
opts.Type == client.StreamTask &&
opts.Cluster == "mycluster" {
w.WriteHeader(http.StatusCreated)
@@ -855,7 +869,7 @@
defer s.Close()

r, err := c.RecordQuery(client.RecordQueryOptions{
Query: "SELECT * FROM allthetings",
Query: "SELECT * FROM allthethings",
Cluster: "mycluster",
Type: client.StreamTask,
})
@@ -1202,6 +1216,84 @@ func Test_CreateReplay(t *testing.T) {
}
}

func Test_ReplayBatch(t *testing.T) {
stop := time.Now().UTC()
start := stop.Add(-24 * time.Hour)
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var opts client.ReplayBatchOptions
body, _ := ioutil.ReadAll(r.Body)
json.Unmarshal(body, &opts)
if r.URL.Path == "/kapacitor/v1/replays/batch" && r.Method == "POST" &&
opts.Task == "taskname" &&
opts.Start == start &&
opts.Stop == stop &&
opts.RecordingTime == true &&
opts.Clock == client.Real {
w.WriteHeader(http.StatusCreated)
fmt.Fprintf(w, `{"link":{"rel":"self","href":"/kapacitor/v1/replays/replayid"}}`)
} else {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "request: %v", r)
}
}))
if err != nil {
t.Fatal(err)
}
defer s.Close()

replay, err := c.ReplayBatch(client.ReplayBatchOptions{
Task: "taskname",
Start: start,
Stop: stop,
Cluster: "mycluster",
Clock: client.Real,
RecordingTime: true,
})
if err != nil {
t.Fatal(err)
}
if exp, got := "/kapacitor/v1/replays/replayid", string(replay.Link.Href); exp != got {
t.Errorf("unexpected replay.Link.Href got %s exp %s", got, exp)
}
}

func Test_ReplayQuery(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var opts client.ReplayQueryOptions
body, _ := ioutil.ReadAll(r.Body)
json.Unmarshal(body, &opts)
if r.URL.Path == "/kapacitor/v1/replays/query" && r.Method == "POST" &&
opts.Task == "taskname" &&
opts.Query == "SELECT * FROM allthethings" &&
opts.Cluster == "mycluster" &&
opts.RecordingTime == false &&
opts.Clock == client.Fast {
w.WriteHeader(http.StatusCreated)
fmt.Fprintf(w, `{"link":{"rel":"self","href":"/kapacitor/v1/replays/replayid"}}`)
} else {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "request: %v", r)
}
}))
if err != nil {
t.Fatal(err)
}
defer s.Close()

replay, err := c.ReplayQuery(client.ReplayQueryOptions{
Task: "taskname",
Query: "SELECT * FROM allthethings",
Cluster: "mycluster",
Clock: client.Fast,
})
if err != nil {
t.Fatal(err)
}
if exp, got := "/kapacitor/v1/replays/replayid", string(replay.Link.Href); exp != got {
t.Errorf("unexpected replay.Link.Href got %s exp %s", got, exp)
}
}

func Test_DeleteReplay(t *testing.T) {
s, c, err := newClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/kapacitor/v1/replays/replayid" && r.Method == "DELETE" {
