[streaming] Created scala GroupedDataStream
Closes apache#860
gyfora committed Jun 27, 2015
1 parent 3f3b17e commit b35aa20
Showing 3 changed files with 203 additions and 177 deletions.
@@ -30,8 +30,7 @@ import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, F
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, GroupedDataStream,
-  KeyedDataStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction}
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
@@ -241,20 +240,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
+  def groupBy(fields: Int*): GroupedDataStream[T] = javaStream.groupBy(fields: _*)

   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(firstField: String, otherFields: String*): DataStream[T] =
+  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T] =
     javaStream.groupBy(firstField +: otherFields.toArray: _*)

   /**
    * Groups the elements of a DataStream by the given key extractor function to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T] = {

     val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
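A minimal usage sketch of the new API shape (illustrative names; assumes the flink-streaming-scala imports of this era, and that an implicit conversion wraps the Java stream returned by javaStream.groupBy):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// A small tuple stream to group on; field 0 is the key.
val words: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2), ("a", 3))

// groupBy now returns a GroupedDataStream rather than a plain DataStream,
// so the grouping is visible in the static type.
val grouped: GroupedDataStream[(String, Int)] = words.groupBy(0)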
@@ -394,111 +393,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
     output
   }

-  /**
-   * Applies an aggregation that gives the current maximum of the data stream at
-   * the given position by the given key. An independent aggregate is kept per key.
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
-  /**
-   * Applies an aggregation that gives the current maximum of the data stream at
-   * the given field by the given key. An independent aggregate is kept per key.
-   */
-  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
-  /**
-   * Applies an aggregation that gives the current minimum of the data stream at
-   * the given position by the given key. An independent aggregate is kept per key.
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
-  /**
-   * Applies an aggregation that gives the current minimum of the data stream at
-   * the given field by the given key. An independent aggregate is kept per key.
-   */
-  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the data stream at the given position by the
-   * given key. An independent aggregate is kept per key.
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
-  /**
-   * Applies an aggregation that sums the data stream at the given field by the
-   * given key. An independent aggregate is kept per key.
-   */
-  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that gives the current minimum element of the data stream by
-   * the given position by the given key. An independent aggregate is kept per key.
-   * In case of equality, the first element with the minimal value is returned.
-   */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, position)
-
-  /**
-   * Applies an aggregation that gives the current minimum element of the data stream by
-   * the given field by the given key. An independent aggregate is kept per key.
-   * In case of equality, the first element with the minimal value is returned.
-   */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field)
-
-  /**
-   * Applies an aggregation that gives the current maximum element of the data stream by
-   * the given position by the given key. An independent aggregate is kept per key.
-   * In case of equality, the first element with the maximal value is returned.
-   */
-  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY, position)
-
-  /**
-   * Applies an aggregation that gives the current maximum element of the data stream by
-   * the given field by the given key. An independent aggregate is kept per key.
-   * In case of equality, the first element with the maximal value is returned.
-   */
-  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY, field)
-
-  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
-    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }
-
-  private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM =>
-        new SumAggregator(position, jStream.getType, jStream.getExecutionConfig)
-      case _ =>
-        new ComparableAggregator(position, jStream.getType, aggregationType, true,
-          jStream.getExecutionConfig)
-    }
-
-    val invokable = jStream match {
-      case groupedStream: GroupedDataStream[Product] =>
-        new StreamGroupedReduce[Product](reducer, groupedStream.getKeySelector())
-      case _ => throw new UnsupportedOperationException("Aggregations are only supported for " +
-        "grouped and windowed data streams.")
-    }
-    new DataStream[Product](jStream.transform("aggregation", jStream.getType(), invokable))
-      .asInstanceOf[DataStream[T]]
-  }

   /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
    */
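The aggregations deleted above relied on a runtime match: calling them on a stream that was not grouped threw UnsupportedOperationException only when the topology was built. With grouping now encoded in groupBy's return type, that check can become a compile-time guarantee, assuming the new Scala GroupedDataStream (among the added files not rendered here) carries these methods over. A hedged sketch, reusing the names from the earlier snippet:

// Before: compiled on any DataStream, failed at graph construction
// when the stream was not grouped.
// words.maxBy(1)

// After: grouped-only aggregations need a GroupedDataStream, so a
// forgotten groupBy no longer type-checks.
val maxima: DataStream[(String, Int)] = words.groupBy(0).maxBy(1)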
@@ -569,74 +463,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     flatMap(flatMapper)
   }

-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-
-    javaStream match {
-      case groupedStream: GroupedDataStream[T] => groupedStream.reduce(reducer)
-      case _ => throw new UnsupportedOperationException("Reduce is only supported for " +
-        "grouped and windowed data streams.")
-    }
-  }
-
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
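reduce follows the same pattern: the match on GroupedDataStream above becomes unnecessary once reduce is only reachable from a grouped stream. A sketch, assuming the (T, T) => T overload deleted here survives on the new class:

// Per-key running sum of the second tuple field.
val counts: DataStream[(String, Int)] = words
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))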

-  /**
-   * Creates a new [[DataStream]] by folding the elements of this DataStream
-   * using an associative fold function and an initial value.
-   */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T, R]):
-      DataStream[R] = {
-    if (folder == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-
-    val outType: TypeInformation[R] = implicitly[TypeInformation[R]]
-
-    javaStream match {
-      case groupedStream: GroupedDataStream[T] => groupedStream.fold(initialValue, folder).
-        returns(outType).asInstanceOf[JavaStream[R]]
-      case _ => throw new UnsupportedOperationException("Fold is only supported for " +
-        "grouped and windowed data streams.")
-    }
-  }
-
-  /**
-   * Creates a new [[DataStream]] by folding the elements of this DataStream
-   * using an associative fold function and an initial value.
-   */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R, T) => R): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val folder = new FoldFunction[T, R] {
-      def fold(acc: R, v: T) = {
-        cleanFun(acc, v)
-      }
-    }
-    fold(initialValue, folder)
-  }
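fold differs from reduce in that the accumulator type R may differ from the element type, which is why the deleted code threads the result type through returns(outType). A sketch, assuming the (R, T) => R overload also moves to the new GroupedDataStream:

// Per-key fold of Int values into a String accumulator (R = String, T = (String, Int)).
val folded: DataStream[String] = words
  .groupBy(0)
  .fold("")((acc, v) => acc + v._2.toString)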

   /**
    * Creates a new DataStream that contains only the elements satisfying the given filter predicate.