Skip to content

Commit

Permalink
[streaming] CoFlatMap & CoGroupReduce functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
szape authored and StephanEwen committed Aug 29, 2014
1 parent 1e38a63 commit 7596012
Show file tree
Hide file tree
Showing 10 changed files with 684 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,25 @@
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;

/**
* The ConnectedDataStream represents a stream for two different data types. It can be
* used to apply transformations like {@link CoMapFunction} on two
* The ConnectedDataStream represents a stream for two different data types. It
* can be used to apply transformations like {@link CoMapFunction} on two
* {@link DataStream}s
*
* @param <IN1>
Expand All @@ -50,8 +57,8 @@ public class ConnectedDataStream<IN1, IN2> {
DataStream<IN1> input1;
DataStream<IN2> input2;

protected ConnectedDataStream(StreamExecutionEnvironment environment, JobGraphBuilder jobGraphBuilder,
DataStream<IN1> input1, DataStream<IN2> input2) {
protected ConnectedDataStream(StreamExecutionEnvironment environment,
JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2) {
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
this.input1 = input1.copy();
Expand All @@ -76,6 +83,46 @@ public DataStream<IN2> getSecond() {
return input2.copy();
}

/**
* Sets the partitioning of the two separate {@link DataStream}s so that the
* output tuples are partitioned by their hashcode and are sent to only one
* component.
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
* first input stream.
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return The DataStream with field partitioning set.
*/
public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2) {
if (keyPosition1 < 0 || keyPosition2 < 0) {
throw new IllegalArgumentException("The position of the field must be non-negative");
}

return new ConnectedDataStream<IN1, IN2>(this.environment, this.jobGraphBuilder, getFirst()
.setConnectionType(new FieldsPartitioner<IN1>(keyPosition1)), getSecond()
.setConnectionType(new FieldsPartitioner<IN2>(keyPosition2)));
}

/**
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Basically
* used before Reduce operation.
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
* first input stream.
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return
*/
public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
return this.partitionBy(keyPosition1, keyPosition2);
}

/**
* Applies a CoMap transformation on two separate {@link DataStream}s. The
* transformation calls a {@link CoMapFunction#map1} for each element of the
Expand All @@ -94,6 +141,54 @@ public DataStream<IN2> getSecond() {
CoMapFunction.class, 0, 1, 2), new CoMapInvokable<IN1, IN2, OUT>(coMapper));
}

/**
* Applies a CoFlatMap transformation on two separate {@link DataStream}s.
* The transformation calls a {@link CoFlatMapFunction#map1} for each
* element of the first input and {@link CoFlatMapFunction#map2} for each
* element of the second input. Each CoFlatMapFunction call returns any
* number of elements including none. The user can also extend
* {@link RichFlatMapFunction} to gain access to other features provided by
* the {@link RichFuntion} interface.
*
* @param coFlatMapper
* The CoFlatMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
return addCoFunction("coFlatMap", coFlatMapper, new FunctionTypeWrapper<IN1, IN2, OUT>(
coFlatMapper, CoFlatMapFunction.class, 0, 1, 2),
new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
}

/**
* Applies a CoReduce transformation on the grouped data stream grouped on
* by the given key position. The {@link CoReduceFunction} will receive
* input values based on the key positions. The transformation calls
* {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
* each element of the first input and {@link CoReduceFunction#reduce2} and
* {@link CoReduceFunction#map2} for each element of the second input. For
* each input, only values with the same key will go to the same reducer.
* The user can also extend {@link RichReduceFunction} to gain access to
* other features provided by the {@link RichFuntion} interface.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every two
* element with the same key of each input DataStream.
* @param keyPosition1
* position of the key in the first input DataStream
* @param keyPosition2
* position of the key in the second input DataStream
* @return The transformed DataStream.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(
CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1, int keyPosition2) {
return addCoFunction("coReduce", coReducer, new FunctionTypeWrapper<IN1, IN2, OUT>(
coReducer, CoReduceFunction.class, 0, 1, 2),
new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
}

protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeSerializerWrapper<IN1, IN2, OUT> typeWrapper,
CoInvokable<IN1, IN2, OUT> functionInvokable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.function.co;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.util.Collector;

/**
* A CoFlatMapFunction represents a FlatMap transformation with two different
* input types.
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
*/
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {

public void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

public void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.function.co;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

/**
* The CoReduceFunction interface represents a Reduce transformation with two
* different input streams. The reduce1 function combine groups of elements of
* the first input with the same key to a single value, while reduce2 combine
* groups of elements of the second input with the same key to a single value.
* Each produced values are mapped to the same type by map1 and map2,
* respectively, to form one output stream.
*
* The basic syntax for using a grouped ReduceFunction is as follows:
*
* <pre>
* <blockquote>
* ConnectedDataStream<X> input = ...;
*
* ConnectedDataStream<X> result = input.groupBy(keyPosition1, keyPosition2)
* .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
* </blockquote>
* </pre>
* <p>
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
*/
public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {

/**
* The core method of CoReduceFunction, combining two values of the first
* input into one value of the same type. The reduce1 function is
* consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1
* The first value to combine.
* @param value2
* The second value to combine.
* @return The combined value of both input values.
*
* @throws Exception
* This method may throw exceptions. Throwing an exception will
* cause the operation to fail and may trigger recovery.
*/
public abstract IN1 reduce1(IN1 value1, IN1 value2);

/**
* The core method of ReduceFunction, combining two values of the second
* input into one value of the same type. The reduce2 function is
* consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1
* The first value to combine.
* @param value2
* The second value to combine.
* @return The combined value of both input values.
*
* @throws Exception
* This method may throw exceptions. Throwing an exception will
* cause the operation to fail and may trigger recovery.
*/
public abstract IN2 reduce2(IN2 value1, IN2 value2);

/**
* Maps the reduced first input to the output type.
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
*/
public abstract OUT map1(IN1 value);

/**
* Maps the reduced second input to the output type.
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
*/
public abstract OUT map2(IN2 value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.invokable.operator.co;

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;

public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;

private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;

public CoFlatMapInvokable(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
this.flatMapper = flatMapper;
}

@Override
public void handleStream1() throws Exception {
flatMapper.flatMap1(reuse1.getObject(), collector);
}

@Override
public void handleStream2() throws Exception {
flatMapper.flatMap2(reuse2.getObject(), collector);
}

@Override
public void open(Configuration parameters) throws Exception {
if (flatMapper instanceof RichFunction) {
((RichFunction) flatMapper).open(parameters);
}
}

@Override
public void close() throws Exception {
if (flatMapper instanceof RichFunction) {
((RichFunction) flatMapper).close();
}
}

}
Loading

0 comments on commit 7596012

Please sign in to comment.