Skip to content

Commit

Permalink
[FLINK-2550] [streaming] Rework JoinStreams and CoGroupStreams to pro…
Browse files Browse the repository at this point in the history
…perly implement operator builder syntax
  • Loading branch information
StephanEwen committed Oct 16, 2015
1 parent c24dca5 commit 69dfc40
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.streaming.api.datastream;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
Expand All @@ -36,8 +35,11 @@
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static java.util.Objects.requireNonNull;

/**
*{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
* A streaming co-group operation is evaluated over elements in a window.
Expand All @@ -64,93 +66,87 @@
* .apply(new MyCoGroupFunction());
* } </pre>
*/
public class CoGroupedStreams {
public class CoGroupedStreams<T1, T2> {

/**
* A co-group operation that does not yet have its {@link KeySelector KeySelectors} defined.
*
* @param <T1> Type of the elements from the first input
* @param <T2> Type of the elements from the second input
*/
public static class Unspecified<T1, T2> {
DataStream<T1> input1;
DataStream<T2> input2;
/** The first input stream */
private final DataStream<T1> input1;

protected Unspecified(DataStream<T1> input1,
DataStream<T2> input2) {
this.input1 = input1;
this.input2 = input2;
}
/** The second input stream */
private final DataStream<T2> input2;

/**
* Specifies a {@link KeySelector} for elements from the first input.
*/
public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
return new WithKey<>(input1, input2, input1.clean(keySelector), null);
}

/**
* Specifies a {@link KeySelector} for elements from the second input.
*/
public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
return new WithKey<>(input1, input2, null, input1.clean(keySelector));
}
/**
 * Creates new CoGrouped data streams, which are the first step towards building a streaming co-group.
 *
 * @param input1 The first data stream.
 * @param input2 The second data stream.
 * @throws NullPointerException if either input stream is {@code null}
 */
public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
    // Fail fast and name the offending argument in the exception message.
    this.input1 = requireNonNull(input1, "input1");
    this.input2 = requireNonNull(input2, "input2");
}

/**
* A co-group operation that has {@link KeySelector KeySelectors} defined for either both or
* one input.
*
* <p>
* You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
* and {@link #equalTo(KeySelector)} before you can proceed with specifying a
* {@link WindowAssigner} using {@link #window(WindowAssigner)}.
*
* @param <T1> Type of the elements from the first input
* @param <T2> Type of the elements from the second input
* @param <KEY> Type of the key. This must be the same for both inputs
* Specifies a {@link KeySelector} for elements from the first input.
*/
public static class WithKey<T1, T2, KEY> {
DataStream<T1> input1;
DataStream<T2> input2;
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
    // The key type is extracted from the selector as passed in, before clean() is applied to it.
    final TypeInformation<KEY> firstKeyType =
            TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());

    return new Where<>(input1.clean(keySelector), firstKeyType);
}

KeySelector<T1, KEY> keySelector1;
KeySelector<T2, KEY> keySelector2;
// ------------------------------------------------------------------------

/**
* CoGrouped streams that have the key for one side defined.
*
* @param <KEY> The type of the key.
*/
public class Where<KEY> {

protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
this.input1 = input1;
this.input2 = input2;
private final KeySelector<T1, KEY> keySelector1;
private final TypeInformation<KEY> keyType;

Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
}

/**
* Specifies a {@link KeySelector} for elements from the first input.
*/
public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
return new CoGroupedStreams.WithKey<>(input1, input2, input1.clean(keySelector), keySelector2);
this.keyType = keyType;
}

/**
* Specifies a {@link KeySelector} for elements from the second input.
*/
public CoGroupedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
return new CoGroupedStreams.WithKey<>(input1, input2, keySelector1, input1.clean(keySelector));
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
    // Extract the key type of the second input so it can be compared with the first input's key type.
    final TypeInformation<KEY> secondKeyType =
            TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());

    final boolean keyTypesMatch = secondKeyType.equals(this.keyType);
    if (keyTypesMatch) {
        return new EqualTo(input2.clean(keySelector));
    }

    // Both sides must produce the same key type; otherwise the co-group cannot be keyed consistently.
    throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
            "first key = " + this.keyType + " , second key = " + secondKeyType);
}

// --------------------------------------------------------------------

/**
* Specifies the window on which the co-group operation works.
* A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
*/
public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
if (keySelector1 == null || keySelector2 == null) {
throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
public class EqualTo {

private final KeySelector<T2, KEY> keySelector2;

EqualTo(KeySelector<T2, KEY> keySelector2) {
    // Name the argument so a null selector yields a descriptive NullPointerException.
    this.keySelector2 = requireNonNull(keySelector2, "keySelector2");
}

/**
 * Specifies the window on which the co-group operation works.
 *
 * @param assigner The window assigner handed to the resulting {@code WithWindow}.
 * @param <W> The type of window produced by the assigner.
 * @return A {@code WithWindow} built from both inputs, both key selectors, the key type
 *         and the given assigner; trigger and evictor are passed as {@code null}.
 */
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
    return new WithWindow<>(
            input1, input2,
            keySelector1, keySelector2, keyType,
            assigner,
            null, null);
}
return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
}
}

// ------------------------------------------------------------------------

/**
* A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
* well as a {@link WindowAssigner}.
Expand All @@ -166,6 +162,8 @@ public static class WithWindow<T1, T2, KEY, W extends Window> {

private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;

private final TypeInformation<KEY> keyType;

private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

Expand All @@ -177,6 +175,7 @@ protected WithWindow(DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
Expand All @@ -185,7 +184,8 @@ protected WithWindow(DataStream<T1> input1,

this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;

this.keyType = keyType;

this.windowAssigner = windowAssigner;
this.trigger = trigger;
this.evictor = evictor;
Expand All @@ -195,7 +195,8 @@ protected WithWindow(DataStream<T1> input1,
* Sets the {@code Trigger} that should be used to trigger window emission.
*/
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, newTrigger, evictor);
}

/**
Expand All @@ -206,7 +207,8 @@ public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ?
* pre-aggregation of window results cannot be used.
*/
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, newEvictor);
}

/**
Expand Down Expand Up @@ -236,16 +238,21 @@ public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformat
//clean the closure
function = input1.getExecutionEnvironment().clean(function);

UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
.map(new Input1Tagger<T1, T2>())
.returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
.returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
.returns(unionType);

WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = taggedInput1
.union(taggedInput2)
.keyBy(new UnionKeySelector<>(keySelector1, keySelector2))
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

// we explicitly create the keyed stream to manually pass the key type information in
WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);

if (trigger != null) {
Expand All @@ -259,13 +266,10 @@ public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformat
}
}

/**
* Creates a new co-group operation from the two given inputs.
*/
public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> input1, DataStream<T2> input2) {
return new Unspecified<>(input1, input2);
}

// ------------------------------------------------------------------------
// Data type and type information for Tagged Union
// ------------------------------------------------------------------------

/**
* Internal class for implementing tagged union co-group.
*/
Expand Down Expand Up @@ -425,7 +429,7 @@ public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> re

@Override
public int getLength() {
return 0;
return -1;
}

@Override
Expand Down Expand Up @@ -494,6 +498,11 @@ public boolean canEqual(Object obj) {
}
}

// ------------------------------------------------------------------------
// Utility functions that implement the CoGroup logic based on the tagged
// union window reduce
// ------------------------------------------------------------------------

private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -537,6 +546,7 @@ public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

private static final long serialVersionUID = 1L;

public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
Expand All @@ -548,8 +558,10 @@ public void apply(KEY key,
W window,
Iterable<TaggedUnion<T1, T2>> values,
Collector<T> out) throws Exception {
List<T1> oneValues = Lists.newArrayList();
List<T2> twoValues = Lists.newArrayList();

List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();

for (TaggedUnion<T1, T2> val: values) {
if (val.isOne()) {
oneValues.add(val.getOne());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
* The KeySelector to be used for extracting the key for partitioning
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key){
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
return new KeyedStream<T, K>(this, clean(key));
}

Expand Down Expand Up @@ -622,16 +622,16 @@ public IterativeStream<T> iterate(long maxWaitTimeMillis) {
* Creates a co-group operation. See {@link CoGroupedStreams} for an example of how the keys
* and window can be specified.
*/
public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> otherStream) {
return CoGroupedStreams.createCoGroup(this, otherStream);
public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
return new CoGroupedStreams<>(this, otherStream);
}

/**
* Creates a join operation. See {@link JoinedStreams} for an example of how the keys
* and window can be specified.
*/
public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> otherStream) {
return JoinedStreams.createJoin(this, otherStream);
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
return new JoinedStreams<>(this, otherStream);
}

/**
Expand Down
Loading

0 comments on commit 69dfc40

Please sign in to comment.