Skip to content

Commit

Permalink
[FLINK-1173] [streaming] Add socket text stream as a data source for …
Browse files Browse the repository at this point in the history
…the streaming API
  • Loading branch information
chiwanpark authored and mbalassi committed Nov 24, 2014
1 parent 42fe874 commit 80416ac
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
Expand Down Expand Up @@ -262,6 +263,28 @@ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collectio
return returnStream;
}

/**
* Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default
* character set.
*
* @param hostname
* The host name which a server socket bind.
* @param port
* The port number which a server socket bind. A port number of
* 0 means that the port number is automatically allocated.
* @param delimiter
* A character which split received strings into records.
* @return A DataStream, containing the strings received from socket.
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
}

public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, '\n');
}

/**
* Creates a new DataStream that contains a sequence of numbers.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.function.source;

import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class SocketTextStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;

private String hostname;
private int port;
private char delimiter;

public SocketTextStreamFunction(String hostname, int port, char delimiter) {
this.hostname = hostname;
this.port = port;
this.delimiter = delimiter;
}

@Override
public void invoke(Collector<String> collector) throws Exception {
Socket socket = new Socket(hostname, port);
while (!socket.isClosed() && socket.isConnected()) {
streamFromSocket(collector, socket);
}
}

public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
StringBuffer buffer = new StringBuffer();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

while (true) {
int data = reader.read();
if (!socket.isConnected() || socket.isClosed() || data == -1) {
break;
}

if (data == delimiter) {
collector.collect(buffer.toString());
buffer = new StringBuffer();
} else if (data != '\r') { // ignore carriage return
buffer.append((char) data);
}
}

if (buffer.length() > 0) {
collector.collect(buffer.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
package org.apache.flink.streaming.api;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.util.MockCollector;
import org.apache.flink.streaming.util.MockSource;
import org.junit.Test;

public class SourceTest {

@Test
public void fromElementsTest() {
List<Integer> expectedList = Arrays.asList(1, 2, 3);
Expand All @@ -42,11 +49,27 @@ public void fromCollectionTest() {
List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(Arrays.asList(1, 2, 3)));
assertEquals(expectedList, actualList);
}

@Test
public void genSequenceTest() {
List<Long> expectedList = Arrays.asList(1L, 2L, 3L);
List<Long> actualList = MockSource.createAndExecute(new GenSequenceFunction(1, 3));
assertEquals(expectedList, actualList);
}

@Test
public void socketTextStreamTest() throws Exception {
List<String> expectedList = Arrays.asList("a", "b", "c");
List<String> actualList = new ArrayList<String>();

byte[] data = {'a', '\n', 'b', '\n', 'c', '\n'};

Socket socket = mock(Socket.class);
when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
when(socket.isClosed()).thenReturn(false);
when(socket.isConnected()).thenReturn(true);

new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(actualList), socket);
assertEquals(expectedList, actualList);
}
}
26 changes: 26 additions & 0 deletions flink-addons/flink-streaming/flink-streaming-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,32 @@ under the License.
</includes>
</configuration>
</execution>

<!-- SocketTextStreamWordCount -->
<execution>
<id>SocketTextStreamWordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>SocketTextStreamWordCount</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.streaming.examples.wordcount.SocketTextStreamWordCount</program-class>
</manifestEntries>
</archive>

<includes>
<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount$*.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.wordcount;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* This example shows an implementation of WordCount with data from socket.
*
* <p>
* Usage: <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port &gt; &lt;result path&gt;</code><br>
*
* <p>
* This example shows how to:
* <ul>
* <li>use StreamExecutionEnvironment.socketTextStream
* <li>write a simple Flink program,
* <li>write and use user-defined functions.
* </ul>
*/
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {

if (!parseParameters(args)) {
return;
}

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data
DataStream<String> text = env.socketTextStream(hostname, port);

DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new WordCount.Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0).sum(1);

if (fileOutput) {
counts.writeAsText(outputPath, 1);
} else {
counts.print();
}

// execute program
env.execute("WordCount with SocketTextStream Example");
}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

private static boolean fileOutput = false;
private static String hostname;
private static int port;
private static String outputPath;

private static boolean parseParameters(String[] args) {

if (args.length > 0) {
// parse input arguments
if (args.length == 3) {
fileOutput = true;
hostname = args[0];
port = Integer.valueOf(args[1]);
outputPath = args[2];
} else if (args.length == 2) {
hostname = args[0];
port = Integer.valueOf(args[1]);
} else {
System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
return false;
}
} else {
System.out.println("Executing WordCount example with data from socket.");
System.out.println(" Provide parameters to connect data source.");
System.out.println(" Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
return false;
}
return true;
}
}

0 comments on commit 80416ac

Please sign in to comment.