Skip to content

Commit

Permalink
[FLINK-4955] Add Translations Tests for KeyedStream.flatMap(TimelyFla…
Browse files Browse the repository at this point in the history
…tMapFunction)
  • Loading branch information
aljoscha committed Nov 7, 2016
1 parent f0ef370 commit 06fb9f1
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
Expand All @@ -46,6 +47,7 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
Expand Down Expand Up @@ -544,6 +546,45 @@ public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
}

/**
* Verify that a timely flat map call is correctly translated to an operator.
*/
@Test
public void testTimelyFlatMapTranslation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> src = env.generateSequence(0, 0);

TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
private static final long serialVersionUID = 1L;

@Override
public void flatMap(
Long value,
TimerService timerService,
Collector<Integer> out) throws Exception {

}

@Override
public void onTimer(
long timestamp,
TimeDomain timeDomain,
TimerService timerService,
Collector<Integer> out) throws Exception {

}
};

DataStream<Integer> flatMapped = src
.keyBy(new IdentityKeySelector<Long>())
.flatMap(timelyFlatMapFunction);

flatMapped.addSink(new DiscardingSink<Integer>());

assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
}

@Test
public void operatorTest() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -809,6 +850,15 @@ public Long getKey(Tuple2<Long, Long> value) throws Exception {
}
}

private static class IdentityKeySelector<T> implements KeySelector<T, T> {
private static final long serialVersionUID = 1L;

@Override
public T getKey(T value) throws Exception {
return value;
}
}

public static class CustomPOJO {
private String s;
private int i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import java.lang

import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.{TimeDomain, TimerService}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
Expand Down Expand Up @@ -315,6 +317,30 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
// TODO check for custom case class
}

/**
* Verify that a timely flat map call is correctly translated to an operator.
*/
@Test
def testTimelyFlatMapTranslation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val src = env.generateSequence(0, 0)

val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
override def flatMap(value: Long, timerService: TimerService, out: Collector[Int]): Unit = ???
override def onTimer(
timestamp: Long,
timeDomain: TimeDomain,
timerService: TimerService,
out: Collector[Int]): Unit = ???
}

val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)

assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]])
}

@Test def operatorTest() {
val env = StreamExecutionEnvironment.getExecutionEnvironment

Expand Down

0 comments on commit 06fb9f1

Please sign in to comment.