diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index 2b4f5a1ba..28503aaa0 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -24,7 +24,7 @@ import ( // These variables are populated via the Go linker. var ( - version string = "v0.1" + version string commit string branch string ) @@ -228,15 +228,15 @@ Examples: This records the live data stream for 1 minute using the databases and retention policies from the named task. - + $ kapacitor record batch -name cpu_idle -start 2015-09-01T00:00:00Z -stop 2015-09-02T00:00:00Z - + This records the result of the query defined in task 'cpu_idle' and runs the query as many times as many times as defined by the schedule until the queries reaches the stop time. starting at time 'start' and incrementing by the schedule defined in the task. $ kapacitor record batch -name cpu_idle -past 10h - + This records the result of the query defined in task 'cpu_idle' and runs the query as many times as defined by the schedule until the queries reaches the present time. The starting time for the queries is 'now - 10h' and increments by the schedule defined in the task. @@ -379,7 +379,7 @@ For example: $ kapacitor define -name my_task -tick path/to/TICKscript -type stream -dbrp mydb.myrp - Later you can change a sinlge property of the task by referencing its name + Later you can change a sinlge property of the task by referencing its name and only providing the single option you wish to modify. $ kapacitor define -name my_task -tick path/to/TICKscript @@ -613,7 +613,7 @@ func doShow(args []string) error { } v := url.Values{} - v.Add("task", args[0]) + v.Add("name", args[0]) r, err := http.Get(*kapacitordURL + "/task?" + v.Encode()) if err != nil { return err @@ -736,7 +736,7 @@ func deleteUsage() { var u = `Usage: kapacitor delete (tasks|recordings) [task name|recording ID]... Delete a task or recording. - + If a task is enabled it will be disabled and then deleted. ` fmt.Fprintln(os.Stderr, u) diff --git a/cmd/kapacitord/main.go b/cmd/kapacitord/main.go index 3d53337ca..363557e02 100644 --- a/cmd/kapacitord/main.go +++ b/cmd/kapacitord/main.go @@ -19,7 +19,7 @@ import ( // These variables are populated via the Go linker. var ( - version string = "v0.1" + version string commit string branch string ) diff --git a/cmd/kapacitord/run/command.go b/cmd/kapacitord/run/command.go index 274d01d66..8b70ae3b4 100644 --- a/cmd/kapacitord/run/command.go +++ b/cmd/kapacitord/run/command.go @@ -117,8 +117,7 @@ func (cmd *Command) Run(args ...string) error { // Create server from config and start it. buildInfo := &BuildInfo{Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch} - l := cmd.logService.NewLogger("[srv] ", log.LstdFlags) - s, err := NewServer(config, buildInfo, l, cmd.logService) + s, err := NewServer(config, buildInfo, cmd.logService) if err != nil { return fmt.Errorf("create server: %s", err) } diff --git a/cmd/kapacitord/run/config.go b/cmd/kapacitord/run/config.go index 2ccaab64a..6584a620a 100644 --- a/cmd/kapacitord/run/config.go +++ b/cmd/kapacitord/run/config.go @@ -80,8 +80,8 @@ func NewDemoConfig() (*Config, error) { c.Replay.Dir = filepath.Join(homeDir, ".kapacitor", c.Replay.Dir) c.Task.Dir = filepath.Join(homeDir, ".kapacitor", c.Task.Dir) - c.DataDir = filepath.Join(homeDir, ".kapacitor", c.DataDir) + return c, nil } diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go index ae0717bc6..231ce69e6 100644 --- a/cmd/kapacitord/run/server.go +++ b/cmd/kapacitord/run/server.go @@ -35,6 +35,27 @@ import ( const clusterIDFilename = "cluster.id" const serverIDFilename = "server.id" +var ( + //Published vars + cidVar = &expvar.String{} + + sidVar = &expvar.String{} + + hostVar = &expvar.String{} + + productVar = &expvar.String{} + + versionVar = &expvar.String{} +) + +func init() { + expvar.Publish(kapacitor.ClusterIDVarName, cidVar) + expvar.Publish(kapacitor.ServerIDVarName, sidVar) + expvar.Publish(kapacitor.HostVarName, hostVar) + expvar.Publish(kapacitor.ProductVarName, productVar) + expvar.Publish(kapacitor.VersionVarName, versionVar) +} + // BuildInfo represents the build details for the server code. type BuildInfo struct { Version string @@ -55,7 +76,7 @@ type Server struct { TaskMaster *kapacitor.TaskMaster - LogService *logging.Service + LogService logging.Interface HTTPDService *httpd.Service Streamer *streamer.Service TaskStore *task_store.Service @@ -79,8 +100,8 @@ type Server struct { } // NewServer returns a new instance of Server built from a config. -func NewServer(c *Config, buildInfo *BuildInfo, l *log.Logger, logService *logging.Service) (*Server, error) { - +func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*Server, error) { + l := logService.NewLogger("[srv] ", log.LstdFlags) s := &Server{ buildInfo: *buildInfo, dataDir: c.DataDir, @@ -272,26 +293,12 @@ func (s *Server) Open() error { return err } - // Publish Vars - cid := &expvar.String{} - cid.Set(s.ClusterID) - expvar.Publish(kapacitor.ClusterIDVarName, cid) - - sid := &expvar.String{} - sid.Set(s.ServerID) - expvar.Publish(kapacitor.ServerIDVarName, sid) - - host := &expvar.String{} - host.Set(s.hostname) - expvar.Publish(kapacitor.HostVarName, host) - - product := &expvar.String{} - product.Set(kapacitor.Product) - expvar.Publish(kapacitor.ProductVarName, product) - - version := &expvar.String{} - version.Set(s.buildInfo.Version) - expvar.Publish(kapacitor.VersionVarName, version) + // Set published vars + cidVar.Set(s.ClusterID) + sidVar.Set(s.ServerID) + hostVar.Set(s.hostname) + productVar.Set(kapacitor.Product) + versionVar.Set(s.buildInfo.Version) // Start profiling, if set. s.startProfile(s.CPUProfile, s.MemProfile) diff --git a/cmd/kapacitord/run/server_helper_test.go b/cmd/kapacitord/run/server_helper_test.go new file mode 100644 index 000000000..3364e0542 --- /dev/null +++ b/cmd/kapacitord/run/server_helper_test.go @@ -0,0 +1,288 @@ +// This package is a set of convenience helpers and structs to make integration testing easier +package run_test + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" + "testing" + + "github.com/influxdb/influxdb/client" + "github.com/influxdb/kapacitor" + "github.com/influxdb/kapacitor/cmd/kapacitord/run" + "github.com/influxdb/kapacitor/services/task_store" + "github.com/influxdb/kapacitor/wlog" +) + +// Server represents a test wrapper for run.Server. +type Server struct { + *run.Server + Config *run.Config +} + +// NewServer returns a new instance of Server. +func NewServer(c *run.Config) *Server { + buildInfo := &run.BuildInfo{ + Version: "testServer", + Commit: "testCommit", + Branch: "testBranch", + } + ls := &LogService{} + srv, _ := run.NewServer(c, buildInfo, ls) + s := Server{ + Server: srv, + Config: c, + } + configureLogging() + return &s +} + +// OpenServer opens a test server. +func OpenDefaultServer() *Server { + c := NewConfig() + return OpenServer(c) +} + +// OpenServer opens a test server. +func OpenServer(c *run.Config) *Server { + s := NewServer(c) + if err := s.Open(); err != nil { + panic(err.Error()) + } + return s +} + +// Close shuts down the server and removes all temporary paths. +func (s *Server) Close() { + s.Server.Close() + os.RemoveAll(s.Config.Replay.Dir) + os.RemoveAll(s.Config.Task.Dir) + os.RemoveAll(s.Config.DataDir) +} + +// URL returns the base URL for the httpd endpoint. +func (s *Server) URL() string { + if s.HTTPDService != nil { + return "http://" + s.HTTPDService.Addr().String() + } + panic("httpd server not found in services") +} + +// HTTPGet makes an HTTP GET request to the server and returns the response. +func (s *Server) HTTPGet(url string) (results string, err error) { + resp, err := http.Get(url) + if err != nil { + return "", err + } + body := string(MustReadAll(resp.Body)) + switch resp.StatusCode { + case http.StatusBadRequest: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + case http.StatusOK, http.StatusNoContent: + return body, nil + default: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } +} + +// HTTPPost makes an HTTP POST request to the server and returns the response. +func (s *Server) HTTPPost(url string, content []byte) (results string, err error) { + buf := bytes.NewBuffer(content) + resp, err := http.Post(url, "application/json", buf) + if err != nil { + return "", err + } + body := string(MustReadAll(resp.Body)) + switch resp.StatusCode { + case http.StatusBadRequest: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + case http.StatusOK, http.StatusNoContent: + return body, nil + default: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } +} + +// Write executes a write against the server and returns the results. +func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) { + if params == nil { + params = url.Values{} + } + if params.Get("db") == "" { + params.Set("db", db) + } + if params.Get("rp") == "" { + params.Set("rp", rp) + } + resp, err := http.Post(s.URL()+"/api/v1/write?"+params.Encode(), "", strings.NewReader(body)) + if err != nil { + return "", err + } else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body)) + } + return string(MustReadAll(resp.Body)), nil +} + +func (s *Server) DefineTask(name, ttype, tick string, dbrps []kapacitor.DBRP) (results string, err error) { + dbrpsStr, err := json.Marshal(dbrps) + if err != nil { + return + } + v := url.Values{} + v.Add("name", name) + v.Add("type", ttype) + v.Add("dbrps", string(dbrpsStr)) + results, err = s.HTTPPost(s.URL()+"/api/v1/task?"+v.Encode(), []byte(tick)) + return +} + +func (s *Server) EnableTask(name string) (string, error) { + v := url.Values{} + v.Add("name", name) + return s.HTTPPost(s.URL()+"/api/v1/enable?"+v.Encode(), nil) +} + +func (s *Server) DisableTask(name string) (string, error) { + v := url.Values{} + v.Add("name", name) + return s.HTTPPost(s.URL()+"/api/v1/disable?"+v.Encode(), nil) +} + +func (s *Server) DeleteTask(name string) error { + v := url.Values{} + v.Add("name", name) + req, err := http.NewRequest("DELETE", s.URL()+"/api/v1/task?"+v.Encode(), nil) + if err != nil { + return err + } + client := &http.Client{} + r, err := client.Do(req) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code got %d exp %d", r.StatusCode, http.StatusOK) + } + // Decode valid response + type resp struct { + Error string `json:"Error"` + } + d := json.NewDecoder(r.Body) + rp := resp{} + d.Decode(&rp) + if rp.Error != "" { + return errors.New(rp.Error) + } + return nil +} + +func (s *Server) GetTask(name string) (ti task_store.TaskInfo, err error) { + v := url.Values{} + v.Add("name", name) + resp, err := http.Get(s.URL() + "/api/v1/task?" + v.Encode()) + if err != nil { + return + } + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("unexpected status code got %d exp %d", resp.StatusCode, http.StatusOK) + } + defer resp.Body.Close() + d := json.NewDecoder(resp.Body) + d.Decode(&ti) + return +} + +// MustReadAll reads r. Panic on error. +func MustReadAll(r io.Reader) []byte { + b, err := ioutil.ReadAll(r) + if err != nil { + panic(err) + } + return b +} + +// MustWrite executes a write to the server. Panic on error. +func (s *Server) MustWrite(db, rp, body string, params url.Values) string { + results, err := s.Write(db, rp, body, params) + if err != nil { + panic(err) + } + return results +} + +// NewConfig returns the default config with temporary paths. +func NewConfig() *run.Config { + c := run.NewConfig() + c.Reporting.Enabled = false + c.Replay.Dir = MustTempFile() + c.Task.Dir = MustTempFile() + c.DataDir = MustTempFile() + c.HTTP.BindAddress = "127.0.0.1:0" + c.InfluxDB.Enabled = false + return c +} + +// MustTempFile returns a path to a temporary file. +func MustTempFile() string { + f, err := ioutil.TempFile("", "influxd-") + if err != nil { + panic(err) + } + f.Close() + os.Remove(f.Name()) + return f.Name() +} + +func configureLogging() { + if testing.Verbose() { + wlog.SetLevel(wlog.DEBUG) + } else { + wlog.SetLevel(wlog.OFF) + } +} + +type LogService struct{} + +func (l *LogService) NewLogger(prefix string, flag int) *log.Logger { + return wlog.New(os.Stderr, prefix, flag) +} + +func (l *LogService) NewStaticLevelLogger(prefix string, flag int, level wlog.Level) *log.Logger { + return log.New(wlog.NewStaticLevelWriter(os.Stderr, level), prefix, flag) +} + +type queryFunc func(q string) *client.Response + +type InfluxDB struct { + server *httptest.Server +} + +func NewInfluxDB(callback queryFunc) *InfluxDB { + handler := func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query().Get("q") + res := callback(q) + enc := json.NewEncoder(w) + enc.Encode(res) + } + return &InfluxDB{ + server: httptest.NewServer(http.HandlerFunc(handler)), + } +} + +func (i *InfluxDB) URL() string { + return i.server.URL +} + +func (i *InfluxDB) Close() { + i.server.Close() +} diff --git a/cmd/kapacitord/run/server_test.go b/cmd/kapacitord/run/server_test.go new file mode 100644 index 000000000..b94dc6b08 --- /dev/null +++ b/cmd/kapacitord/run/server_test.go @@ -0,0 +1,465 @@ +package run_test + +import ( + "fmt" + "net/http" + "net/url" + "reflect" + "testing" + "time" + + "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/models" + "github.com/influxdb/kapacitor" +) + +func TestServer_Ping(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + r, err := s.HTTPGet(s.URL() + "/api/v1/ping") + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result") + } +} + +func TestServer_Version(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + resp, err := http.Get(s.URL() + "/api/v1/ping") + if err != nil { + t.Fatal(err) + } + version := resp.Header.Get("X-KAPACITOR-Version") + + if version != "testServer" { + t.Fatal("unexpected version", version) + } +} + +func TestServer_DefineTask(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + + name := "testTaskName" + ttype := "stream" + dbrps := []kapacitor.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + { + Database: "otherdb", + RetentionPolicy: "default", + }, + } + tick := "stream.from('test')" + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + ti, err := s.GetTask(name) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.Name != name { + t.Fatalf("unexpected name got %s exp %s", ti.Name, name) + } + if ti.Type != kapacitor.StreamTask { + t.Fatalf("unexpected type got %s exp %s", ti.Type, kapacitor.StreamTask) + } + if ti.Enabled != false { + t.Fatalf("unexpected enabled got %v exp %v", ti.Enabled, false) + } + if !reflect.DeepEqual(ti.DBRPs, dbrps) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, dbrps) + } + if ti.TICKscript != tick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick) + } + dot := "digraph testTaskName {\nstream0 -> stream1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot) + } +} + +func TestServer_EnableTask(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + + name := "testTaskName" + ttype := "stream" + dbrps := []kapacitor.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + { + Database: "otherdb", + RetentionPolicy: "default", + }, + } + tick := "stream.from('test')" + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + r, err = s.EnableTask(name) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + ti, err := s.GetTask(name) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.Name != name { + t.Fatalf("unexpected name got %s exp %s", ti.Name, name) + } + if ti.Type != kapacitor.StreamTask { + t.Fatalf("unexpected type got %s exp %s", ti.Type, kapacitor.StreamTask) + } + if ti.Enabled != true { + t.Fatalf("unexpected enabled got %v exp %v", ti.Enabled, true) + } + if !reflect.DeepEqual(ti.DBRPs, dbrps) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, dbrps) + } + if ti.TICKscript != tick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick) + } + dot := "digraph testTaskName {\nstream0 -> stream1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot) + } +} + +func TestServer_DisableTask(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + + name := "testTaskName" + ttype := "stream" + dbrps := []kapacitor.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + { + Database: "otherdb", + RetentionPolicy: "default", + }, + } + tick := "stream.from('test')" + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + r, err = s.EnableTask(name) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + r, err = s.DisableTask(name) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + ti, err := s.GetTask(name) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.Name != name { + t.Fatalf("unexpected name got %s exp %s", ti.Name, name) + } + if ti.Type != kapacitor.StreamTask { + t.Fatalf("unexpected type got %s exp %s", ti.Type, kapacitor.StreamTask) + } + if ti.Enabled != false { + t.Fatalf("unexpected enabled got %v exp %v", ti.Enabled, false) + } + if !reflect.DeepEqual(ti.DBRPs, dbrps) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, dbrps) + } + if ti.TICKscript != tick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick) + } + dot := "digraph testTaskName {\nstream0 -> stream1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot) + } +} + +func TestServer_DeleteTask(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + + name := "testTaskName" + ttype := "stream" + dbrps := []kapacitor.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + { + Database: "otherdb", + RetentionPolicy: "default", + }, + } + tick := "stream.from('test')" + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + err = s.DeleteTask(name) + if err != nil { + t.Fatal(err) + } + + ti, err := s.GetTask(name) + if err == nil { + t.Fatal("unexpected task:", ti) + } +} + +func TestServer_StreamTask(t *testing.T) { + t.Parallel() + s := OpenDefaultServer() + defer s.Close() + + name := "testTaskName" + ttype := "stream" + dbrps := []kapacitor.DBRP{{ + Database: "mydb", + RetentionPolicy: "myrp", + }} + tick := ` +stream + .from('test') + .window() + .period(10s) + .every(10s) + .mapReduce(influxql.count('value')) + .httpOut('count') +` + + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + r, err = s.EnableTask(name) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + endpoint := fmt.Sprintf("%s/api/v1/%s/count", s.URL(), name) + + // Request data before any writes and expect null responses + r, err = s.HTTPGet(endpoint) + if err != nil { + t.Fatal(err) + } + nullResponse := `{"Series":null,"Err":null}` + if r != nullResponse { + t.Fatalf("unexpected result got %s exp %s", r, nullResponse) + } + + points := `test value=1 0000000000 +test value=1 0000000001 +test value=1 0000000001 +test value=1 0000000002 +test value=1 0000000002 +test value=1 0000000003 +test value=1 0000000003 +test value=1 0000000004 +test value=1 0000000005 +test value=1 0000000005 +test value=1 0000000005 +test value=1 0000000006 +test value=1 0000000007 +test value=1 0000000008 +test value=1 0000000009 +test value=1 0000000010 +test value=1 0000000011 +` + v := url.Values{} + v.Add("precision", "s") + s.MustWrite("mydb", "myrp", points, v) + + exp := `{"Series":[{"name":"test","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Err":null}` + retries := 3 + for retries > 0 { + r, err = s.HTTPGet(endpoint) + if err != nil { + t.Fatal(err) + } + if r == nullResponse { + retries-- + time.Sleep(time.Millisecond * 5) + } else { + break + } + } + if r != exp { + t.Fatalf("unexpected result\ngot %s\nexp %s", r, exp) + } +} + +func TestServer_BatchTask(t *testing.T) { + t.Parallel() + c := NewConfig() + c.InfluxDB.Enabled = true + count := 0 + db := NewInfluxDB(func(q string) *client.Response { + if len(q) > 6 && q[:6] == "SELECT" { + count++ + return &client.Response{ + Results: []client.Result{{ + Series: []models.Row{{ + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 1, int(time.Millisecond), time.UTC).Format(time.RFC3339Nano), + 1.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 1, 2*int(time.Millisecond), time.UTC).Format(time.RFC3339Nano), + 1.0, + }, + }, + }}, + }}, + } + } + return nil + }) + c.InfluxDB.URLs = []string{db.URL()} + s := OpenServer(c) + defer s.Close() + + name := "testTaskName" + ttype := "batch" + dbrps := []kapacitor.DBRP{{ + Database: "mydb", + RetentionPolicy: "myrp", + }} + tick := ` +batch + .query(' SELECT value from mydb.myrp.cpu ') + .period(5ms) + .every(5ms) + .mapReduce(influxql.count('value')) + .httpOut('count') +` + + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + r, err = s.EnableTask(name) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + endpoint := fmt.Sprintf("%s/api/v1/%s/count", s.URL(), name) + + // Request data before any writes and expect null responses + r, err = s.HTTPGet(endpoint) + if err != nil { + t.Fatal(err) + } + nullResponse := `{"Series":null,"Err":null}` + if r != nullResponse { + t.Fatalf("unexpected result got %s exp %s", r, nullResponse) + } + + exp := `{"Series":[{"name":"cpu","columns":["time","count"],"values":[["1971-01-01T00:00:01.002Z",2]]}],"Err":null}` + retries := 3 + for retries > 0 { + r, err = s.HTTPGet(endpoint) + if err != nil { + t.Fatal(err) + } + if r == nullResponse { + retries-- + time.Sleep(time.Millisecond * 5) + } else { + break + } + } + if r != exp { + t.Errorf("unexpected result\ngot %s\nexp %s", r, exp) + } + + r, err = s.DisableTask(name) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + if count == 0 { + t.Error("unexpected query count", count) + } +} diff --git a/edge.go b/edge.go index a8a11da71..67c8aee4e 100644 --- a/edge.go +++ b/edge.go @@ -5,6 +5,7 @@ import ( "expvar" "fmt" "log" + "sync" "github.com/influxdb/kapacitor/models" "github.com/influxdb/kapacitor/pipeline" @@ -33,6 +34,7 @@ type Edge struct { logger *log.Logger closed bool + mu sync.Mutex statMap *expvar.Map } @@ -55,6 +57,8 @@ func newEdge(name string, t pipeline.EdgeType, logService LogService) *Edge { } func (e *Edge) Close() { + e.mu.Lock() + defer e.mu.Unlock() if e.closed { return } @@ -83,7 +87,7 @@ func (e *Edge) Next() (p models.PointInterface, ok bool) { } func (e *Edge) NextPoint() (p models.Point, ok bool) { - if wlog.LogLevel == wlog.DEBUG { + if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( "D! next point c: %s e: %s\n", @@ -99,7 +103,7 @@ func (e *Edge) NextPoint() (p models.Point, ok bool) { } func (e *Edge) NextBatch() (b models.Batch, ok bool) { - if wlog.LogLevel == wlog.DEBUG { + if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( "D! next batch c: %s e: %s\n", @@ -115,7 +119,7 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) { } func (e *Edge) NextMaps() (m *MapResult, ok bool) { - if wlog.LogLevel == wlog.DEBUG { + if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( "D! next maps c: %s e: %s\n", @@ -142,8 +146,10 @@ func (e *Edge) recover(errp *error) { } func (e *Edge) CollectPoint(p models.Point) (err error) { + e.mu.Lock() + defer e.mu.Unlock() defer e.recover(&err) - if wlog.LogLevel == wlog.DEBUG { + if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( "D! collect point c: %s e: %s\n", @@ -157,8 +163,10 @@ func (e *Edge) CollectPoint(p models.Point) (err error) { } func (e *Edge) CollectBatch(b models.Batch) (err error) { + e.mu.Lock() + defer e.mu.Unlock() defer e.recover(&err) - if wlog.LogLevel == wlog.DEBUG { + if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( "D! collect batch c: %s e: %s\n", @@ -172,8 +180,10 @@ func (e *Edge) CollectBatch(b models.Batch) (err error) { } func (e *Edge) CollectMaps(m *MapResult) (err error) { + e.mu.Lock() + defer e.mu.Unlock() defer e.recover(&err) - if wlog.LogLevel == wlog.DEBUG { + if wlog.LogLevel() == wlog.DEBUG { // Explicitly check log level since this is expensive and frequent e.logger.Printf( "D! collect maps c: %s e: %s\n", diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index 8ce7db6f3..91a3eef1b 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -101,9 +101,9 @@ batch // Helper test function for batcher func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) { if testing.Verbose() { - wlog.LogLevel = wlog.DEBUG + wlog.SetLevel(wlog.DEBUG) } else { - wlog.LogLevel = wlog.OFF + wlog.SetLevel(wlog.OFF) } // Create task diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index d0646af8a..e6fda46ac 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -32,7 +32,7 @@ var dbrps = []kapacitor.DBRP{ } func init() { - wlog.LogLevel = wlog.OFF + wlog.SetLevel(wlog.OFF) // create API server config := httpd.NewConfig() config.BindAddress = ":0" // Choose port dynamically @@ -1590,9 +1590,9 @@ topScores.sample(4s) // Helper test function for streamer func testStreamer(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) { if testing.Verbose() { - wlog.LogLevel = wlog.DEBUG + wlog.SetLevel(wlog.DEBUG) } else { - wlog.LogLevel = wlog.OFF + wlog.SetLevel(wlog.OFF) } //Create the task diff --git a/models/batch.go b/models/batch.go index ca8250d86..3a93e01c8 100644 --- a/models/batch.go +++ b/models/batch.go @@ -125,9 +125,12 @@ func ResultToBatches(res client.Result) ([]Batch, error) { return nil, fmt.Errorf("unexpected time value: %v", v[i]) } var err error - t, err = time.Parse(time.RFC3339, tStr) + t, err = time.Parse(time.RFC3339Nano, tStr) if err != nil { - return nil, fmt.Errorf("unexpected time format: %v", err) + t, err = time.Parse(time.RFC3339, tStr) + if err != nil { + return nil, fmt.Errorf("unexpected time format: %v", err) + } } } else { value := v[i] @@ -145,6 +148,9 @@ func ResultToBatches(res client.Result) ([]Batch, error) { } } if !skip { + if t.After(b.TMax) { + b.TMax = t + } b.Points = append( b.Points, BatchPoint{Time: t, Fields: fields, Tags: b.Tags}, diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 73fa6c093..28f518c5b 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -215,7 +215,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // serveLogLevel sets the log level of the server func (h *Handler) serveLogLevel(w http.ResponseWriter, r *http.Request) { l := r.URL.Query().Get("level") - err := wlog.SetLevel(l) + err := wlog.SetLevelFromName(l) if err != nil { HttpError(w, err.Error(), true, http.StatusBadRequest) } diff --git a/services/logging/service.go b/services/logging/service.go index 30efeee00..b8e63618f 100644 --- a/services/logging/service.go +++ b/services/logging/service.go @@ -9,6 +9,12 @@ import ( "github.com/influxdb/kapacitor/wlog" ) +// Interface for creating new loggers +type Interface interface { + NewLogger(prefix string, flag int) *log.Logger + NewStaticLevelLogger(prefix string, flag int, l wlog.Level) *log.Logger +} + type Service struct { f io.WriteCloser c Config @@ -46,7 +52,7 @@ func (s *Service) Open() error { s.f = f } - wlog.SetLevel(s.c.Level) + wlog.SetLevelFromName(s.c.Level) return nil } diff --git a/services/task_store/service.go b/services/task_store/service.go index 9fed33855..39a20198b 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -191,7 +191,7 @@ type TaskInfo struct { } func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) { - name := r.URL.Query().Get("task") + name := r.URL.Query().Get("name") if name == "" { httpd.HttpError(w, "must pass task name", true, http.StatusBadRequest) return diff --git a/task.go b/task.go index 4e33ad592..bb257007c 100644 --- a/task.go +++ b/task.go @@ -30,8 +30,8 @@ func (t TaskType) String() string { } type DBRP struct { - Database string - RetentionPolicy string + Database string `json:"db"` + RetentionPolicy string `json:"rp"` } func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool { diff --git a/wlog/writer.go b/wlog/writer.go index b909057fb..dcd0440cf 100644 --- a/wlog/writer.go +++ b/wlog/writer.go @@ -13,7 +13,7 @@ Simply pass a instance of wlog.Writer to log.New or use the helper wlog.New function. - The log level can be changed via the LogLevel variable or the SetLevel function. + The log level can be changed via the SetLevel or the SetLevelFromName functions. */ package wlog @@ -22,6 +22,7 @@ import ( "io" "log" "strings" + "sync" ) type Level int @@ -55,7 +56,21 @@ func init() { } // The global and only log level. Log levels are not implemented per writer. -var LogLevel = INFO +var logLevel = INFO + +var mu sync.RWMutex + +func SetLevel(l Level) { + mu.Lock() + defer mu.Unlock() + logLevel = l +} + +func LogLevel() Level { + mu.RLock() + defer mu.RUnlock() + return logLevel +} // name to Level mappings var levels = map[string]Level{ @@ -66,11 +81,11 @@ var levels = map[string]Level{ "OFF": OFF, } -// Set the log level via a string name. To set it directly use 'LogLevel'. -func SetLevel(level string) error { +// Set the log level via a string name. To set it directly use 'logLevel'. +func SetLevelFromName(level string) error { l := levels[strings.ToUpper(level)] if l > 0 { - LogLevel = l + SetLevel(l) } else { return fmt.Errorf("invalid log level: %q", level) } @@ -112,7 +127,7 @@ func (w *Writer) Write(buf []byte) (int, error) { } } l := Levels[buf[w.start]] - if l >= LogLevel { + if l >= LogLevel() { return w.w.Write(buf) } else if l == 0 { buf = append(invalidMSG, buf...)