Skip to content

Commit

Permalink
track ingress stats (influxdata#599)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel Cook committed Jun 3, 2016
1 parent dc52a42 commit d3fa807
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ In order to know if subscription writes are being dropped you should monitor the
- [#562](https://github.com/influxdata/kapacitor/pull/562): HTTP based subscriptions.
- [#595](https://github.com/influxdata/kapacitor/pull/595): Support counting and summing empty batches to 0.
- [#596](https://github.com/influxdata/kapacitor/pull/596): Support new group by time offset i.e. time(30s, 5s)
- [#416](https://github.com/influxdata/kapacitor/issues/416): Track ingress counts by database, retention policy, and measurement. Expose stats via cli.


### Bugfixes
Expand Down
39 changes: 39 additions & 0 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const DefaultUserAgent = "KapacitorClient"
const basePath = "/kapacitor/v1"
const pingPath = basePath + "/ping"
const logLevelPath = basePath + "/loglevel"
const debugVarsPath = basePath + "/debug/vars"
const tasksPath = basePath + "/tasks"
const templatesPath = basePath + "/templates"
const recordingsPath = basePath + "/recordings"
Expand Down Expand Up @@ -540,6 +541,10 @@ type Replay struct {
Progress float64 `json:"progress"`
}

// URL returns the client's URL rendered as a string.
func (c *Client) URL() string {
	base := c.url
	return base.String()
}

// Perform the request.
// If result is not nil the response body is JSON decoded into result.
// Codes is a list of valid response codes.
Expand Down Expand Up @@ -1442,3 +1447,37 @@ func (c *Client) LogLevel(level string) error {
_, err = c.do(req, nil, http.StatusNoContent)
return err
}

// DebugVars is the decoded form of the JSON document returned by the
// debug vars endpoint. Each field is filled from the JSON key named in its tag.
type DebugVars struct {
ClusterID string `json:"cluster_id"`
ServerID string `json:"server_id"`
Host string `json:"host"`
// Stats is decoded from the "kapacitor" key of the document.
Stats map[string]Stat `json:"kapacitor"`
Cmdline []string `json:"cmdline"`
NumEnabledTasks int `json:"num_enabled_tasks"`
NumSubscriptions int `json:"num_subscriptions"`
NumTasks int `json:"num_tasks"`
// Memstats is left loosely typed; its keys are not interpreted here.
Memstats map[string]interface{} `json:"memstats"`
Version string `json:"version"`
}

// Stat is a single named statistic: a name, a set of string tags and a
// set of loosely typed values, decoded from the JSON keys in the field tags.
type Stat struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
// Values holds whatever encoding/json produces for each entry
// (JSON numbers decode to float64 when the target is interface{}).
Values map[string]interface{} `json:"values"`
}

// DebugVars fetches all Kapacitor debug vars from the server.
//
// It issues a GET against debugVarsPath on a copy of the client's URL and
// JSON-decodes the response body into a DebugVars value. Only a 200 OK
// response is accepted. On any error the zero DebugVars is returned so that
// callers never observe a partially decoded value.
func (c *Client) DebugVars() (DebugVars, error) {
	// Work on a copy so the client's own URL is not modified.
	u := *c.url
	u.Path = debugVarsPath

	req, err := http.NewRequest("GET", u.String(), nil)
	if err != nil {
		return DebugVars{}, err
	}

	var vars DebugVars
	if _, err := c.do(req, &vars, http.StatusOK); err != nil {
		// Decoding may have filled some fields before failing; discard them.
		return DebugVars{}, err
	}
	return vars, nil
}
92 changes: 91 additions & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -52,9 +54,11 @@ Commands:
list List information about tasks, templates, recordings or replays.
show Display detailed information about a task.
show-template Display detailed information about a template.
help Prints help for a command.
level Sets the logging level on the kapacitord server.
stats Display various stats about Kapacitor.
version Displays the Kapacitor version info.
vars Print debug vars in JSON format.
help Prints help for a command.
Options:
`
Expand Down Expand Up @@ -147,9 +151,15 @@ func main() {
case "level":
commandArgs = args
commandF = doLevel
case "stats":
commandArgs = args
commandF = doStats
case "version":
commandArgs = args
commandF = doVersion
case "vars":
commandArgs = args
commandF = doVars
default:
fmt.Fprintln(os.Stderr, "Unknown command", command)
usage()
Expand Down Expand Up @@ -229,8 +239,12 @@ func doHelp(args []string) error {
levelUsage()
case "help":
helpUsage()
case "stats":
statsUsage()
case "version":
versionUsage()
case "vars":
varsUsage()
default:
fmt.Fprintln(os.Stderr, "Unknown command", command)
usage()
Expand Down Expand Up @@ -1588,6 +1602,63 @@ func doLevel(args []string) error {
return cli.LogLevel(args[0])
}

// Stats
func statsUsage() {
var u = `Usage: kapacitor stats <general|ingress>
Print stats about a certain aspect of Kapacitor.
general - Display summary stats about Kapacitor.
ingress - Display stats about the data Kapacitor is receiving by database, retention policy and measurement.
`
fmt.Fprintln(os.Stderr, u)
}

// doStats implements "kapacitor stats <general|ingress>".
//
// args must contain exactly one element, the stats category. An unknown
// category prints the usage text and returns an error; this is checked before
// the server is contacted so a typo never costs a round trip.
//
//   - general: prints summary fields from the server's debug vars.
//   - ingress: prints one row per "ingress" stat tagged task_master=main,
//     ordered by database, retention policy and measurement.
func doStats(args []string) error {
	if len(args) != 1 {
		statsUsage()
		return errors.New("must provide a stats category")
	}
	category := args[0]
	switch category {
	case "general", "ingress":
	default:
		statsUsage()
		return fmt.Errorf("unknown stats category %q", category)
	}

	vars, err := cli.DebugVars()
	if err != nil {
		return err
	}

	if category == "general" {
		outFmtNum := "%-30s%-30d\n"
		outFmtStr := "%-30s%-30s\n"
		fmt.Fprintf(os.Stdout, outFmtStr, "ClusterID:", vars.ClusterID)
		fmt.Fprintf(os.Stdout, outFmtStr, "ServerID:", vars.ServerID)
		fmt.Fprintf(os.Stdout, outFmtStr, "Host:", vars.Host)
		fmt.Fprintf(os.Stdout, outFmtNum, "Tasks:", vars.NumTasks)
		fmt.Fprintf(os.Stdout, outFmtNum, "Enabled Tasks:", vars.NumEnabledTasks)
		fmt.Fprintf(os.Stdout, outFmtNum, "Subscriptions:", vars.NumSubscriptions)
		fmt.Fprintf(os.Stdout, outFmtStr, "Version:", vars.Version)
		return nil
	}

	// category == "ingress"
	rows := make(ingressRows, 0, len(vars.Stats))
	for _, stat := range vars.Stats {
		if stat.Name != "ingress" || stat.Tags["task_master"] != "main" {
			continue
		}
		// A missing or non-float value is reported as zero.
		pr, _ := stat.Values["points_received"].(float64)
		rows = append(rows, ingressRow{
			database:        stat.Tags["database"],
			retentionPolicy: stat.Tags["retention_policy"],
			measurement:     stat.Tags["measurement"],
			pointsReceived:  pr,
		})
	}
	// Map iteration order is randomized; sort so repeated runs print the
	// rows in the same order.
	sort.Sort(rows)

	outFmt := "%-30s%-30s%-30s%-20.0f\n"
	fmt.Fprintf(os.Stdout, "%-30s%-30s%-30s%-20s\n", "Database", "Retention Policy", "Measurement", "Points Received")
	for _, row := range rows {
		fmt.Fprintf(
			os.Stdout,
			outFmt,
			row.database,
			row.retentionPolicy,
			row.measurement,
			row.pointsReceived,
		)
	}
	return nil
}

// ingressRow is a single line of the "kapacitor stats ingress" table.
type ingressRow struct {
	database        string
	retentionPolicy string
	measurement     string
	pointsReceived  float64
}

// ingressRows orders rows by database, then retention policy, then measurement.
type ingressRows []ingressRow

func (r ingressRows) Len() int      { return len(r) }
func (r ingressRows) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r ingressRows) Less(i, j int) bool {
	a, b := r[i], r[j]
	if a.database != b.database {
		return a.database < b.database
	}
	if a.retentionPolicy != b.retentionPolicy {
		return a.retentionPolicy < b.retentionPolicy
	}
	return a.measurement < b.measurement
}

// Version
func versionUsage() {
var u = `Usage: kapacitor version
Expand All @@ -1601,3 +1672,22 @@ func doVersion(args []string) error {
fmt.Fprintf(os.Stdout, "Kapacitor %s (git: %s %s)\n", version, branch, commit)
return nil
}

// Vars
func varsUsage() {
var u = `Usage: kapacitor vars
Print debug vars in JSON format.
`
fmt.Fprintln(os.Stderr, u)
}

// doVars implements "kapacitor vars": it streams the server's raw debug vars
// JSON to stdout.
//
// args is accepted for signature parity with the other command functions and
// is not used. An error is returned if the request fails, if the server
// answers with anything other than 200 OK, or if copying the body to stdout
// fails part way through.
func doVars(args []string) error {
	r, err := http.Get(cli.URL() + "/kapacitor/v1/debug/vars")
	if err != nil {
		return err
	}
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		// Keep stdout clean for consumers that parse it as JSON, but still
		// surface whatever the server said. Best effort: the status error
		// below is what matters, so a failed copy here is ignored.
		_, _ = io.Copy(os.Stderr, r.Body)
		return fmt.Errorf("unexpected response from server: %s", r.Status)
	}

	_, err = io.Copy(os.Stdout, r.Body)
	return err
}
2 changes: 1 addition & 1 deletion cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID)

// Start Task Master
s.TaskMaster = kapacitor.NewTaskMaster(logService)
s.TaskMaster = kapacitor.NewTaskMaster("main", logService)
if err := s.TaskMaster.Open(); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type FloatVar interface {
}

type StringVar interface {
StringValue() float64
StringValue() string
}

// Int is a 64-bit integer variable that satisfies the expvar.Var interface.
Expand Down
2 changes: 1 addition & 1 deletion integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
}

// Create a new execution env
tm := kapacitor.NewTaskMaster(logService)
tm := kapacitor.NewTaskMaster("testBatcher", logService)
tm.HTTPDService = httpService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
Expand Down
2 changes: 1 addition & 1 deletion integrations/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func Bench(b *testing.B, tasksCount, pointCount int, db, rp, measurement, tickSc
for i := 0; i < b.N; i++ {
// Do not time setup
b.StopTimer()
tm := kapacitor.NewTaskMaster(&LogService{})
tm := kapacitor.NewTaskMaster("bench", &LogService{})
tm.HTTPDService = httpdService
tm.UDFService = nil
tm.TaskStore = taskStore{}
Expand Down
2 changes: 1 addition & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4673,7 +4673,7 @@ func testStreamer(
}

// Create a new execution env
tm := kapacitor.NewTaskMaster(logService)
tm := kapacitor.NewTaskMaster("testStreamer", logService)
tm.HTTPDService = httpService
tm.UDFService = udfService
tm.TaskStore = taskStore{}
Expand Down
12 changes: 11 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,16 @@ func (n *node) edot(buf *bytes.Buffer, labels bool) {
),
))
n.statMap.DoSorted(func(kv expvar.KeyValue) {
var s string
if sv, ok := kv.Value.(kexpvar.StringVar); ok {
s = sv.StringValue()
} else {
s = kv.Value.String()
}
buf.Write([]byte(
fmt.Sprintf("%s=\"%s\" ",
kv.Key,
kv.Value.String(),
s,
),
))
})
Expand Down Expand Up @@ -329,6 +335,10 @@ type MaxDuration struct {
}

// String returns the value of StringValue wrapped in double quotes.
// NOTE(review): presumably quoted so the result is a valid JSON string, as
// expvar expects of Var.String — confirm against the expvar consumers.
func (v *MaxDuration) String() string {
	const quote = "\""
	return quote + v.StringValue() + quote
}

// StringValue interprets IntValue as a time.Duration and returns its
// unquoted string form.
func (v *MaxDuration) StringValue() string {
	d := time.Duration(v.IntValue())
	return d.String()
}

Expand Down
23 changes: 12 additions & 11 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Service struct {
TaskMaster interface {
NewFork(name string, dbrps []kapacitor.DBRP, measurements []string) (*kapacitor.Edge, error)
DelFork(name string)
New() *kapacitor.TaskMaster
New(name string) *kapacitor.TaskMaster
Stream(name string) (kapacitor.StreamCollector, error)
}

Expand Down Expand Up @@ -911,7 +911,7 @@ func (s *Service) handleCreateReplay(w http.ResponseWriter, req *http.Request) {
s.replays.Create(replay)

go func(replay Replay) {
err := s.doReplayFromRecording(t, recording, clk, opt.RecordingTime)
err := s.doReplayFromRecording(opt.ID, t, recording, clk, opt.RecordingTime)
s.updateReplayResult(replay, err)
}(replay)

Expand Down Expand Up @@ -977,7 +977,7 @@ func (s *Service) handleReplayBatch(w http.ResponseWriter, req *http.Request) {
}

go func(replay Replay) {
err := s.doLiveBatchReplay(t, clk, opt.RecordingTime, opt.Start, opt.Stop, opt.Cluster)
err := s.doLiveBatchReplay(opt.ID, t, clk, opt.RecordingTime, opt.Start, opt.Stop, opt.Cluster)
s.updateReplayResult(replay, err)
}(replay)

Expand Down Expand Up @@ -1048,7 +1048,7 @@ func (r *Service) handleReplayQuery(w http.ResponseWriter, req *http.Request) {
w.Write(httpd.MarshalJSON(convertReplay(replay), true))
}

func (r *Service) doReplayFromRecording(task *kapacitor.Task, recording Recording, clk clock.Clock, recTime bool) error {
func (r *Service) doReplayFromRecording(id string, task *kapacitor.Task, recording Recording, clk clock.Clock, recTime bool) error {
dataSource, err := parseDataSourceURL(recording.DataURL)
if err != nil {
return errors.Wrap(err, "load data source")
Expand Down Expand Up @@ -1076,11 +1076,11 @@ func (r *Service) doReplayFromRecording(task *kapacitor.Task, recording Recordin
}
return <-replayC
}
return r.doReplay(task, runReplay)
return r.doReplay(id, task, runReplay)

}

func (r *Service) doLiveBatchReplay(task *kapacitor.Task, clk clock.Clock, recTime bool, start, stop time.Time, cluster string) error {
func (r *Service) doLiveBatchReplay(id string, task *kapacitor.Task, clk clock.Clock, recTime bool, start, stop time.Time, cluster string) error {
runReplay := func(tm *kapacitor.TaskMaster) error {
sources, recordErrC, err := r.startRecordBatch(task, start, stop, cluster)
if err != nil {
Expand All @@ -1100,7 +1100,7 @@ func (r *Service) doLiveBatchReplay(task *kapacitor.Task, clk clock.Clock, recTi
}
return nil
}
return r.doReplay(task, runReplay)
return r.doReplay(id, task, runReplay)
}

func (r *Service) doLiveQueryReplay(id string, task *kapacitor.Task, clk clock.Clock, recTime bool, query, cluster string) error {
Expand Down Expand Up @@ -1138,12 +1138,12 @@ func (r *Service) doLiveQueryReplay(id string, task *kapacitor.Task, clk clock.C
}
return nil
}
return r.doReplay(task, runReplay)
return r.doReplay(id, task, runReplay)
}

func (r *Service) doReplay(task *kapacitor.Task, runReplay func(tm *kapacitor.TaskMaster) error) error {
func (r *Service) doReplay(id string, task *kapacitor.Task, runReplay func(tm *kapacitor.TaskMaster) error) error {
// Create new isolated task master
tm := r.TaskMaster.New()
tm := r.TaskMaster.New(id)
tm.Open()
defer tm.Close()
et, err := tm.StartTask(task)
Expand Down Expand Up @@ -1271,7 +1271,8 @@ func (s *Service) doRecordBatch(dataSource DataSource, t *kapacitor.Task, start,
}

func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time, cluster string) ([]<-chan models.Batch, <-chan error, error) {
et, err := kapacitor.NewExecutingTask(s.TaskMaster.New(), t)
// We do not open the task master so it does not need to be closed
et, err := kapacitor.NewExecutingTask(s.TaskMaster.New(""), t)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit d3fa807

Please sign in to comment.