forked from influxdata/kapacitor
clean up readme to remove discussion pieces and add DSL examples
1 parent 1e04ffe · commit 4db7856 · 1 changed file with 65 additions and 197 deletions.
@@ -132,102 +132,60 @@ There are two different ways to consume Kapacitor.
Processing data follows a pipeline, and depending on the processing needs that pipeline can vary significantly.
Kapacitor models the different data processing pipelines as DAGs (Directed Acyclic Graphs) and allows the user to specify the structure of the DAG via a DSL.
There are two approaches to constructing a DAG for stream processing based on the current popular tools.
1. Explicitly define a DAG and what each node does. For example, [Storm](http://storm.apache.org):
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");
builder.setBolt("finish", new FinishBolt(), 2)
        .shuffleGrouping("exclaim1")
        .shuffleGrouping("exclaim2");
```
Here you create the DAG explicitly by linking named nodes. You define a `bolt` at each node, which is essentially a function that transforms the data.
This way of defining the DAG requires that you follow the chain of names to reconstruct a visual layout of the DAG.
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
words -> exclaim1;
exclaim1 -> exclaim2;
exclaim1 -> finish;
exclaim2 -> finish;
}
)
2. Implicitly define the DAG via operators and invocation chaining. For example, [Flink](http://flink.apache.org) defines a DAG like this:
```scala
val windowedStream = stream
  .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
// Compute some simple statistics on a rolling window
val lowest = windowedStream.minBy("cpu_idle")
val maxByStock = windowedStream.groupBy("host").maxBy("cpu_idle")
val rollingMean = windowedStream.groupBy("host").mapWindow(mean _)
```
Notice the use of the methods `window`, `minBy`, `maxBy` and `mapWindow`. These methods create new nodes in the DAG, resulting in a data processing pipeline like:
Kapacitor allows you to define the DAG implicitly via operators and invocation chaining in a pipeline API, similar to how [Flink](http://flink.apache.org) and [Spark](http://spark.apache.org) work; a small sketch of this chaining style follows the diagram below.
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
stream -> window;
window -> minBy [label="raw"];
window -> maxBy [label="groupby 'host'"];
window -> mean [label="groupby 'host'"];
}
)
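To make the chaining style concrete, here is a minimal, hypothetical Go sketch (not Kapacitor's actual API; the `Node`, `Window`, `Map` and `Alert` names are invented for illustration) of a fluent builder where each chained call adds a node and an edge to an underlying DAG:

```go
package main

import "fmt"

// Node is one step in the pipeline DAG.
type Node struct {
    Name     string
    Children []*Node
}

// chain adds a child node and returns it so calls can be chained.
func (n *Node) chain(name string) *Node {
    child := &Node{Name: name}
    n.Children = append(n.Children, child)
    return child
}

// Window, Map and Alert are illustrative operators; each call just
// appends a named node to the DAG.
func (n *Node) Window() *Node       { return n.chain("window") }
func (n *Node) Map(fn string) *Node { return n.chain("map(" + fn + ")") }
func (n *Node) Alert() *Node        { return n.chain("alert") }

// printDAG walks the DAG depth-first and prints each node.
func printDAG(n *Node, indent string) {
    fmt.Println(indent + n.Name)
    for _, c := range n.Children {
        printDAG(c, indent+"  ")
    }
}

func main() {
    stream := &Node{Name: "stream"}
    // The chain below implicitly builds stream -> window -> map(mean) -> alert.
    stream.Window().Map("mean").Alert()
    printDAG(stream, "")
}
```

The point is only that the topology falls out of the call chain; no node ever has to be named and wired up explicitly.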
## DSL
This method gives you control over the DAG but you do not have to link it all up via named nodes. The plumbing is done by chaining method invocations.
Kapacitor uses a DSL to define the DAG so that you are not required to write and compile Go code.
# DSL
Based on how the DAG is constructed you can use the DSL to both construct the DAG and define what each node does via built-in functions.
The following is an example DSL script that triggers an alert if idle cpu drops below 30%. In this DSL the keyword `stream` represents the stream of fields and values from the data points.
The following is an example DSL script that triggers an alert if idle cpu drops below 30%. In this DSL the variable `stream` represents the stream of values from InfluxDB.
```
var window = stream.window().period(10s).every(5s)
var avg = window.map(mean, "value")
avg.filter(<, 30).alert()

stream
    .window()
    .period(10s)
    .every(5s)
    .map(influxql.mean, "value")
    .filter("value", "<", 30)
    .alert()
    .email("[email protected]");
```
This script maintains a window of data for 10s and emits the current window every 5s.
Then the average is calculated for each emitted window.
Finally all values less than `30` pass through the filter and make it to the alert node, which triggers the alert.
Finally all values less than `30` pass through the filter and make it to the alert node, which triggers the alert by sending an email.
The DAG that is constructed from the script looks like this:
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
stream -> window;
window -> avg[label="every 10s"];
avg -> filter;
window -> mean[label="every 10s"];
mean -> filter;
filter -> alert [label="<30"];
}
)
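As a rough illustration of the `period(10s)`/`every(5s)` semantics described above, here is a sketch under assumed types (not Kapacitor code): a window keeps the last 10s of points while a separate ticker evaluates its current contents every 5s.

```go
package main

import (
    "fmt"
    "time"
)

type point struct {
    t     time.Time
    value float64
}

// window holds the points from the last `period`; its contents are
// re-evaluated every `every` (10s and 5s in the DSL example).
type window struct {
    period time.Duration
    points []point
}

// add appends a point and drops points older than the period.
func (w *window) add(p point) {
    w.points = append(w.points, p)
    cutoff := p.t.Add(-w.period)
    for len(w.points) > 0 && w.points[0].t.Before(cutoff) {
        w.points = w.points[1:]
    }
}

// mean is the map step applied to each emitted window.
func (w *window) mean() float64 {
    var sum float64
    for _, p := range w.points {
        sum += p.value
    }
    return sum / float64(len(w.points))
}

func main() {
    w := &window{period: 10 * time.Second}
    in := make(chan point)              // fed by the incoming data stream
    emit := time.Tick(5 * time.Second)  // emit the current window every 5s

    for {
        select {
        case p := <-in:
            w.add(p)
        case <-emit:
            if len(w.points) > 0 && w.mean() < 30 {
                fmt.Println("ALERT: mean idle cpu below 30:", w.mean())
            }
        }
    }
}
```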
We have not mentioned parallelism yet; by adding `groupby` and `parallelism` statements we can see how to easily scale out each layer of the DAG.
Based on how the DAG is constructed you can use the DSL to both construct the DAG and define what each node does via built-in functions.
Notice how the `map` function took as an argument another function, `influxql.mean`; this is an example of a built-in function that can be used to process the data stream.
It will also be possible to define your own functions via plugins to Kapacitor and reference them in the DSL (a rough sketch of such a function's shape follows the example below).
```
var window = stream.window().period(10s).every(5s)
var avg = window.groupby("dc").map(mean, "value")
avg.filter(<, 30).parallelism(4).alert().parallelism(2)
```

```
stream
    .window()
    .period(10s)
    .every(5s)
    .groupby("dc")
    .map(influxql.mean, "value")
    .filter("<", 30)
    .parallelism(4)
    .alert()
    .email("[email protected]");
```
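To make the `influxql.mean` reference more concrete, here is a hedged Go sketch; the `MapFunc` signature is purely an assumption, but it shows how a built-in and a future plugin-provided function could share the same shape: a function over the values of one window or group.

```go
package main

import "fmt"

// MapFunc is an assumed shape for a built-in or plugin-provided map function:
// it reduces the values of one window/group to a single value.
type MapFunc func(values []float64) float64

// mean is what a built-in like influxql.mean could look like.
func mean(values []float64) float64 {
    if len(values) == 0 {
        return 0
    }
    var sum float64
    for _, v := range values {
        sum += v
    }
    return sum / float64(len(values))
}

func main() {
    window := []float64{10, 20, 60}
    var f MapFunc = mean   // a plugin could register its own MapFunc here
    fmt.Println(f(window)) // 30
}
```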
@@ -266,175 +224,85 @@ The DAG that is constructed is similar, but with parallelism explicitly shown:
filter_2 [label="2"];
filter_3 [label="3"];
}
filter_0 -> alert_0;
filter_0 -> alert_1;
filter_1 -> alert_0;
filter_1 -> alert_1;
filter_2 -> alert_0;
filter_2 -> alert_1;
filter_3 -> alert_0;
filter_3 -> alert_1;
subgraph cluster_alert {
label="alert"
alert_0 [label="0"];
alert_1 [label="1"];
}
filter_0 -> alert;
filter_1 -> alert;
filter_2 -> alert;
filter_3 -> alert;
}
)
Parallelism is easily achieved and scaled at each layer.
The advantage of the stream-based DSL is that we define both the data pipeline and each transformation in the same script.
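As a loose illustration of how `groupby("dc")` combined with `parallelism(4)` could fan work out (a sketch, not Kapacitor's implementation), hashing the group key to pick one of N workers keeps all points for a given `dc` on the same parallel instance:

```go
package main

import (
    "fmt"
    "hash/fnv"
)

type point struct {
    groupKey string // e.g. the value of the "dc" tag
    value    float64
}

// route picks a worker for a point by hashing its group key, so every
// point in the same group is processed by the same parallel instance.
func route(p point, workers []chan point) {
    h := fnv.New32a()
    h.Write([]byte(p.groupKey))
    workers[int(h.Sum32())%len(workers)] <- p
}

func main() {
    const parallelism = 4
    workers := make([]chan point, parallelism)
    for i := range workers {
        workers[i] = make(chan point, 1)
    }

    route(point{groupKey: "us-east", value: 25}, workers)
    route(point{groupKey: "us-west", value: 80}, workers)

    for i, w := range workers {
        select {
        case p := <-w:
            fmt.Printf("worker %d got %s=%v\n", i, p.groupKey, p.value)
        default:
        }
    }
}
```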
### The DSL and batch processing
Batch processors work similarly to stream processors.
Example DSL for batchers where we are running a query every minute and want to alert on cpu. The query: `select mean(value) from cpu_idle group by dc, time(1m)`.
```
batch.filter(<, 30).alert()

batch
    .filter("<", 30)
    .alert()
    .email("[email protected]");
```
## What can you do with the DSL?
The main difference is that instead of a stream object we start with a batch object. Since batches are already windowed there is no need to define a new window.
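A rough sketch of how the batch side could be driven, with `runQuery` and `batchResult` as invented stand-ins: a scheduler runs the query on an interval and hands each already-windowed result to the filter/alert steps.

```go
package main

import (
    "fmt"
    "time"
)

// batchResult stands in for one already-windowed result of the scheduled
// query, e.g. the mean cpu_idle per dc for the last interval.
type batchResult struct {
    dc   string
    mean float64
}

// runQuery is a stand-in for executing
// `select mean(value) from cpu_idle group by dc, time(1m)` against InfluxDB.
func runQuery() []batchResult {
    return []batchResult{{dc: "us-east", mean: 25}, {dc: "us-west", mean: 80}}
}

func main() {
    ticker := time.NewTicker(time.Minute) // run the query every minute
    defer ticker.Stop()
    for range ticker.C {
        for _, b := range runQuery() {
            // batch.filter("<", 30).alert()
            if b.mean < 30 {
                fmt.Printf("ALERT: %s mean cpu_idle %v < 30\n", b.dc, b.mean)
            }
        }
    }
}
```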
### What you can do with the DSL
* Define the DAG for your data pipeline needs.
* Window data. Windowing can be done by time or by number of data points and various other conditions; see [this](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#window-operators). A small sketch of count-based windowing follows this list.
* Aggregate data. The list of aggregation functions currently supported by InfluxQL is probably a good place to start.
* Transform data via built-in functions.
* Transform data via custom functions.
* Filter down streams/batches of data.
* Emit data into a new stream.
* Emit data into an InfluxDB database.
* Trigger events/notifications.
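The time-based window was sketched earlier; as an equally hypothetical companion, windowing by number of data points just emits whenever N points have accumulated:

```go
package main

import "fmt"

// countWindow collects values and emits a full slice every `size` points.
type countWindow struct {
    size   int
    buf    []float64
    emitFn func([]float64)
}

func (w *countWindow) add(v float64) {
    w.buf = append(w.buf, v)
    if len(w.buf) == w.size {
        w.emitFn(w.buf)
        w.buf = nil // start a fresh window
    }
}

func main() {
    w := &countWindow{size: 3, emitFn: func(vals []float64) {
        fmt.Println("window emitted:", vals)
    }}
    for _, v := range []float64{10, 20, 60, 5, 15, 25} {
        w.add(v)
    }
}
```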
## What you cannot do with the DSL?
### What you can NOT do with the DSL
* Define custom functions in the DSL.
You can call out to custom functions defined via a plugin mechanism.
You can call out to custom functions defined via plugins but you cannot define the function itself within the DSL.
The DSL will be too slow to actually process any of the data, but it is used simply to define the data flow.
### Example DSL scripts
## Other Notes on the DSL
* We like the idea of writing our own lexer and parser to keep the DSL simple and flexible for us.
* The DSL should be specified in EBNF.
Several examples demonstrating various features of the DSL follow:
### POC
#### Setup a dead man's switch
To see if we were headed down the right path, I built a proof of concept: a little DSL-to-API system.
For example, the following DSL script gets parsed and, via reflection, calls the Go code below.
DSL:
If your stream stops sending data, that may be a serious cause for concern. Setting up a 'dead man's switch' is quite simple:
```
var w = stream.window();
w.period(10s);
w.every(1s);

// Create dead man's switch
stream
    .window()
    .period(1m)
    .every(1m)
    .map(influxql.count, "value")
    .filter("==", 0)
    .alert();

// Now define normal processing on the stream
stream
    ...
```
Go equivalent:
```go
s := &Stream{}
w := s.Window()
w.Period(time.Duration(10) * time.Second)
w.Every(time.Duration(1) * time.Second)
```
The local scope is populated so that `stream` is a Go `Stream` object.
When the DSL is parsed and then evaluated, the `window` method gets called directly on the stream instance.
Each method is called via reflection, so as we update the Go API the DSL dynamically maps to it.
Since the DSL is only used to set up the DAG we won't likely have to worry about the performance hit of reflection.
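As a rough illustration of that reflection-based dispatch (not the actual POC code), a parsed call such as `window` can be resolved to a method on the `Stream` value by name:

```go
package main

import (
    "fmt"
    "reflect"
    "strings"
)

type Window struct{}

type Stream struct{}

// Window is the Go method the DSL call `stream.window()` would map to.
func (s *Stream) Window() *Window {
    fmt.Println("window node created")
    return &Window{}
}

// call looks up a method on obj by (case-insensitive) name and invokes it,
// which is roughly how a DSL call could be mapped onto the Go API.
func call(obj interface{}, name string) []reflect.Value {
    v := reflect.ValueOf(obj)
    for i := 0; i < v.NumMethod(); i++ {
        if strings.EqualFold(v.Type().Method(i).Name, name) {
            return v.Method(i).Call(nil)
        }
    }
    panic("no such method: " + name)
}

func main() {
    s := &Stream{}
    call(s, "window") // the DSL line `stream.window();` resolved via reflection
}
```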
Example Go code that consumes the DSL to build a DAG:
```go
var dslScript = `
var w = stream.window();
w.period(10s);
w.every(1s);
`

s, err := dsl.CreatePipeline(dslScript)
if err != nil {
    log.Fatal(err)
}
s.Start() // start the stream pipeline
```
Since `s` in the above example is just an instance of a `Stream` object we could have created it directly in Go or via the DSL as shown.
Now that we have a reference to the `s` stream we no longer need to execute any code from the DSL.
**This means that we can build out a golang API similar to Flink's and then, instead of shipping jars around with compiled code, we can ship DSL scripts around and have nearly the same features and performance.**
The only piece that is missing in this model is user defined functions; but using the DSL to define and execute those is going to be way too slow. We just need to figure out a way, over sockets etc., for users to define their own functions (which was the plan from the beginning).
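One hedged sketch of what user defined functions over a socket could look like; the one-JSON-value protocol here is entirely an assumption, not a Kapacitor design. The daemon side writes one window of values to the connection and reads a single reduced value back from the user's process:

```go
package main

import (
    "encoding/json"
    "fmt"
    "net"
)

// udfServer plays the role of a user-defined function running in its own
// process: it reads a JSON array of values and writes back one reduced value.
func udfServer(conn net.Conn) {
    defer conn.Close()
    dec := json.NewDecoder(conn)
    enc := json.NewEncoder(conn)
    var values []float64
    if err := dec.Decode(&values); err != nil || len(values) == 0 {
        return
    }
    max := values[0]
    for _, v := range values[1:] {
        if v > max {
            max = v
        }
    }
    enc.Encode(max)
}

func main() {
    // net.Pipe keeps the sketch self-contained; a real setup would use a
    // unix or TCP socket between kapacitord and the user's process.
    daemonSide, udfSide := net.Pipe()
    go udfServer(udfSide)

    enc := json.NewEncoder(daemonSide)
    dec := json.NewDecoder(daemonSide)
    enc.Encode([]float64{10, 42, 7}) // send one window of values
    var result float64
    dec.Decode(&result)
    fmt.Println("custom function returned:", result) // 42
    daemonSide.Close()
}
```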
# Components
Below are the logical components that make the workflow possible.
* Kapacitor daemon `kapacitord` that listens on a net/unix socket and manages the rest of the components.
* Kapacitor daemon `kapacitord` that manages the rest of the components.
* Matching -- uses the `where` clause of a streamer to map points in the data stream to a streamer instance. A rough sketch follows this list.
* Interpreter for DSL -- executes the DSL based on incoming metrics.
* Pipeline Deployment -- takes the defined DAG from the DSL and deploys it on the cluster.
* Stream engine -- keeps track of various streams and their topologies.
* Batch engine -- handles the results of scheduled queries and passes them to batchers for processing.
* Replay engine -- records and replays bits of the data stream to the stream engine. Can replay metrics to independent streams so testing can be done in isolation of the live stream. Can also save the result of a query for replaying.
* Query Scheduler -- keeps track of schedules for various scripts and executes them, passing data to the batch engine.
* Streamer/Batcher manager -- handles defining, updating and shipping streamers and batchers around.
* API -- HTTP API for accessing methods of other components.
* CLI -- `kapacitor` command line utility to call the HTTP API.
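A rough, assumed sketch of the matching component mentioned above: each streamer's `where` clause is compiled into a predicate over a point's tags, and an incoming point is handed to every streamer whose predicate matches.

```go
package main

import "fmt"

// Point is an incoming data point with its tag set.
type Point struct {
    Name  string
    Tags  map[string]string
    Value float64
}

// Streamer pairs a name with a compiled `where` predicate.
type Streamer struct {
    Name  string
    Where func(Point) bool // e.g. compiled from: where cpu = 'cpu-total'
}

// match routes a point to every streamer whose where clause accepts it.
func match(p Point, streamers []Streamer) []string {
    var hits []string
    for _, s := range streamers {
        if s.Where(p) {
            hits = append(hits, s.Name)
        }
    }
    return hits
}

func main() {
    streamers := []Streamer{
        {Name: "cpu_alert", Where: func(p Point) bool { return p.Tags["cpu"] == "cpu-total" }},
        {Name: "logs_dc", Where: func(p Point) bool { return p.Tags["role"] == "logs" }},
    }
    p := Point{Name: "cpu_idle", Tags: map[string]string{"cpu": "cpu-total"}, Value: 25}
    fmt.Println(match(p, streamers)) // [cpu_alert]
}
```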
# Questions
* Q: How do we scale beyond a single instance?
* A: We could use the matching component to shard and route different series within the stream to specific nodes within the cluster, i.e. consistent sharding. A rough sketch appears at the end of this section.
* A: We could make the DSL and plugins in general a Map-Reduce framework so that each instance only handles a portion of the stream.
We will need to provide an all-in-one solution so that you don't have to deal with the complexities of MR if you don't want to.
Storm and Spark, both Apache projects, have a workflow where you can define MR jobs for streaming data.
Storm is a DAG based approach where the developer gets to define the topology and the transformation at each node.
Spark has a rich set of operators that have implicit topologies, so the workflow with Spark is to choose a predefined operator like MR and then define the transformations within the topology.
I like the DAG approach as it is explicit and will be easier to debug.
In contrast, Spark makes it easier to get up and running because they solve part of the problem for you.
Maybe we build some predefined DAGs that are easy to use, like the all-in-one idea mentioned above.
* Other ideas?
* Q: Do we need a different DSL for `streamer`s vs `batcher`s?
* A: Hopefully not, but we need to make sure that it is clear how they work (with good error messages) since a DSL script written for a `streamer` will not work for a `batcher` and vice versa.
* Q: Should the DSL contain the `from`, `where`, `query`, `period`, and `groupby_interval` options within it?
Example:
```
type: streamer
from: cpu_idle
where: cpu = 'cpu-total'
var window = stream.window().period(10s).every(10s)
```
or
```
type: batcher
query: select mean(value) from cpu_idle where role = 'logs' group by dc
period: 15m
groupby_interval: 1h
batch.filter(<, 30).alert()
```
I like the simplicity of putting all the information in a single file.
I can also see issues where you have a single script that processes data from several different sources, and duplicating/importing or otherwise maintaining that association could get difficult.
Based on my POC it is helpful to know whether the DSL script is a streamer or batcher script before parsing, so I think we should leave the DSL as just the script and not include the meta information.
We should probably find a way to encapsulate the meta information in a config file or something though, so it's not all CLI args.
* Q: How is Kapacitor different from other stream processing engines like Storm, Spark, and Flink, apart from being written in Go and specific to the TICK stack?
A: I think we need to understand where we are going to be different from these other tools so we can adjust correctly from their design patterns.
One thing that comes to mind is 'ease of use'. Like we discussed earlier, if people need incredible performance they are going to build it themselves; so we want to build an out-of-the-box solution.
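Returning to the scaling question above, here is a hypothetical Go sketch of consistent sharding (purely illustrative, and using a single hash per node rather than virtual nodes): each series key is hashed onto a ring of Kapacitor instances so a given series always streams to the same node.

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// ring is a minimal consistent-hash ring mapping series keys to node names.
type ring struct {
    hashes []uint32
    nodes  map[uint32]string
}

func hash(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func newRing(nodes ...string) *ring {
    r := &ring{nodes: make(map[uint32]string)}
    for _, n := range nodes {
        h := hash(n)
        r.hashes = append(r.hashes, h)
        r.nodes[h] = n
    }
    sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
    return r
}

// node returns the Kapacitor instance responsible for a series key,
// e.g. "cpu_idle,host=a,dc=us-east".
func (r *ring) node(seriesKey string) string {
    h := hash(seriesKey)
    i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
    if i == len(r.hashes) {
        i = 0 // wrap around the ring
    }
    return r.nodes[r.hashes[i]]
}

func main() {
    r := newRing("kapacitor-1", "kapacitor-2", "kapacitor-3")
    for _, key := range []string{"cpu_idle,host=a", "cpu_idle,host=b"} {
        fmt.Println(key, "->", r.node(key))
    }
}
```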