Skip to content

Commit

Permalink
[FLINK-3552] [examples] Add a properly windowed word count reading fr…
Browse files Browse the repository at this point in the history
…om a socket (Java + Scala)
  • Loading branch information
StephanEwen committed Mar 4, 2016
1 parent 271071a commit 986d536
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 353 deletions.
41 changes: 7 additions & 34 deletions flink-examples/flink-examples-streaming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,51 +303,25 @@ under the License.
</configuration>
</execution>

<!-- WindowWordCount -->
<!-- SocketWindowWordCount -->
<execution>
<id>WindowWordCount</id>
<id>SocketWindowWordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>WindowWordCount</classifier>
<classifier>SocketWindowWordCount</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
<program-class>org.apache.flink.streaming.examples.socket.SocketWindowWordCount</program-class>
</manifestEntries>
</archive>

<includes>
<include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>

<!-- SocketTextStreamWordCount -->
<execution>
<id>SocketTextStreamWordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>SocketTextStreamWordCount</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
</manifestEntries>
</archive>

<includes>
<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
<include>org/apache/flink/streaming/examples/socket/SocketWindowWordCount.class</include>
<include>org/apache/flink/streaming/examples/socket/SocketWindowWordCount$*.class</include>
</includes>
</configuration>
</execution>
Expand Down Expand Up @@ -517,12 +491,11 @@ under the License.
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-IncrementalLearning.jar" tofile="${project.basedir}/target/IncrementalLearning.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Iteration.jar" tofile="${project.basedir}/target/Iteration.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SessionWindowing.jar" tofile="${project.basedir}/target/SessionWindowing.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SocketTextStreamWordCount.jar" tofile="${project.basedir}/target/SocketTextStreamWordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-TopSpeedWindowing.jar" tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Twitter.jar" tofile="${project.basedir}/target/Twitter.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WindowJoin.jar" tofile="${project.basedir}/target/WindowJoin.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WindowWordCount.jar" tofile="${project.basedir}/target/WindowWordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SocketWindowWordCount.jar" tofile="${project.basedir}/target/SocketWindowWordCount.jar" />
</target>
</configuration>
</execution>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345
 * </pre>
 * and run this example with the port as an argument
 * (<code>--port 12345</code>).
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    /**
     * Entry point of the example.
     *
     * <p>Reads the {@code --port} argument, connects to {@code localhost} on that port,
     * splits every received line into words, and prints per-word counts computed over
     * a time window of 5 seconds that is evaluated every second.
     *
     * @param args command line arguments; must contain {@code --port <port>}
     * @throws Exception if building or executing the streaming job fails
     */
    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            // A missing or unparsable port is a usage error: print help and exit
            // quietly instead of surfacing a stack trace to the user.
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
                    "where port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text " +
                    "into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket; records are delimited by newlines
        DataStream<String> text = env.socketTextStream("localhost", port, '\n');

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            // String.split produces empty tokens for leading or consecutive
                            // whitespace and for an empty line; those are not words.
                            if (!word.isEmpty()) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    }
                })

                // key by the 'word' field of WordWithCount
                .keyBy("word")
                // window of 5 seconds, evaluated every second
                .timeWindow(Time.seconds(5), Time.seconds(1))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        // both inputs share the same key, so either word can be used
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     *
     * <p>The public fields and the public no-argument constructor are kept on purpose:
     * the job refers to the {@code word} field by name in {@code keyBy("word")}.
     */
    public static class WordWithCount {

        /** The word being counted. */
        public String word;

        /** Number of occurrences of {@link #word}. */
        public long count;

        /** No-argument constructor; leaves the fields at their default values. */
        public WordWithCount() {}

        /**
         * Creates a word/count pair.
         *
         * @param word the word
         * @param count the number of occurrences
         */
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        /** Formats the pair as {@code "<word> : <count>"}. */
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

This file was deleted.

Loading

0 comments on commit 986d536

Please sign in to comment.