Skip to content

Commit

Permalink
[streaming] Added TypeInfo to DataStream
Browse files Browse the repository at this point in the history
  • Loading branch information
mbalassi committed Sep 20, 2014
1 parent 0c8f1da commit 4d73f51
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
import org.apache.flink.types.TypeInformation;

/**
* The ConnectedDataStream represents a stream for two different data types. It
Expand All @@ -58,7 +59,7 @@ protected ConnectedDataStream(StreamExecutionEnvironment environment,
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
this.input1 = input1.copy();
this.input2 = input2.copy();
this.input2 = input2.copy();
}

/**
Expand All @@ -79,6 +80,22 @@ public DataStream<IN2> getSecond() {
return input2.copy();
}

/**
* Returns the type information of the first of the two connected inputs,
* as reported by that input stream's {@code getOutputType()}.
*
* @return The type of the first input
*/
public TypeInformation<IN1> getInputType1() {
final TypeInformation<IN1> firstInputType = input1.getOutputType();
return firstInputType;
}

/**
* Returns the type information of the second of the two connected inputs,
* as reported by that input stream's {@code getOutputType()}.
*
* @return The type of the second input
*/
public TypeInformation<IN2> getInputType2() {
final TypeInformation<IN2> secondInputType = input2.getOutputType();
return secondInputType;
}

/**
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Used for
Expand Down Expand Up @@ -189,7 +206,7 @@ public GroupedConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPos

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName);
environment, functionName, outTypeWrapper);

try {
input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public abstract class DataStream<OUT> {
protected List<String> userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
protected TypeSerializerWrapper<OUT> outTypeWrapper;

protected final JobGraphBuilder jobGraphBuilder;

Expand All @@ -100,8 +101,11 @@ public abstract class DataStream<OUT> {
* StreamExecutionEnvironment
* @param operatorType
* The type of the operator in the component
* @param outTypeWrapper
* Type of the output
*/
public DataStream(StreamExecutionEnvironment environment, String operatorType) {
public DataStream(StreamExecutionEnvironment environment, String operatorType,
TypeSerializerWrapper<OUT> outTypeWrapper) {
if (environment == null) {
throw new NullPointerException("context is null");
}
Expand All @@ -114,7 +118,7 @@ public DataStream(StreamExecutionEnvironment environment, String operatorType) {
this.userDefinedNames = new ArrayList<String>();
this.selectAll = false;
this.partitioner = new ForwardPartitioner<OUT>();

this.outTypeWrapper = outTypeWrapper;
}

/**
Expand All @@ -131,7 +135,7 @@ public DataStream(DataStream<OUT> dataStream) {
this.selectAll = dataStream.selectAll;
this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;

this.outTypeWrapper = dataStream.outTypeWrapper;
}

/**
Expand Down Expand Up @@ -159,6 +163,15 @@ public int getParallelism() {
return this.degreeOfParallelism;
}

/**
* Gets the output type of this stream by asking the stream's
* {@code outTypeWrapper} for its type information.
*
* NOTE(review): some streams in this change appear to be constructed with a
* {@code null} wrapper (e.g. the iteration source and iteration sink);
* calling this method on such a stream would presumably throw a
* NullPointerException - confirm against the callers.
*
* @return The output type.
*/
public TypeInformation<OUT> getOutputType() {
final TypeSerializerWrapper<OUT> wrapper = this.outTypeWrapper;
return wrapper.getTypeInfo();
}

/**
* Creates a new {@link MergedDataStream} by merging {@link DataStream}
* outputs of the same type with each other. The DataStreams merged using
Expand Down Expand Up @@ -890,7 +903,7 @@ public IterativeDataStream<OUT> iterate() {

protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {

DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);

jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism, waitTime);
Expand Down Expand Up @@ -919,7 +932,7 @@ protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTi
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
functionName);
functionName, outTypeWrapper);

try {
jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
Expand Down Expand Up @@ -1001,7 +1014,7 @@ private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OU

private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", outTypeWrapper);

try {
jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.datastream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;

/**
* Represents the end of a DataStream.
Expand All @@ -27,8 +28,8 @@
*/
public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {

protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
super(environment, operatorType);
protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<IN> outTypeWrapper) {
super(environment, operatorType, outTypeWrapper);
}

protected DataStreamSink(DataStream<IN> dataStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.datastream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;

/**
* The DataStreamSource represents the starting point of a DataStream.
Expand All @@ -27,8 +28,8 @@
*/
public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {

public DataStreamSource(StreamExecutionEnvironment environment, String operatorType) {
super(environment, operatorType);
public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
super(environment, operatorType, outTypeWrapper);
}

public DataStreamSource(DataStream<OUT> dataStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
import org.apache.flink.streaming.api.invokable.util.Timestamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;

//import org.apache.jasper.compiler.Node.ParamsAction;
import org.apache.flink.types.TypeInformation;

/**
* A GroupedDataStream represents a data stream which has been partitioned by
Expand All @@ -53,12 +52,21 @@ protected GroupedDataStream(DataStream<OUT> dataStream, int keyPosition) {
}

/**
* Applies a reduce transformation on the grouped data stream grouped by the
* given key position. The {@link ReduceFunction} will receive input values
* based on the key value. Only input values with the same key will go to
* the same reducer.The user can also extend {@link RichReduceFunction} to
* gain access to other features provided by the {@link RichFuntion}
* interface.
* Gets the output type.
*
* @return The output type.
*/
public TypeInformation<OUT> getOutputType() {
// The grouped stream has no type of its own; it reports the type of the
// underlying data stream it was created from.
final TypeInformation<OUT> underlyingType = dataStream.getOutputType();
return underlyingType;
}

/**
* Applies a reduce transformation on the grouped data stream, grouped by
* the given key position. The {@link ReduceFunction} will receive input
* values based on the key value. Only input values with the same key will
* go to the same reducer. The user can also extend
* {@link RichReduceFunction} to gain access to other features provided by
* the {@link RichFunction} interface.
*
* @param reducer
* The {@link ReduceFunction} that will be called for every
Expand All @@ -70,7 +78,7 @@ protected GroupedDataStream(DataStream<OUT> dataStream, int keyPosition) {
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
}

/**
* Applies a group reduce transformation on preset chunks of the grouped
* data stream. The {@link GroupReduceFunction} will receive input values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public DataStream<IN> closeWith(DataStream<IN> iterationResult) {
*
*/
public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink");
DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);

jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
iterationID.toString(), iterationTail.getParallelism(), waitTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;

/**
* The MergedDataStream represents a DataStream which consists of merged outputs
Expand All @@ -35,8 +36,9 @@ public class MergedDataStream<OUT> extends DataStream<OUT> {

protected List<DataStream<OUT>> mergedStreams;

protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType) {
super(environment, operatorType);
protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType,
TypeSerializerWrapper<OUT> outTypeWrapper) {
super(environment, operatorType, outTypeWrapper);
this.mergedStreams = new ArrayList<DataStream<OUT>>();
this.mergedStreams.add(this);
}
Expand Down
Loading

0 comments on commit 4d73f51

Please sign in to comment.