Skip to content

Commit

Permalink
[FLINK-1560] [streaming] Added ITCases to streaming examples
Browse files Browse the repository at this point in the history
  • Loading branch information
szape authored and mbalassi committed Apr 7, 2015
1 parent ed7d165 commit 464e782
Show file tree
Hide file tree
Showing 15 changed files with 789 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,24 @@
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* Example illustrating iterations in Flink streaming.
* Example illustrating iterations in Flink streaming. <p/> <p> The program sums up random numbers and counts additions
* it performs to reach a specific threshold in an iterative streaming fashion. </p>
* <p/>
* <p>
* The program sums up random numbers and counts additions it performs to reach
* a specific threshold in an iterative streaming fashion.
* </p>
* <p/>
* <p/>
* This example shows how to use:
* <ul>
* <li>streaming iterations,
* <li>buffer timeout to enhance latency,
* <li>directed outputs.
* </ul>
* This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
* outputs. </ul>
*/
public class IterateExample {

private static final int BOUND = 100;

// *************************************************************************
// PROGRAM
// *************************************************************************
Expand All @@ -71,7 +63,7 @@ public static void main(String[] args) throws Exception {

// create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if(fileInput) {
if (fileInput) {
inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
} else {
inputStream = env.addSource(new RandomFibonacciSource());
Expand All @@ -94,10 +86,7 @@ public static void main(String[] args) throws Exception {
// 'output' channel then get the input pairs that have the greatest iteration counter
// on a 1 second sliding window
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
.map(new OutputMap())
.window(Time.of(1L, TimeUnit.SECONDS))
.every(Time.of(500L, TimeUnit.MILLISECONDS))
.maxBy(1).flatten();
.map(new OutputMap());

// emit results
if (fileOutput) {
Expand All @@ -124,12 +113,12 @@ private static class RandomFibonacciSource implements SourceFunction<Tuple2<Inte

@Override
public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
while(true) {
int first = rnd.nextInt(BOUND/2 - 1) + 1;
int second = rnd.nextInt(BOUND/2 - 1) + 1;
while (true) {
int first = rnd.nextInt(BOUND / 2 - 1) + 1;
int second = rnd.nextInt(BOUND / 2 - 1) + 1;

collector.collect(new Tuple2<Integer, Integer>(first, second));
Thread.sleep(100L);
Thread.sleep(500L);
}
}

Expand All @@ -147,18 +136,19 @@ private static class FibonacciInputMap implements MapFunction<String, Tuple2<Int

@Override
public Tuple2<Integer, Integer> map(String value) throws Exception {
Thread.sleep(100L);
String record = value.substring(1, value.length()-1);
String record = value.substring(1, value.length() - 1);
String[] splitted = record.split(",");
return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
}
}

/**
* Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple
* A counter is attached to the tuple and incremented in every iteration step
* Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple A
* counter is attached to the tuple and incremented in every iteration step
*/
public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
Integer, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
Expand All @@ -171,12 +161,15 @@ public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, I
* Iteration step function that calculates the next Fibonacci number
*/
public static class Step implements
MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
Integer, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
Integer> value) throws Exception {
return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
value.f3, ++value.f4);
}
}

Expand All @@ -194,20 +187,23 @@ public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Intege
} else {
output.add("output");
}
output.add("output");
return output;
}
}

/**
* Giving back the input pair and the counter
*/
public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {
public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
Tuple2<Tuple2<Integer, Integer>, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
value) throws
Exception {
return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
value.f4);
}
}

Expand All @@ -219,7 +215,6 @@ public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, In
private static boolean fileOutput = false;
private static String inputPath;
private static String outputPath;
private static final int BOUND = 100;

private static boolean parseParameters(String[] args) {

Expand All @@ -228,7 +223,7 @@ private static boolean parseParameters(String[] args) {
if (args.length == 1) {
fileOutput = true;
outputPath = args[0];
} else if(args.length == 2) {
} else if (args.length == 2) {
fileInput = true;
inputPath = args[0];
fileOutput = true;
Expand All @@ -244,4 +239,5 @@ private static boolean parseParameters(String[] args) {
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.iteration.util;

public class IterateExampleData {
public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
"(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
"(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";

public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
"((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
"((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
"((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";

private IterateExampleData() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.join.util;

public class WindowJoinData {

public static final String GRADES_INPUT = "(john,5)\n" + "(tom,3)\n" + "(alice,1)\n" + "(grace,5)\n" +
"(john,4)\n" + "(bob,1)\n" + "(alice,2)\n" + "(alice,3)\n" + "(bob,5)\n" + "(alice,3)\n" + "(tom,5)\n" +
"(john,2)\n" + "(john,1)\n" + "(grace,2)\n" + "(jerry,2)\n" + "(tom,4)\n" + "(bob,4)\n" + "(bob,2)\n" +
"(tom,2)\n" + "(alice,5)\n" + "(grace,5)\n" + "(grace,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(tom,1)\n" +
"(jerry,5)\n" + "(john,3)\n" + "(john,4)\n" + "(john,1)\n" + "(jerry,3)\n" + "(grace,3)\n" + "(bob,3)\n" +
"(john,3)\n" + "(jerry,4)\n" + "(tom,5)\n" + "(tom,4)\n" + "(john,2)\n" + "(jerry,1)\n" + "(bob,1)\n" +
"(john,5)\n" + "(grace,4)\n" + "(tom,5)\n" + "(john,4)\n" + "(tom,1)\n" + "(grace,1)\n" + "(john,2)\n" +
"(jerry,3)\n" + "(jerry,5)\n" + "(tom,2)\n" + "(tom,2)\n" + "(alice,4)\n" + "(tom,4)\n" + "(jerry,4)\n" +
"(john,3)\n" + "(grace,4)\n" + "(tom,3)\n" + "(jerry,4)\n" + "(john,5)\n" + "(john,4)\n" + "(jerry,1)\n" +
"(john,5)\n" + "(alice,2)\n" + "(tom,1)\n" + "(alice,5)\n" + "(grace,4)\n" + "(bob,4)\n" + "(jerry,1)\n" +
"(john,5)\n" + "(tom,4)\n" + "(tom,5)\n" + "(jerry,5)\n" + "(tom,1)\n" + "(grace,3)\n" + "(bob,5)\n" +
"(john,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(grace,1)\n" + "(jerry,1)\n" + "(jerry,4)\n" +
"(bob,4)\n" + "(alice,3)\n" + "(tom,5)\n" + "(alice,4)\n" + "(alice,4)\n" + "(grace,4)\n" + "(john,5)\n" +
"(john,5)\n" + "(grace,4)\n" + "(tom,4)\n" + "(john,4)\n" + "(john,5)\n" + "(alice,5)\n" + "(jerry,5)\n" +
"(john,3)\n" + "(tom,5)\n" + "(jerry,4)\n" + "(grace,4)\n" + "(john,3)\n" + "(bob,2)";

public static final String SALARIES_INPUT = "(john,6469)\n" + "(jerry,6760)\n" + "(jerry,8069)\n" +
"(tom,3662)\n" + "(grace,8427)\n" + "(john,9425)\n" + "(bob,9018)\n" + "(john,352)\n" + "(tom,3770)\n" +
"(grace,7622)\n" + "(jerry,7441)\n" + "(alice,1468)\n" + "(bob,5472)\n" + "(grace,898)\n" +
"(tom,3849)\n" + "(grace,1865)\n" + "(alice,5582)\n" + "(john,9511)\n" + "(alice,1541)\n" +
"(john,2477)\n" + "(grace,3561)\n" + "(john,1670)\n" + "(grace,7290)\n" + "(grace,6565)\n" +
"(tom,6179)\n" + "(tom,1601)\n" + "(john,2940)\n" + "(bob,4685)\n" + "(bob,710)\n" + "(bob,5936)\n" +
"(jerry,1412)\n" + "(grace,6515)\n" + "(grace,3321)\n" + "(tom,8088)\n" + "(john,2876)\n" +
"(bob,9896)\n" + "(grace,7368)\n" + "(grace,9749)\n" + "(bob,2048)\n" + "(alice,4782)\n" +
"(alice,3375)\n" + "(tom,5841)\n" + "(bob,958)\n" + "(bob,5258)\n" + "(tom,3935)\n" + "(jerry,4394)\n" +
"(alice,102)\n" + "(alice,4931)\n" + "(alice,5240)\n" + "(jerry,7951)\n" + "(john,5675)\n" +
"(bob,609)\n" + "(alice,5997)\n" + "(jerry,9651)\n" + "(alice,1328)\n" + "(bob,1022)\n" +
"(grace,2578)\n" + "(jerry,9704)\n" + "(tom,4476)\n" + "(grace,3784)\n" + "(alice,6144)\n" +
"(bob,6213)\n" + "(alice,7525)\n" + "(jerry,2908)\n" + "(grace,8464)\n" + "(jerry,9920)\n" +
"(bob,3720)\n" + "(bob,7612)\n" + "(alice,7211)\n" + "(jerry,6484)\n" + "(alice,1711)\n" +
"(jerry,5994)\n" + "(grace,928)\n" + "(jerry,2492)\n" + "(grace,9080)\n" + "(tom,4330)\n" +
"(bob,8302)\n" + "(john,4981)\n" + "(tom,1781)\n" + "(grace,1379)\n" + "(jerry,3700)\n" +
"(jerry,3584)\n" + "(jerry,2038)\n" + "(jerry,3902)\n" + "(tom,1336)\n" + "(jerry,7500)\n" +
"(tom,3648)\n" + "(alice,2533)\n" + "(tom,8685)\n" + "(bob,3968)\n" + "(tom,3241)\n" + "(bob,7461)\n" +
"(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + "(tom,140)\n" + "(john,9802)\n" +
"(grace,2977)\n" + "(grace,889)\n" + "(john,1338)";

public static final String WINDOW_JOIN_RESULTS = "(bob,2,9018)\n" + "(bob,2,5472)\n" + "(bob,2,4685)\n" +
"(bob,2,710)\n" + "(bob,2,5936)\n" + "(bob,2,9896)\n" + "(bob,2,2048)\n" + "(bob,2,958)\n" +
"(bob,2,5258)\n" + "(bob,2,609)\n" + "(bob,2,1022)\n" + "(bob,2,6213)\n" + "(bob,2,3720)\n" +
"(bob,2,7612)\n" + "(bob,2,8302)\n" + "(bob,2,3968)\n" + "(bob,2,7461)";

private WindowJoinData() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.ml.util;

public class IncrementalLearningSkeletonData {

public static final String RESULTS = "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"
+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
"1\n";

private IncrementalLearningSkeletonData() {
}
}
Loading

0 comments on commit 464e782

Please sign in to comment.