
[FLINK-1824] [streaming] Support added for missing types in DataStream api

Closes apache#567
gyfora authored and mbalassi committed Apr 5, 2015
1 parent 909cee2 commit 1cf49e9
Showing 7 changed files with 295 additions and 56 deletions.
==== File 1 of 7 ====

@@ -272,7 +272,7 @@ public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function
 	}
 
 	@SuppressWarnings("unchecked")
-	private static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
 			boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type,
 			String functionName, boolean allowMissing)
 	{
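Note: Making getBinaryOperatorReturnType public is what lets the streaming call sites below reuse the batch API's two-input type extraction. A minimal sketch of the calling convention, assuming allowMissing behaves as in the unary variant (a MissingTypeInfo placeholder is returned instead of an exception when erasure defeats extraction); coMapper, in1Type and in2Type are hypothetical placeholders:

	// Sketch only; mirrors the call sites added later in this commit.
	TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
			coMapper,              // the user function to inspect
			CoMapFunction.class,   // base class whose type variables get resolved
			false, true,           // hasIterables / hasCollector, as at the call sites
			in1Type, in2Type,      // type information of the two inputs
			"Co-Map", true);       // function name for error messages; allowMissing

	if (outTypeInfo instanceof MissingTypeInfo) {
		// Extraction failed; the error is deferred until the type is first used.
	}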
==== File 2 of 7 ====

@@ -34,12 +34,13 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;

@@ -169,7 +170,9 @@ public <IN, OUT> void addStreamVertex(Integer vertexID,
 
 		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
 				inTypeInfo, executionConfig) : null;
-		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+
+		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
 				outTypeInfo, executionConfig) : null;
 
 		addTypeSerializers(vertexID, inSerializer, null, outSerializer, null);

@@ -215,7 +218,8 @@ public void addIterationHead(Integer vertexID, Integer iterationHead, Integer it
 		setSerializersFrom(iterationHead, vertexID);
 
 		int outpartitionerIndexToCopy = edges.getInEdgeIndices(iterationHead).get(0);
-		StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy).get(0).getPartitioner();
+		StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy)
+				.get(0).getPartitioner();
 
 		setEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
 

@@ -273,9 +277,12 @@ public <IN1, IN2, OUT> void addCoTask(Integer vertexID,
 
 		addVertex(vertexID, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
 
+		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+				outTypeInfo, executionConfig) : null;
+
 		addTypeSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
-				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig),
-				new StreamRecordSerializer<OUT>(outTypeInfo, executionConfig), null);
+				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer, null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexID);
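Note: Both guards above leave the output serializer null while the output type is a MissingTypeInfo, since no serializer can be built for an unknown type. The serializer must then be created when a type hint arrives through StreamGraph#setOutType (called from DataStream#fillInType later in this commit; its body is not part of this diff). A minimal sketch of what it would need to do, with a hypothetical storage field:

	public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
		// Build the serializer that addStreamVertex/addCoTask skipped earlier.
		StreamRecordSerializer<OUT> serializer =
				new StreamRecordSerializer<OUT>(outType, executionConfig);
		outputSerializers.put(vertexID, serializer); // hypothetical per-vertex store
	}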
==== File 3 of 7 ====

@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.StreamGraph;

@@ -241,8 +242,10 @@ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
 	 * @return The transformed {@link DataStream}
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
-				coMapper.getClass(), 2, null, null);
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
+				CoMapFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
 				clean(coMapper)));

@@ -266,8 +269,10 @@ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
 			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
-				coFlatMapper.getClass(), 2, null, null);
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+				CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
 				clean(coFlatMapper)));

@@ -291,8 +296,9 @@ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
-				coReducer.getClass(), 2, null, null);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
+				CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
 

@@ -357,9 +363,10 @@ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
 		if (slideInterval < 1) {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
-				coWindowFunction.getClass(), 2, null, null);
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
+				CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));

@@ -402,9 +409,8 @@ protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
 				environment, functionName, outTypeInfo, functionInvokable);
 
-		dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable,
-				getInputType1(), getInputType2(), outTypeInfo, functionName,
-				environment.getParallelism());
+		dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable, getInputType1(),
+				getInputType2(), outTypeInfo, functionName, environment.getParallelism());
 
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
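Note: With the four call sites above switched to getBinaryOperatorReturnType(..., allowMissing = true), a co-transformation whose output type is erased no longer fails when it is defined. A hedged usage sketch (user code; the streams and the MyGenericCoMapper/MyPojo classes are hypothetical):

	ConnectedDataStream<Integer, String> connected = ints.connect(strings);

	DataStream<MyPojo> result = connected
			.map(new MyGenericCoMapper())                      // may yield a MissingTypeInfo
			.returns(TypeExtractor.getForClass(MyPojo.class)); // hint fills the type in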
==== File 4 of 7 ====

@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;

@@ -38,11 +39,13 @@
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;

@@ -102,7 +105,6 @@ public class DataStream<OUT> {
 	protected static Integer counter = 0;
 	protected final StreamExecutionEnvironment environment;
 	protected final Integer id;
-	protected final String type;
 	protected int parallelism;
 	protected List<String> userDefinedNames;
 	protected StreamPartitioner<OUT> partitioner;

@@ -111,6 +113,7 @@ public class DataStream<OUT> {
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final StreamGraph streamGraph;
+	private boolean typeUsed;
 
 	/**
 	 * Create a new {@link DataStream} in the given execution environment with

@@ -131,7 +134,6 @@ public DataStream(StreamExecutionEnvironment environment, String operatorType,
 
 		counter++;
 		this.id = counter;
-		this.type = operatorType;
 		this.environment = environment;
 		this.parallelism = environment.getParallelism();
 		this.streamGraph = environment.getStreamGraph();

@@ -151,7 +153,6 @@ public DataStream(StreamExecutionEnvironment environment, String operatorType,
 	public DataStream(DataStream<OUT> dataStream) {
 		this.environment = dataStream.environment;
 		this.id = dataStream.id;
-		this.type = dataStream.type;
 		this.parallelism = dataStream.parallelism;
 		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
 		this.partitioner = dataStream.partitioner;
@@ -192,9 +193,45 @@ public int getParallelism() {
 	 */
 	@SuppressWarnings("unchecked")
 	public TypeInformation<OUT> getType() {
+		if (typeInfo instanceof MissingTypeInfo) {
+			MissingTypeInfo typeInfo = (MissingTypeInfo) this.typeInfo;
+			throw new InvalidTypesException(
+					"The return type of function '"
+							+ typeInfo.getFunctionName()
+							+ "' could not be determined automatically, due to type erasure. "
+							+ "You can give type information hints by using the returns(...) method on the result of "
+							+ "the transformation call, or by letting your function implement the 'ResultTypeQueryable' "
+							+ "interface.", typeInfo.getTypeException());
+		}
+		typeUsed = true;
 		return this.typeInfo;
 	}
 
+	/**
+	 * Tries to fill in the type information. Type information can be filled in
+	 * later when the program uses a type hint. This method checks whether the
+	 * type information has ever been accessed before and does not allow
+	 * modifications if the type was accessed already. This ensures consistency
+	 * by making sure different parts of the operation do not assume different
+	 * type information.
+	 *
+	 * @param typeInfo
+	 *            The type information to fill in.
+	 *
+	 * @throws IllegalStateException
+	 *             Thrown, if the type information has been accessed before.
+	 */
+	protected void fillInType(TypeInformation<OUT> typeInfo) {
+		if (typeUsed) {
+			throw new IllegalStateException(
+					"TypeInformation cannot be filled in for the type after it has been used. "
+							+ "Please make sure that the type info hints are the first call after the transformation function, "
+							+ "before any access to types or semantic properties, etc.");
+		}
+		streamGraph.setOutType(id, typeInfo);
+		this.typeInfo = typeInfo;
+	}
+
 	public <F> F clean(F f) {
 		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
 			ClosureCleaner.clean(f, true);
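Note: The typeUsed flag added above enforces an ordering contract: a hint may replace the type only before anything has read it. An illustration (hypothetical user code; print() is assumed to read the output type when building the sink):

	SingleOutputStreamOperator<MyPojo, ?> mapped = input.map(new MyGenericMapper());
	mapped.returns(pojoTypeInfo);  // fine: nothing has called getType() yet
	mapped.print();                // getType() succeeds and sets typeUsed

	mapped.returns(otherTypeInfo); // IllegalStateException from fillInType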
@@ -234,8 +271,8 @@ public DataStream<OUT> merge(DataStream<OUT>... streams) {
 
 	/**
 	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Calling
-	 * this method on an operator creates a new {@link SplitDataStream}.
+	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
+	 * Calling this method on an operator creates a new {@link SplitDataStream}.
 	 *
 	 * @param outputSelector
 	 *            The user defined
@@ -471,7 +508,8 @@ public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
 
-		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
+		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
+				Utils.getCallLocationName(), true);
 
 		return transform("Map", outType, new MapInvokable<OUT, R>(clean(mapper)));
 	}

@@ -495,7 +533,7 @@ public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
 	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
 
 		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
-				getType());
+				getType(), Utils.getCallLocationName(), true);
 
 		return transform("Flat Map", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
 

@@ -521,20 +559,22 @@ public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
 
 	/**
 	 * Applies a fold transformation on the data stream. The returned stream
-	 * contains all the intermediate values of the fold transformation. The
-	 * user can also extend the {@link RichFoldFunction} to gain access to
-	 * other features provided by the {@link org.apache.flink.api.common.functions.RichFunction}
-	 * interface
-	 *
+	 * contains all the intermediate values of the fold transformation. The user
+	 * can also extend the {@link RichFoldFunction} to gain access to other
+	 * features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface
+	 *
 	 * @param folder
-	 *            The {@link FoldFunction} that will be called for every element
-	 *            of the input values.
+	 *            The {@link FoldFunction} that will be called for every element
+	 *            of the input values.
 	 * @return The transformed DataStream
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+				Utils.getCallLocationName(), false);
 
-		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue, outType));
+		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder),
+				initialValue, outType));
 	}
 
 	/**
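Note: Unlike map and flatMap above, fold passes allowMissing = false. A plausible reading of this hunk: StreamFoldInvokable consumes outType at construction time, so a MissingTypeInfo could not be patched in later by returns(...); failing fast at the reported call location (hence Utils.getCallLocationName()) is the safer choice. The contrast, as a sketch with hypothetical mapper/folder/inType/loc variables:

	TypeExtractor.getMapReturnTypes(mapper, inType, loc, true);   // map: defer the error
	TypeExtractor.getFoldReturnTypes(folder, inType, loc, false); // fold: fail fast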
@@ -1111,15 +1151,19 @@ public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode w
 	}
 
 	/**
-	 * Writes the DataStream to a socket as a byte array. The format of the output is
-	 * specified by a {@link SerializationSchema}.
-	 *
-	 * @param hostName host of the socket
-	 * @param port port of the socket
-	 * @param schema schema for serialization
+	 * Writes the DataStream to a socket as a byte array. The format of the
+	 * output is specified by a {@link SerializationSchema}.
+	 *
+	 * @param hostName
+	 *            host of the socket
+	 * @param port
+	 *            port of the socket
+	 * @param schema
+	 *            schema for serialization
 	 * @return the closed DataStream
 	 */
-	public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema){
+	public DataStreamSink<OUT> writeToSocket(String hostName, int port,
+			SerializationSchema<OUT, byte[]> schema) {
 		DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
 		return returnStream;
 	}
==== File 5 of 7 ====

@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;

@@ -70,21 +71,21 @@ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
 	 */
 	@Override
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer),
-				keySelector));
+		return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(
+				clean(reducer), keySelector));
 	}
 
 	/**
 	 * Applies a fold transformation on the grouped data stream grouped on by
 	 * the given key position. The {@link FoldFunction} will receive input
 	 * values based on the key value. Only input values with the same key will
-	 * go to the same folder.The user can also extend
-	 * {@link RichFoldFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
-	 *
+	 * go to the same folder.The user can also extend {@link RichFoldFunction}
+	 * to gain access to other features provided by the {@link RichFuntion}
+	 * interface.
+	 *
 	 * @param folder
-	 *            The {@link FoldFunction} that will be called for every
-	 *            element of the input values with the same key.
+	 *            The {@link FoldFunction} that will be called for every element
+	 *            of the input values with the same key.
 	 * @param initialValue
 	 *            The initialValue passed to the folders for each key.
 	 * @return The transformed DataStream.

@@ -93,10 +94,11 @@ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
 	@Override
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
 
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+				Utils.getCallLocationName(), false);
 
-		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder), keySelector,
-				initialValue, outType));
+		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder),
+				keySelector, initialValue, outType));
 	}
 
 	/**

@@ -215,8 +217,8 @@ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
 		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
 				keySelector);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation", getType(),
-				invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
+				getType(), invokable);
 
 		return returnStream;
 	}
==== File 6 of 7 ====

@@ -194,8 +194,7 @@ public O returns(TypeInformation<OUT> typeInfo) {
 		if (typeInfo == null) {
 			throw new IllegalArgumentException("Type information must not be null.");
 		}
-		streamGraph.setOutType(id, typeInfo);
-		this.typeInfo = typeInfo;
+		fillInType(typeInfo);
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;
 		return returnType;
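Note: returns(...) now funnels through fillInType, so a hint both updates the stream's type and reaches the graph via setOutType. If no hint is given and extraction failed, the first use of the stream surfaces the InvalidTypesException composed in DataStream#getType(). A sketch of that failure path (hypothetical user code):

	SingleOutputStreamOperator<MyPojo, ?> mapped = input.map(new MyGenericMapper());

	// No returns(...) hint: the first call that needs the type (a sink, a
	// grouping, ...) is assumed to trigger getType() and throw:
	//   InvalidTypesException: The return type of function '...' could not
	//   be determined automatically, due to type erasure. You can give type
	//   information hints by using the returns(...) method ...
	mapped.print();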
==== File 7 of 7: diff not loaded ====