From d1356c5aebe2ac83d4cf100885b7e1e75bf7e496 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 11 Nov 2015 15:17:04 -0700 Subject: [PATCH] update docs and add truncate to stream node --- examples/scores/kapacitor.conf | 118 ++++++++++++++++++++++++++++++++ examples/scores/scores.sh | 25 +++++++ examples/scores/top_scores.tick | 56 +++++++++++++++ pipeline/batch.go | 6 ++ pipeline/sample.go | 2 +- pipeline/stream.go | 12 ++++ stream.go | 3 + tick/TICKscript.md | 2 +- tick/cmd/tickdoc/main.go | 16 ++++- update_tick_docs.sh | 6 ++ window.go | 1 - 11 files changed, 241 insertions(+), 6 deletions(-) create mode 100644 examples/scores/kapacitor.conf create mode 100755 examples/scores/scores.sh create mode 100644 examples/scores/top_scores.tick diff --git a/examples/scores/kapacitor.conf b/examples/scores/kapacitor.conf new file mode 100644 index 000000000..4082f8f0c --- /dev/null +++ b/examples/scores/kapacitor.conf @@ -0,0 +1,118 @@ +# The hostname of this node. +# Must be resolvable by any configured InfluxDB hosts. +hostname = "localhost" +# Directory for storing a small amount of metadata about the server. +data_dir = "/var/lib/kapacitor" +# Your Enterprise token. By using Enterprise you can +# send all internal statistics to the Enterprise +# endpoints which will store and report on the +# activity of your instances. +token = "" + +[http] + # HTTP API Server for Kapacitor + # This server is always on, + # it serves both as a write endpoint + # and as the API endpoint for all other + # Kapacitor calls. + bind-address = ":9092" + auth-enabled = false + log-enabled = true + write-tracing = false + pprof-enabled = false + https-enabled = false + https-certificate = "/etc/ssl/kapacitor.pem" + + +[[udp]] + # UDP input for the scores stream + enabled = true + bind-address = ":9100" + database = "game" + retention-policy = "default" + + +[logging] + # Destination for logs + # Can be a path to a file or 'STDOUT', 'STDERR'. 
+ file = "/var/log/kapacitor/kapacitor.log" + # Logging level can be one of: + # DEBUG, INFO, WARN, ERROR, or OFF + level = "INFO" + +[replay] + # Where to store replay files, aka recordings. + dir = "/var/lib/kapacitor/replay" + +[task] + # Where to store the tasks database + dir = "/var/lib/kapacitor/tasks" + +[influxdb] + # Connect to an InfluxDB cluster + # Kapacitor can subscribe, query and write to this cluster. + # Using InfluxDB is not required and can be disabled. + enabled = true + urls = ["http://localhost:8086"] + username = "" + password = "" + timeout = 0 + [influxdb.subscriptions] + # Set of databases and retention policies to subscribe to. + # If empty will subscribe to all. + # + # Format + # db_name = + # + # Example: + # my_database = [ "default", "longterm" ] + +[smtp] + # Configure an SMTP email server + # Will use TLS and authentication if possible + # Only necessary for sending emails from alerts. + enabled = false + host = "localhost" + port = 25 + username = "" + password = "" + # Close idle connections after timeout + idle-timeout = "30s" + +[reporting] + # Send anonymous usage statistics + # every 12 hours to Enterprise. + enabled = true + enterprise-url = "https://enterprise.influxdata.com" + # The interval at which to send all + # internal statistics to Enterprise. + # If no token is specified this + # setting has no effect. 
+ stats-interval = "1m0s" + +################################## +# Input Methods, same as InfluxDB +# + +[collectd] + enabled = false + bind-address = ":25826" + database = "collectd" + retention-policy = "" + batch-size = 1000 + batch-pending = 5 + batch-timeout = "10s" + typesdb = "/usr/share/collectd/types.db" + +[opentsdb] + enabled = false + bind-address = ":4242" + database = "opentsdb" + retention-policy = "" + consistency-level = "one" + tls-enabled = false + certificate = "/etc/ssl/influxdb.pem" + batch-size = 1000 + batch-pending = 5 + batch-timeout = "1s" + diff --git a/examples/scores/scores.sh b/examples/scores/scores.sh new file mode 100755 index 000000000..1eeb34003 --- /dev/null +++ b/examples/scores/scores.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# default options: can be overridden with corresponding arguments. +host=${1-localhost} +port=${2-9100} +games=${3-10} +players=${4-100} + +games=$(seq $games) +players=$(seq $players) +# Spam score updates over UDP +while true +do + for game in $games + do + game="g$game" + for player in $players + do + player="p$player" + score=$(($RANDOM % 1000)) + echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port + done + done + sleep 0.1 +done diff --git a/examples/scores/top_scores.tick b/examples/scores/top_scores.tick new file mode 100644 index 000000000..c138b7406 --- /dev/null +++ b/examples/scores/top_scores.tick @@ -0,0 +1,56 @@ +// Define a result that contains the most recent score per player. +var topPlayerScores = stream + .from('scores') + // Get the most recent score for each player per game. + // Not likely that a player is playing two games but just in case. + .groupBy('game', 'player') + .window() + // keep a buffer of the last 11s of scores + // just in case a player score hasn't updated in a while + .period(11s) + // Emit the current score per player every second. + .every(1s) + // Align the window boundaries to be on the second. 
+ .align() + .mapReduce(influxql.last('value')) + +// Calculate the top 15 scores per game +var topScores = topPlayerScores + .groupBy('game') + .mapReduce(influxql.top(15, 'last', 'player')) + +// Expose top scores over the HTTP API at the 'top_scores' endpoint. +// Now your app can just request the top scores from Kapacitor +// and always get the most recent result. +// +// http://localhost:9092/api/v1/top_scores/top_scores +topScores + .httpOut('top_scores') + +// Sample the top scores and keep a score once every 10s +var topScoresSampled = topScores + .sample(10s) + +// Store top fifteen player scores in InfluxDB. +topScoresSampled + .influxDBOut() + .database('game') + .measurement('top_scores') + +// Calculate the max and min of the top scores. +var max = topScoresSampled + .mapReduce(influxql.max('top')) +var min = topScoresSampled + .mapReduce(influxql.min('top')) + +// Join the max and min streams back together and calculate the gap. +max.join(min) + .as('max', 'min') + // calculate the difference between the max and min scores. + .eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min") + .as('gap', 'topFirst', 'topLast') + // store the fields: gap, topFirst, and topLast in InfluxDB. + .influxDBOut() + .database('game') + .measurement('top_scores_gap') + diff --git a/pipeline/batch.go b/pipeline/batch.go index 9fe8e6297..304213b7a 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -110,6 +110,12 @@ func newBatchNode() *BatchNode { } // Group the data by a set of dimensions. +// Can specify one time dimension. +// +// Example: +// batch +// .groupBy(time(10s), 'tag1', 'tag2') +// // tick:property func (b *BatchNode) GroupBy(d ...interface{}) *BatchNode { b.Dimensions = d diff --git a/pipeline/sample.go b/pipeline/sample.go index f0bb19494..f616f5ebc 100644 --- a/pipeline/sample.go +++ b/pipeline/sample.go @@ -18,7 +18,7 @@ import ( // .sample(10s) + // // Keep only samples that land on the 10s boundary. 
-// See WindowNode.Align or BatchNode.GroupBy time, +// See StreamNode.Truncate, BatchNode.GroupBy time or WindowNode.Align // for ensuring data is aligned with a boundary. type SampleNode struct { chainnode diff --git a/pipeline/stream.go b/pipeline/stream.go index 4d0d932f4..90143037e 100644 --- a/pipeline/stream.go +++ b/pipeline/stream.go @@ -1,6 +1,8 @@ package pipeline import ( + "time" + "github.com/influxdb/kapacitor/tick" ) @@ -30,6 +32,16 @@ type StreamNode struct { // The db.rp.m from clause // tick:ignore FromSelector string + + // Optional duration for truncating timestamps. + // Helpful to ensure data points land on specific boundaries + // Example: + // stream + // .from('mydata') + // .truncate(1s) + // + // All incoming data will be truncated to 1 second resolution. + Truncate time.Duration } func newStreamNode() *StreamNode { diff --git a/stream.go b/stream.go index c9aef527a..a00d0bb19 100644 --- a/stream.go +++ b/stream.go @@ -57,6 +57,9 @@ func (s *StreamNode) runStream() error { for pt, ok := s.ins[0].NextPoint(); ok; pt, ok = s.ins[0].NextPoint() { if s.matches(pt) { + if s.s.Truncate != 0 { + pt.Time = pt.Time.Truncate(s.s.Truncate) + } for _, child := range s.outs { err := child.CollectPoint(pt) if err != nil { diff --git a/tick/TICKscript.md b/tick/TICKscript.md index 041974269..39a04335e 100644 --- a/tick/TICKscript.md +++ b/tick/TICKscript.md @@ -49,7 +49,7 @@ int_lit = "1" … "9" { digit } letter = ascii_letter | "_" . number_lit = digit { digit } { "." {digit} } . duration_lit = int_lit duration_unit . -duration_unit = "u" | "µ" | "ms" | "s" | "h" | "d" | "w" . +duration_unit = "u" | "µ" | "ms" | "s" | "m" | "h" | "d" | "w" . string_lit = `'` { unicode_char } `'` . 
star_lit = "*" diff --git a/tick/cmd/tickdoc/main.go b/tick/cmd/tickdoc/main.go index 5c741ad99..4aba2dc19 100644 --- a/tick/cmd/tickdoc/main.go +++ b/tick/cmd/tickdoc/main.go @@ -329,13 +329,23 @@ func addNodeLinks(nodes map[string]*Node, line string) []byte { scan := bufio.NewScanner(strings.NewReader(line)) scan.Split(bufio.ScanWords) for scan.Scan() { - word := scan.Text() - node := strings.TrimFunc(word, unicode.IsPunct) + word := strings.TrimFunc(scan.Text(), unicode.IsPunct) + parts := strings.Split(word, ".") + node := word + method := "" + if len(parts) == 2 { + node = parts[0] + method = parts[1] + } if nodes[node] != nil && ast.IsExported(node) { buf.Write([]byte("[")) buf.Write(scan.Bytes()) buf.Write([]byte("](")) - buf.Write([]byte(nodeNameToLink(node))) + if method == "" { + buf.Write([]byte(nodeNameToLink(node))) + } else { + buf.Write([]byte(methodNameToLink(node, method))) + } buf.Write([]byte(") ")) } else { buf.Write(scan.Bytes()) diff --git a/update_tick_docs.sh b/update_tick_docs.sh index bae5b6a4f..2e7e4a7d0 100755 --- a/update_tick_docs.sh +++ b/update_tick_docs.sh @@ -7,6 +7,12 @@ dest=$1 # output path for the .md files docspath=${2-/docs/kapacitor/v0.1/tick} +if [ -z "$dest" ] +then + echo "Usage: ./update_tick_docs.sh output_path [docspath]" + exit 1 +fi + tickdoc $docspath ./pipeline $dest diff --git a/window.go b/window.go index 19c432d7f..60604c2e0 100644 --- a/window.go +++ b/window.go @@ -26,7 +26,6 @@ func newWindowNode(et *ExecutingTask, n *pipeline.WindowNode) (*WindowNode, erro } func (w *WindowNode) runWindow() error { - windows := make(map[models.GroupID]*window) // Loops through points windowing by group for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {