Skip to content

Commit

Permalink
fix inconsistency with default rp on /write (influxdata#1001)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel Cook authored Oct 26, 2016
1 parent da6e60b commit 4f2b801
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ See the API docs for more details.

- [#984](https://github.com/influxdata/kapacitor/issues/984): Fix bug where keeping a list of fields that where not referenced in the eval expressions would cause an error.
- [#955](https://github.com/influxdata/kapacitor/issues/955): Fix the number of subscriptions statistic.
- [#999](https://github.com/influxdata/kapacitor/issues/999): Fix inconsistency with InfluxDB by adding config option to set a default retention policy.

## v1.0.2 [2016-10-06]

Expand Down Expand Up @@ -488,7 +489,7 @@ If you have existing tasks which do not match this pattern they should continue

### Bugfixes

- [#545](https://github.com/influxdata/kapacitor/issues/545): Fixes inconsistancy with API docs for creating a task.
- [#545](https://github.com/influxdata/kapacitor/issues/545): Fixes inconsistency with API docs for creating a task.
- [#544](https://github.com/influxdata/kapacitor/issues/544): Fixes issues with existings tasks and invalid names.
- [#543](https://github.com/influxdata/kapacitor/issues/543): Fixes default values not being set correctly in API calls.

Expand Down
5 changes: 5 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ data_dir = "/var/lib/kapacitor"
# This option is intended as a safe guard and should not be needed in practice.
skip-config-overrides = false

# Default retention-policy, if a write is made to Kapacitor and
# it does not have a retention policy associated with it,
# then the retention policy will be set to this value
default-retention-policy = ""

[http]
# HTTP API Server for Kapacitor
# This server is always on,
Expand Down
7 changes: 4 additions & 3 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ type Config struct {
UDF udf.Config `toml:"udf"`
Deadman deadman.Config `toml:"deadman"`

Hostname string `toml:"hostname"`
DataDir string `toml:"data_dir"`
SkipConfigOverrides bool `toml:"skip-config-overrides"`
Hostname string `toml:"hostname"`
DataDir string `toml:"data_dir"`
SkipConfigOverrides bool `toml:"skip-config-overrides"`
DefaultRetentionPolicy string `toml:"default-retention-policy"`
}

// NewConfig returns an instance of Config with reasonable defaults.
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,
// Start Task Master
s.TaskMasterLookup = kapacitor.NewTaskMasterLookup()
s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, logService)
s.TaskMaster.DefaultRetentionPolicy = c.DefaultRetentionPolicy
s.TaskMasterLookup.Set(s.TaskMaster)
if err := s.TaskMaster.Open(); err != nil {
return nil, err
Expand Down
79 changes: 79 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,85 @@ test value=1 0000000011
}
}

func TestServer_StreamTask_NoRP(t *testing.T) {
conf := NewConfig()
conf.DefaultRetentionPolicy = "myrp"
s := OpenServer(conf)
defer s.Close()
cli := Client(s)

id := "testStreamTask"
ttype := client.StreamTask
dbrps := []client.DBRP{{
Database: "mydb",
RetentionPolicy: "myrp",
}}
tick := `stream
|from()
.measurement('test')
|window()
.period(10s)
.every(10s)
|count('value')
|httpOut('count')
`

task, err := cli.CreateTask(client.CreateTaskOptions{
ID: id,
Type: ttype,
DBRPs: dbrps,
TICKscript: tick,
Status: client.Disabled,
})
if err != nil {
t.Fatal(err)
}

_, err = cli.UpdateTask(task.Link, client.UpdateTaskOptions{
Status: client.Enabled,
})
if err != nil {
t.Fatal(err)
}

endpoint := fmt.Sprintf("%s/tasks/%s/count", s.URL(), id)

// Request data before any writes and expect null responses
nullResponse := `{}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
}

points := `test value=1 0000000000
test value=1 0000000001
test value=1 0000000001
test value=1 0000000002
test value=1 0000000002
test value=1 0000000003
test value=1 0000000003
test value=1 0000000004
test value=1 0000000005
test value=1 0000000005
test value=1 0000000005
test value=1 0000000006
test value=1 0000000007
test value=1 0000000008
test value=1 0000000009
test value=1 0000000010
test value=1 0000000011
`
v := url.Values{}
v.Add("precision", "s")
s.MustWrite("mydb", "", points, v)

exp := `{"series":[{"name":"test","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}]}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
}
}

func TestServer_StreamTemplateTask(t *testing.T) {
s, cli := OpenDefaultServer()
defer s.Close()
Expand Down
13 changes: 10 additions & 3 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type TaskMaster struct {
}
LogService LogService

DefaultRetentionPolicy string

// Incoming streams
writePointsIn StreamCollector

Expand Down Expand Up @@ -174,22 +176,24 @@ func NewTaskMaster(id string, l LogService) *TaskMaster {
// Returns a new TaskMaster instance with the same services as the current one.
func (tm *TaskMaster) New(id string) *TaskMaster {
n := NewTaskMaster(id, tm.LogService)
n.DefaultRetentionPolicy = tm.DefaultRetentionPolicy
n.HTTPDService = tm.HTTPDService
n.UDFService = tm.UDFService
n.DeadmanService = tm.DeadmanService
n.TaskStore = tm.TaskStore
n.DeadmanService = tm.DeadmanService
n.UDFService = tm.UDFService
n.InfluxDBService = tm.InfluxDBService
n.SMTPService = tm.SMTPService
n.OpsGenieService = tm.OpsGenieService
n.VictorOpsService = tm.VictorOpsService
n.PagerDutyService = tm.PagerDutyService
n.SlackService = tm.SlackService
n.TelegramService = tm.TelegramService
n.HipChatService = tm.HipChatService
n.AlertaService = tm.AlertaService
n.SensuService = tm.SensuService
n.TalkService = tm.TalkService
n.TimingService = tm.TimingService
n.TelegramService = tm.TelegramService
n.K8sService = tm.K8sService
return n
}

Expand Down Expand Up @@ -550,6 +554,9 @@ func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyL
if tm.closed {
return ErrTaskMasterClosed
}
if retentionPolicy == "" {
retentionPolicy = tm.DefaultRetentionPolicy
}
for _, mp := range points {
p := models.Point{
Database: database,
Expand Down

0 comments on commit 4f2b801

Please sign in to comment.