Commit

update docs and add truncate to stream node
nathanielc committed Nov 11, 2015
1 parent 0393f37 commit d1356c5
Showing 11 changed files with 241 additions and 6 deletions.
118 changes: 118 additions & 0 deletions examples/scores/kapacitor.conf
@@ -0,0 +1,118 @@
# The hostname of this node.
# Must be resolvable by any configured InfluxDB hosts.
hostname = "localhost"
# Directory for storing a small amount of metadata about the server.
data_dir = "/var/lib/kapacitor"
# Your Enterprise token. By using Enterprise you can
# send all internal statistics to the Enterprise
# endpoints which will store and report on the
# activity of your instances.
token = ""

[http]
# HTTP API Server for Kapacitor
# This server is always on,
# it serves both as a write endpoint
# and as the API endpoint for all other
# Kapacitor calls.
bind-address = ":9092"
auth-enabled = false
log-enabled = true
write-tracing = false
pprof-enabled = false
https-enabled = false
https-certificate = "/etc/ssl/kapacitor.pem"


[[udp]]
# UDP input for the scores stream
enabled = true
bind-address = ":9100"
database = "game"
retention-policy = "default"


[logging]
# Destination for logs
# Can be a path to a file or 'STDOUT', 'STDERR'.
file = "/var/log/kapacitor/kapacitor.log"
# Logging level can be one of:
# DEBUG, INFO, WARN, ERROR, or OFF
level = "INFO"

[replay]
# Where to store replay files, aka recordings.
dir = "/var/lib/kapacitor/replay"

[task]
# Where to store the tasks database
dir = "/var/lib/kapacitor/tasks"

[influxdb]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
urls = ["http://localhost:8086"]
username = ""
password = ""
timeout = 0
[influxdb.subscriptions]
# Set of databases and retention policies to subscribe to.
# If empty, Kapacitor will subscribe to all.
#
# Format
# db_name = <list of retention policies>
#
# Example:
# my_database = [ "default", "longterm" ]

[smtp]
# Configure an SMTP email server
# Will use TLS and authentication if possible
# Only necessary for sending emails from alerts.
enabled = false
host = "localhost"
port = 25
username = ""
password = ""
# Close idle connections after timeout
idle-timeout = "30s"

[reporting]
# Send anonymous usage statistics
# every 12 hours to Enterprise.
enabled = true
enterprise-url = "https://enterprise.influxdata.com"
# The interval at which to send all
# internal statistics to Enterprise.
# If no token is specified this
# setting has no effect.
stats-interval = "1m0s"

##################################
# Input Methods, same as InfluxDB
#

[collectd]
enabled = false
bind-address = ":25826"
database = "collectd"
retention-policy = ""
batch-size = 1000
batch-pending = 5
batch-timeout = "10s"
typesdb = "/usr/share/collectd/types.db"

[opentsdb]
enabled = false
bind-address = ":4242"
database = "opentsdb"
retention-policy = ""
consistency-level = "one"
tls-enabled = false
certificate = "/etc/ssl/influxdb.pem"
batch-size = 1000
batch-pending = 5
batch-timeout = "1s"

25 changes: 25 additions & 0 deletions examples/scores/scores.sh
@@ -0,0 +1,25 @@
#!/bin/bash

# Default options; each can be overridden with the corresponding positional argument.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}

games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
for game in $games
do
game="g$game"
for player in $players
do
player="p$player"
score=$(($RANDOM % 1000))
echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
done
done
sleep 0.1
done
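For reference, a minimal Go sketch of the same idea: pushing line-protocol points at Kapacitor's UDP listener on :9100 (the [[udp]] input configured above). The single hard-coded point and lack of error handling beyond panics are illustrative only.

package main

import (
	"fmt"
	"net"
)

func main() {
	// Dial the UDP input configured in examples/scores/kapacitor.conf.
	conn, err := net.Dial("udp", "localhost:9100")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// One score update in InfluxDB line protocol, matching what scores.sh emits.
	fmt.Fprintf(conn, "scores,player=p1,game=g1 value=%d\n", 42)
}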
56 changes: 56 additions & 0 deletions examples/scores/top_scores.tick
@@ -0,0 +1,56 @@
// Define a result that contains the most recent score per player.
var topPlayerScores = stream
.from('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games at once, but group by both just in case.
.groupBy('game', 'player')
.window()
// Keep a buffer of the last 11s of scores
// in case a player's score hasn't updated in a while.
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
.mapReduce(influxql.last('value'))

// Calculate the top 15 scores per game
var topScores = topPlayerScores
.groupBy('game')
.mapReduce(influxql.top(15, 'last', 'player'))

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/api/v1/top_scores/top_scores
topScores
.httpOut('top_scores')

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
.sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
.influxDBOut()
.database('game')
.measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
.mapReduce(influxql.max('top'))
var min = topScoresSampled
.mapReduce(influxql.min('top'))

// Join the max and min streams back together and calculate the gap.
max.join(min)
.as('max', 'min')
// calculate the difference between the max and min scores.
.eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// store the fields: gap, topFirst, and topLast in InfluxDB.
.influxDBOut()
.database('game')
.measurement('top_scores_gap')
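As a rough sketch of the httpOut comment above, an application could poll the endpoint like this; no particular response schema is assumed beyond it being the JSON Kapacitor serves for the 'top_scores' output.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Endpoint exposed by .httpOut('top_scores') in top_scores.tick.
	resp, err := http.Get("http://localhost:9092/api/v1/top_scores/top_scores")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Always the most recent top-scores result, as described above.
	fmt.Println(string(body))
}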

6 changes: 6 additions & 0 deletions pipeline/batch.go
@@ -110,6 +110,12 @@ func newBatchNode() *BatchNode {
}

// Group the data by a set of dimensions.
// Can specify one time dimension.
//
// Example:
// batch
//        .groupBy(time(10s), 'tag1', 'tag2')
//
// tick:property
func (b *BatchNode) GroupBy(d ...interface{}) *BatchNode {
b.Dimensions = d
2 changes: 1 addition & 1 deletion pipeline/sample.go
@@ -18,7 +18,7 @@ import (
// .sample(10s)
//
// Keep only samples that land on the 10s boundary.
// See WindowNode.Align or BatchNode.GroupBy time,
// See StreamNode.Truncate, BatchNode.GroupBy time, or WindowNode.Align
// for ensuring data is aligned with a boundary.
type SampleNode struct {
chainnode
12 changes: 12 additions & 0 deletions pipeline/stream.go
@@ -1,6 +1,8 @@
package pipeline

import (
"time"

"github.com/influxdb/kapacitor/tick"
)

@@ -30,6 +32,16 @@ type StreamNode struct {
// The db.rp.m from clause
// tick:ignore
FromSelector string

// Optional duration for truncating timestamps.
// Helpful to ensure data points land on specific boundaries.
// Example:
// stream
// .from('mydata')
// .truncate(1s)
//
// All incoming data will be truncated to 1 second resolution.
Truncate time.Duration
}

func newStreamNode() *StreamNode {
3 changes: 3 additions & 0 deletions stream.go
@@ -57,6 +57,9 @@ func (s *StreamNode) runStream() error {

for pt, ok := s.ins[0].NextPoint(); ok; pt, ok = s.ins[0].NextPoint() {
if s.matches(pt) {
if s.s.Truncate != 0 {
pt.Time = pt.Time.Truncate(s.s.Truncate)
}
for _, child := range s.outs {
err := child.CollectPoint(pt)
if err != nil {
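A small standard-library sketch of what the new truncation does to point timestamps; the durations and example timestamp are arbitrary, but this is the same time.Truncate call applied above.

package main

import (
	"fmt"
	"time"
)

func main() {
	// A point arriving at 12:00:03.347.
	ts := time.Date(2015, 11, 11, 12, 0, 3, 347000000, time.UTC)

	// With .truncate(1s) every point is rounded down to the second,
	// so points from different sources land on common boundaries.
	fmt.Println(ts.Truncate(time.Second))      // 2015-11-11 12:00:03 +0000 UTC
	fmt.Println(ts.Truncate(10 * time.Second)) // 2015-11-11 12:00:00 +0000 UTC
}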
2 changes: 1 addition & 1 deletion tick/TICKscript.md
@@ -49,7 +49,7 @@ int_lit = "1" … "9" { digit }
letter = ascii_letter | "_" .
number_lit = digit { digit } { "." {digit} } .
duration_lit = int_lit duration_unit .
duration_unit = "u" | "µ" | "ms" | "s" | "h" | "d" | "w" .
duration_unit = "u" | "µ" | "ms" | "s" | "m" | "h" | "d" | "w" .
string_lit = `'` { unicode_char } `'` .
star_lit = "*"
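An illustrative check (not Kapacitor's actual lexer) that the amended grammar now accepts minute durations; the regexp is simply a transcription of duration_lit = int_lit duration_unit.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// int_lit followed by a duration_unit, with "m" now a valid unit.
	durationLit := regexp.MustCompile(`^[1-9][0-9]*(u|µ|ms|s|m|h|d|w)$`)

	for _, lit := range []string{"10s", "5m", "1h", "2x"} {
		fmt.Println(lit, durationLit.MatchString(lit)) // 10s true, 5m true, 1h true, 2x false
	}
}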
16 changes: 13 additions & 3 deletions tick/cmd/tickdoc/main.go
@@ -329,13 +329,23 @@ func addNodeLinks(nodes map[string]*Node, line string) []byte {
scan := bufio.NewScanner(strings.NewReader(line))
scan.Split(bufio.ScanWords)
for scan.Scan() {
word := scan.Text()
node := strings.TrimFunc(word, unicode.IsPunct)
word := strings.TrimFunc(scan.Text(), unicode.IsPunct)
parts := strings.Split(word, ".")
node := word
method := ""
if len(parts) == 2 {
node = parts[0]
method = parts[1]
}
if nodes[node] != nil && ast.IsExported(node) {
buf.Write([]byte("["))
buf.Write(scan.Bytes())
buf.Write([]byte("]("))
buf.Write([]byte(nodeNameToLink(node)))
if method == "" {
buf.Write([]byte(nodeNameToLink(node)))
} else {
buf.Write([]byte(methodNameToLink(node, method)))
}
buf.Write([]byte(") "))
} else {
buf.Write(scan.Bytes())
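Roughly, the new logic splits a word such as "StreamNode.Truncate" into a node and a method, and links to the method anchor instead of the node page. A standalone sketch; the link formats below are hypothetical stand-ins for nodeNameToLink and methodNameToLink.

package main

import (
	"fmt"
	"strings"
	"unicode"
)

func main() {
	for _, raw := range []string{"WindowNode.Align,", "StreamNode"} {
		word := strings.TrimFunc(raw, unicode.IsPunct)
		parts := strings.Split(word, ".")
		node, method := word, ""
		if len(parts) == 2 {
			node, method = parts[0], parts[1]
		}
		// Hypothetical link formats standing in for nodeNameToLink/methodNameToLink.
		link := "/nodes/" + strings.ToLower(node)
		if method != "" {
			link += "/#" + strings.ToLower(method)
		}
		fmt.Printf("[%s](%s)\n", word, link)
	}
}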
6 changes: 6 additions & 0 deletions update_tick_docs.sh
@@ -7,6 +7,12 @@
dest=$1 # output path for the .md files
docspath=${2-/docs/kapacitor/v0.1/tick}

if [ -z "$dest" ]
then
echo "Usage: ./update_tick_docs.sh output_path [docspath]"
exit 1
fi

tickdoc $docspath ./pipeline $dest


1 change: 0 additions & 1 deletion window.go
@@ -26,7 +26,6 @@ func newWindowNode(et *ExecutingTask, n *pipeline.WindowNode) (*WindowNode, erro
}

func (w *WindowNode) runWindow() error {

windows := make(map[models.GroupID]*window)
// Loops through points windowing by group
for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
