Skip to content

Commit

Permalink
SAMZA-301: Add syntax highlighting to documentation code listings.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakob Homan <[email protected]>
  • Loading branch information
davidzchen authored and jghoman committed Jun 23, 2014
1 parent c642eff commit d913037
Show file tree
Hide file tree
Showing 28 changed files with 654 additions and 478 deletions.
2 changes: 1 addition & 1 deletion docs/_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
permalink: /:categories/:title
name: Samza
pygments: true
highlighter: pygments
markdown: redcarpet
exclude: ['_notes']
redcarpet:
Expand Down
1 change: 1 addition & 0 deletions docs/_layouts/default.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<link href="/css/bootstrap.min.css" rel="stylesheet"/>
<link href="/css/font-awesome.min.css" rel="stylesheet"/>
<link href="/css/main.css" rel="stylesheet"/>
<link href="/css/syntax.css" rel="stylesheet"/>
<link rel="icon" type="image/png" href="/img/samza-icon.png">
</head>
<body>
Expand Down
2 changes: 1 addition & 1 deletion docs/contribute/code.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ gem install jekyll

Depending on your system you may also need install some additional dependencies when you try and run it. Note that some Linux distributions may have older versions of Jekyll packaged that treat arguments differently and may result in changes not being incorporated into the generated site.

The script to commit the updated webpage files is docs/_tools/publish-site.sh
The script to commit the updated webpage files is `docs/_tools/publish-site.sh`
33 changes: 7 additions & 26 deletions docs/css/main.css
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

/* Non-responsive overrides
*
* Utilitze the following CSS to disable the responsive-ness of the container,
Expand Down Expand Up @@ -155,14 +154,13 @@ h4 {
pre {
border: 0px !important;
border-radius: 0px !important;
overflow: scroll !important;
white-space: pre;
overflow-wrap: normal;
word-wrap: normal !important;
overflow-x: auto;
background-color: #f7f7f7;
font-size: 12px;
}
pre code {
overflow-wrap: normal;
white-space: pre;
font-size: 12px;
}
th.header {
cursor: pointer;
Expand Down Expand Up @@ -212,29 +210,12 @@ td.key {
.committer-icon {
font-size: 16px;
}
ul.documentation-list {
list-style: none;
padding-left: 20px;
}
img.diagram-large {
width: 100%;
}
table.documentation {
border-collapse: collapse;
font-size: 12px;
margin: 1em 0;
}
table.documentation th, table.documentation td {
text-align: left;
vertical-align: top;
border: 1px solid #888;
padding: 5px;
}
table.documentation th.nowrap, table.documentation td.nowrap {
white-space: nowrap;
}
table.documentation th {
background-color: #eee;
ul.documentation-list {
list-style: none;
padding-left: 20px;
}
.footer {
clear: both;
Expand Down
74 changes: 74 additions & 0 deletions docs/css/syntax.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Name: Base16 Default Light
Author: Chris Kempson (http://chriskempson.com)
Pygments template by Jan T. Sott (https://github.com/idleberg)
Created with Base16 Builder by Chris Kempson (https://github.com/chriskempson/base16-builder)
*/
.highlight .hll { background-color: #e0e0e0 }
.highlight { background: #f5f5f5; color: #151515 }
.highlight .c { color: #b0b0b0 } /* Comment */
.highlight .err { color: #ac4142 } /* Error */
.highlight .k { color: #aa759f } /* Keyword */
.highlight .l { color: #d28445 } /* Literal */
.highlight .n { color: #151515 } /* Name */
.highlight .o { color: #75b5aa } /* Operator */
.highlight .p { color: #151515 } /* Punctuation */
.highlight .cm { color: #b0b0b0 } /* Comment.Multiline */
.highlight .cp { color: #b0b0b0 } /* Comment.Preproc */
.highlight .c1 { color: #b0b0b0 } /* Comment.Single */
.highlight .cs { color: #b0b0b0 } /* Comment.Special */
.highlight .gd { color: #ac4142 } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gh { color: #151515; font-weight: bold } /* Generic.Heading */
.highlight .gi { color: #90a959 } /* Generic.Inserted */
.highlight .gp { color: #b0b0b0; font-weight: bold } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #75b5aa; font-weight: bold } /* Generic.Subheading */
.highlight .kc { color: #aa759f } /* Keyword.Constant */
.highlight .kd { color: #aa759f } /* Keyword.Declaration */
.highlight .kn { color: #75b5aa } /* Keyword.Namespace */
.highlight .kp { color: #aa759f } /* Keyword.Pseudo */
.highlight .kr { color: #aa759f } /* Keyword.Reserved */
.highlight .kt { color: #f4bf75 } /* Keyword.Type */
.highlight .ld { color: #90a959 } /* Literal.Date */
.highlight .m { color: #d28445 } /* Literal.Number */
.highlight .s { color: #90a959 } /* Literal.String */
.highlight .na { color: #6a9fb5 } /* Name.Attribute */
.highlight .nb { color: #151515 } /* Name.Builtin */
.highlight .nc { color: #f4bf75 } /* Name.Class */
.highlight .no { color: #ac4142 } /* Name.Constant */
.highlight .nd { color: #75b5aa } /* Name.Decorator */
.highlight .ni { color: #151515 } /* Name.Entity */
.highlight .ne { color: #ac4142 } /* Name.Exception */
.highlight .nf { color: #6a9fb5 } /* Name.Function */
.highlight .nl { color: #151515 } /* Name.Label */
.highlight .nn { color: #f4bf75 } /* Name.Namespace */
.highlight .nx { color: #6a9fb5 } /* Name.Other */
.highlight .py { color: #151515 } /* Name.Property */
.highlight .nt { color: #75b5aa } /* Name.Tag */
.highlight .nv { color: #ac4142 } /* Name.Variable */
.highlight .ow { color: #75b5aa } /* Operator.Word */
.highlight .w { color: #151515 } /* Text.Whitespace */
.highlight .mf { color: #d28445 } /* Literal.Number.Float */
.highlight .mh { color: #d28445 } /* Literal.Number.Hex */
.highlight .mi { color: #d28445 } /* Literal.Number.Integer */
.highlight .mo { color: #d28445 } /* Literal.Number.Oct */
.highlight .sb { color: #90a959 } /* Literal.String.Backtick */
.highlight .sc { color: #151515 } /* Literal.String.Char */
.highlight .sd { color: #b0b0b0 } /* Literal.String.Doc */
.highlight .s2 { color: #90a959 } /* Literal.String.Double */
.highlight .se { color: #d28445 } /* Literal.String.Escape */
.highlight .sh { color: #90a959 } /* Literal.String.Heredoc */
.highlight .si { color: #d28445 } /* Literal.String.Interpol */
.highlight .sx { color: #90a959 } /* Literal.String.Other */
.highlight .sr { color: #90a959 } /* Literal.String.Regex */
.highlight .s1 { color: #90a959 } /* Literal.String.Single */
.highlight .ss { color: #90a959 } /* Literal.String.Symbol */
.highlight .bp { color: #151515 } /* Name.Builtin.Pseudo */
.highlight .vc { color: #ac4142 } /* Name.Variable.Class */
.highlight .vg { color: #ac4142 } /* Name.Variable.Global */
.highlight .vi { color: #ac4142 } /* Name.Variable.Instance */
.highlight .il { color: #d28445 } /* Literal.Number.Integer.Long */
134 changes: 73 additions & 61 deletions docs/learn/documentation/0.7.0/api/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,110 +21,122 @@ title: API Overview

When writing a stream processor for Samza, you must implement the [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) interface:

package com.example.samza;
{% highlight java %}
package com.example.samza;

public class MyTaskClass implements StreamTask {
public class MyTaskClass implements StreamTask {

public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
// process message
}
}
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
// process message
}
}
{% endhighlight %}

When you run your job, Samza will create several instances of your class (potentially on multiple machines). These task instances process the messages in the input streams.

In your job's configuration you can tell Samza which streams you want to consume. An incomplete example could look like this (see the [configuration documentation](../jobs/configuration.html) for more detail):

# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass
{% highlight jproperties %}
# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent
# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
{% endhighlight %}

For each message that Samza receives from the task's input streams, the *process* method is called. The [envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three things of importance: the message, the key, and the stream that the message came from.

/** Every message that is delivered to a StreamTask is wrapped
* in an IncomingMessageEnvelope, which contains metadata about
* the origin of the message. */
public class IncomingMessageEnvelope {
/** A deserialized message. */
Object getMessage() { ... }
{% highlight java %}
/** Every message that is delivered to a StreamTask is wrapped
* in an IncomingMessageEnvelope, which contains metadata about
* the origin of the message. */
public class IncomingMessageEnvelope {
/** A deserialized message. */
Object getMessage() { ... }

/** A deserialized key. */
Object getKey() { ... }
/** A deserialized key. */
Object getKey() { ... }

/** The stream and partition that this message came from. */
SystemStreamPartition getSystemStreamPartition() { ... }
}
/** The stream and partition that this message came from. */
SystemStreamPartition getSystemStreamPartition() { ... }
}
{% endhighlight %}

The key and value are declared as Object, and need to be cast to the correct type. If you don't configure a [serializer/deserializer](../container/serialization.html), they are typically Java byte arrays. A deserializer can convert these bytes into any other type, for example the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.

The getSystemStreamPartition() method returns a [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html) object, which tells you where the message came from. It consists of three parts:
The `getSystemStreamPartition()` method returns a [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html) object, which tells you where the message came from. It consists of three parts:

1. The *system*: the name of the system from which the message came, as defined in your job configuration. You can have multiple systems for input and/or output, each with a different name.
2. The *stream name*: the name of the stream (topic, queue) within the source system. This is also defined in the job configuration.
3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is normally split into several partitions, and each partition is assigned to one StreamTask instance by Samza.

The API looks like this:

/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {
{% highlight java %}
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {

/** The name of the system which provides this stream. It is
defined in the Samza job's configuration. */
public String getSystem() { ... }
/** The name of the system which provides this stream. It is
defined in the Samza job's configuration. */
public String getSystem() { ... }

/** The name of the stream/topic/queue within the system. */
public String getStream() { ... }
/** The name of the stream/topic/queue within the system. */
public String getStream() { ... }

/** The partition within the stream. */
public Partition getPartition() { ... }
}
/** The partition within the stream. */
public Partition getPartition() { ... }
}
{% endhighlight %}

In the example job configuration above, the system name is "kafka", the stream name is "PageViewEvent". (The name "kafka" isn't special &mdash; you can give your system any name you want.) If you have several input streams feeding into your StreamTask, you can use the SystemStreamPartition to determine what kind of message you've received.

What about sending messages? If you take a look at the process() method in StreamTask, you'll see that you get a [MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html).

/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
void send(OutgoingMessageEnvelope envelope);
}
{% highlight java %}
/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
void send(OutgoingMessageEnvelope envelope);
}
{% endhighlight %}

To send a message, you create an [OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) object and pass it to the message collector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally you can specify the partitioning key and other parameters. See the [javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for details.

**NOTE:** Please only use the MessageCollector object within the process() method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.
**NOTE:** Please only use the MessageCollector object within the `process()` method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.

For example, here's a simple task that splits each input message into words, and emits each word as a separate message:

public class SplitStringIntoWords implements StreamTask {
{% highlight java %}
public class SplitStringIntoWords implements StreamTask {

// Send outgoing messages to a stream called "words"
// in the "kafka" system.
private final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "words");
// Send outgoing messages to a stream called "words"
// in the "kafka" system.
private final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "words");

public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
String message = (String) envelope.getMessage();
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
String message = (String) envelope.getMessage();

for (String word : message.split(" ")) {
// Use the word as the key, and 1 as the value.
// A second task can add the 1's to get the word count.
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
}
}
for (String word : message.split(" ")) {
// Use the word as the key, and 1 as the value.
// A second task can add the 1's to get the word count.
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
}
}
}
{% endhighlight %}

## [SamzaContainer &raquo;](../container/samza-container.html)
Loading

0 comments on commit d913037

Please sign in to comment.