[FLINK-1964] Reimplement TwitterSource
This closes apache#1796
This closes apache#666
rmetzger committed Mar 16, 2016
1 parent 33f4311 commit e70d49e
Showing 21 changed files with 265 additions and 1,408 deletions.
74 changes: 28 additions & 46 deletions docs/apis/streaming/connectors/twitter.md
@@ -26,7 +26,9 @@ specific language governing permissions and limitations
under the License.
-->

Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream. To use this connector, add the following dependency to your project:
The Twitter Streaming API provides access to the stream of tweets made available by Twitter.
Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.
To use this connector, add the following dependency to your project:

{% highlight xml %}
<dependency>
@@ -36,72 +38,52 @@ Twitter Streaming API provides opportunity to connect to the stream of tweets ma
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
Note that the streaming connectors are currently not part of the binary distribution.
See [linking with modules not contained in the binary distribution]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) for how to include them for cluster execution.

#### Authentication
To connect to the Twitter stream, you have to register your program and acquire the credentials needed for authentication. The process is described below.

#### Acquiring the authentication information
First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary OAuth Access Token data (`token` and `secret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab.
First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup)
or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by
clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
After selecting the application, the API key and API secret (called `twitter-source.consumerKey` and `twitter-source.consumerSecret` in `TwitterSource`, respectively) are located on the "API Keys" tab.
The necessary OAuth Access Token data (`twitter-source.token` and `twitter-source.tokenSecret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab.
Remember to keep these pieces of information secret and do not push them to public repositories.
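
One way to keep the credentials out of the repository is to read them from the environment at startup. The following is a minimal sketch, not part of Flink: the `TwitterCredentials` class and the environment variable names are assumptions made for illustration, while the property keys are the ones named above.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper (not part of Flink) that builds the connector properties. */
final class TwitterCredentials {

    // Property keys as named in this document.
    static final String CONSUMER_KEY = "twitter-source.consumerKey";
    static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
    static final String TOKEN = "twitter-source.token";
    static final String TOKEN_SECRET = "twitter-source.tokenSecret";

    // Environment variable names are assumptions chosen for this sketch.
    private static final String[][] MAPPING = {
        {"TWITTER_CONSUMER_KEY", CONSUMER_KEY},
        {"TWITTER_CONSUMER_SECRET", CONSUMER_SECRET},
        {"TWITTER_TOKEN", TOKEN},
        {"TWITTER_TOKEN_SECRET", TOKEN_SECRET},
    };

    /** Copies the four credentials from env into a Properties object, failing fast if one is missing. */
    static Properties fromEnvironment(Map<String, String> env) {
        Properties props = new Properties();
        for (String[] pair : MAPPING) {
            String value = env.get(pair[0]);
            if (value == null || value.isEmpty()) {
                throw new IllegalStateException("Missing environment variable " + pair[0]);
            }
            props.setProperty(pair[1], value);
        }
        return props;
    }
}
```

With this helper, `TwitterCredentials.fromEnvironment(System.getenv())` would yield a `Properties` object that can be passed to `new TwitterSource(props)`.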

#### Accessing the authentication information
Create a properties file, and pass its path in the constructor of `TwitterSource`. The content of the file should be similar to this:

~~~bash
#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***
~~~

#### Constructors
The `TwitterSource` class has two constructors.

1. `public TwitterSource(String authPath, int numberOfTweets);`
to emit a finite number of tweets
2. `public TwitterSource(String authPath);`
for streaming

Both constructors expect a `String authPath` argument determining the location of the properties file containing the authentication information. In the first case, `numberOfTweets` determines how many tweet the source emits.



#### Usage
In contrast to other connectors, the `TwitterSource` depends on no additional services. For example the following code should run gracefully:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "")
props.setProperty(TwitterSource.CONSUMER_SECRET, "")
props.setProperty(TwitterSource.TOKEN, "")
props.setProperty(TwitterSource.TOKEN_SECRET, "")
val streamSource = env.addSource(new TwitterSource(props))
{% endhighlight %}
</div>
</div>

The `TwitterSource` emits strings containing a JSON code.
To retrieve information from the JSON code you can add a FlatMap or a Map function handling JSON code. For example, there is an implementation `JSONParseFlatMap` abstract class among the examples. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
String getField(String jsonText, String field);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
getField(jsonText : String, field : String) : String
{% endhighlight %}
</div>
</div>
The `TwitterSource` emits strings containing a JSON object, representing a Tweet.

function which can be use to acquire the value of a given field.
The `TwitterExample` class in the `flink-examples-streaming` package shows a full example of how to use the `TwitterSource`.

There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.
By default, the `TwitterSource` uses the `StatusesSampleEndpoint`. This endpoint returns a random sample of Tweets.
There is a `TwitterSource.EndpointInitializer` interface allowing users to provide a custom endpoint.

#### Example
`TwitterStream` is an example of how to use `TwitterSource`. It implements a language frequency counter program.
2 changes: 1 addition & 1 deletion flink-dist/src/main/assemblies/bin.xml
@@ -163,7 +163,7 @@ under the License.
</includes>
<excludes>
<exclude>flink-examples-streaming*.jar</exclude>
<exclude>original-flink-examples-streaming*.jar</exclude>
<exclude>original-*.jar</exclude>
</excludes>
</fileSet>

55 changes: 30 additions & 25 deletions flink-examples/flink-examples-streaming/pom.xml
@@ -193,30 +193,6 @@ under the License.
</configuration>
</execution>

<!-- Twitter -->
<execution>
<id>Twitter</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>Twitter</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
</manifestEntries>
</archive>

<includes>
<include>org/apache/flink/streaming/examples/twitter/*.class</include>
<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
<include>org/apache/flink/streaming/connectors/json/*.class</include>
</includes>
</configuration>
</execution>

<!-- WindowJoin -->
<execution>
<id>WindowJoin</id>
@@ -476,7 +452,6 @@ under the License.
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Iteration.jar" tofile="${project.basedir}/target/Iteration.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SessionWindowing.jar" tofile="${project.basedir}/target/SessionWindowing.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-TopSpeedWindowing.jar" tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Twitter.jar" tofile="${project.basedir}/target/Twitter.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WindowJoin.jar" tofile="${project.basedir}/target/WindowJoin.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SocketWindowWordCount.jar" tofile="${project.basedir}/target/SocketWindowWordCount.jar" />
@@ -527,6 +502,36 @@ under the License.
</filters>
</configuration>
</execution>
<execution>
<id>fat-jar-twitter-example</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.examples.twitter.TwitterExample</mainClass>
</transformer>
</transformers>
<finalName>Twitter</finalName>
<filters>
<filter>
<artifact>*</artifact>
<includes>
<include>org/apache/flink/streaming/examples/twitter/**</include>
<include>org/apache/flink/streaming/connectors/twitter/**</include>
<include>org/apache/http/**</include>
<include>com/twitter/**</include>
<include>build.properties</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>

@@ -17,28 +17,31 @@

package org.apache.flink.streaming.examples.twitter;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import java.util.StringTokenizer;

/**
* Implements the "TwitterStream" program that computes a most used word
* occurrence over JSON files in a streaming fashion.
* occurrence over JSON objects in a streaming fashion.
* <p>
* The input is a JSON text file with lines separated by newline characters.
* The input is a Tweet stream from a TwitterSource.
* </p>
* <p>
* Usage: <code>TwitterStream [--output &lt;path&gt;] [--props &lt;path&gt;]</code><br>
* Usage: <code>TwitterExample [--output &lt;path&gt;]
* [--twitter-source.consumerKey &lt;key&gt; --twitter-source.consumerSecret &lt;secret&gt; --twitter-source.token &lt;token&gt; --twitter-source.tokenSecret &lt;tokenSecret&gt;]</code><br>
*
* If no parameters are provided, the program is run with default data from
* {@link TwitterStreamData}.
* {@link TwitterExampleData}.
* </p>
* <p>
* This example shows how to:
@@ -48,7 +51,7 @@
* <li>handle flattened stream inputs.
* </ul>
*/
public class TwitterStream {
public class TwitterExample {

// *************************************************************************
// PROGRAM
@@ -58,24 +61,31 @@ public static void main(String[] args) throws Exception {

// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
System.out.println("Usage: TwitterStream --output <path> --props <path>");
System.out.println("Usage: TwitterExample [--output <path>] " +
"[--twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> --twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret>]");

// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);

env.setParallelism(params.getInt("parallelism", 1));

// get input data
DataStream<String> streamSource;
if (params.has("props")) {
// read the text file from given input path
streamSource = env.addSource(new TwitterSource(params.get("props")));
if (params.has(TwitterSource.CONSUMER_KEY) &&
params.has(TwitterSource.CONSUMER_SECRET) &&
params.has(TwitterSource.TOKEN) &&
params.has(TwitterSource.TOKEN_SECRET)
) {
streamSource = env.addSource(new TwitterSource(params.getProperties()));
} else {
System.out.println("Executing TwitterStream example with default props.");
System.out.println("Use --props to specify the path to the authentication info.");
System.out.println("Use --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> " +
"--twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret> to specify the authentication info.");
// get default test text data
streamSource = env.fromElements(TwitterStreamData.TEXTS);
streamSource = env.fromElements(TwitterExampleData.TEXTS);
}

DataStream<Tuple2<String, Integer>> tweets = streamSource
@@ -101,37 +111,39 @@ public static void main(String[] args) throws Exception {
// *************************************************************************

/**
* Makes sentences from English tweets.
* Deserialize JSON from twitter source
*
* <p>
* Implements a string tokenizer that splits sentences into words as a
* user-defined FlatMapFunction. The function takes a line (String) and
* splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
* Integer>}).
*/
public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
public static class SelectEnglishAndTokenizeFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;

private transient ObjectMapper jsonParser;
/**
* Select the language from the incoming JSON text
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
try {
if (getString(value, "user.lang").equals("en")) {
// message of tweet
StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));

// split the message
while (tokenizer.hasMoreTokens()) {
String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();

if (!result.equals("")) {
out.collect(new Tuple2<>(result, 1));
}
if(jsonParser == null) {
jsonParser = new ObjectMapper();
}
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
if (jsonNode.has("user") && jsonNode.get("user").get("lang").asText().equals("en")) {
// message of tweet
StringTokenizer tokenizer = new StringTokenizer(jsonNode.get("text").asText());

// split the message
while (tokenizer.hasMoreTokens()) {
String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();

if (!result.equals("")) {
out.collect(new Tuple2<>(result, 1));
}
}
} catch (JSONException e) {
// the JSON was not parsed correctly
}
}
}
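
The tokenizing step inside `SelectEnglishAndTokenizeFlatMap` can be tried in isolation, without Flink or a JSON parser. The sketch below assumes a hypothetical `TweetTokenizer` class that is not part of this commit. It applies the same `StringTokenizer` and lower-casing logic to a plain tweet text and returns the words that the flat map would emit as `(word, 1)` pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/** Hypothetical stand-alone version of the tokenizing step (not part of the commit). */
final class TweetTokenizer {

    /** Splits a tweet text on whitespace and returns the non-empty, lower-cased words. */
    static List<String> tokenize(String text) {
        List<String> words = new ArrayList<>();
        // The default delimiters are space, tab, newline, carriage return and form feed.
        StringTokenizer tokenizer = new StringTokenizer(text);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }
}
```

In the Flink job, each returned word corresponds to one `out.collect(new Tuple2<>(word, 1))` call.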