Skip to content

Commit

Permalink
[FLINK-3557] [stream, scala] Introduce secondary parameter list for f…
Browse files Browse the repository at this point in the history
…old function

The fold API call takes an initial value as well as a fold function. In Scala it is possible
to provide an anonymous function. In order to easily support multi line anonymous functions
as well as being consistent with Scala's collection API, this PR adds another parameter list
to the fold API call, which contains the fold function parameter.

Insert spaces between first parameter list and curly braces of anonymous function

This closes apache#1748.
  • Loading branch information
tillrohrmann committed Mar 1, 2016
1 parent 7b8275a commit 93733cc
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
def fold[R: TypeInformation](initialValue: R, function: (R, T) => R): DataStream[R] = {
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* using an associative fold function and an initial value. An independent
* aggregate is kept per key.
*/
def fold[R: TypeInformation](initialValue: R, fun: (R,T) => R): DataStream[R] = {
def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Fold function must not be null.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
def fold[R: TypeInformation](initialValue: R, function: (R, T) => R): DataStream[R] = {
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val windowed = connected
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
.fold((0L, 0L), func)
.fold((0L, 0L))(func)

windowed.name("testWindowFold")

Expand Down Expand Up @@ -247,7 +247,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val windowed: DataStream[(Long, Long)] = map
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
.fold((0L, 0L), (x: (Long, Long), y: (Long, Long)) => (0L, 0L))
.fold((0L, 0L))((x: (Long, Long), y: (Long, Long)) => (0L, 0L))

windowed.print()
val sink = map.addSink(x => {})
Expand Down Expand Up @@ -309,7 +309,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val flatten: DataStream[Int] = window
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
.fold(0, (accumulator: Int, value: String) => 0)
.fold(0)((accumulator: Int, value: String) => 0)
assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType())

// TODO check for custom case class
Expand Down Expand Up @@ -404,7 +404,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(foldFunction == getFunctionForDataStream(fold))
assert(
getFunctionForDataStream(map.keyBy(x=>x)
.fold("", (x: String, y: Int) => ""))
.fold("")((x: String, y: Int) => ""))
.isInstanceOf[FoldFunction[_, _]])

val connect = fold.connect(flatMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks}
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
Expand Down Expand Up @@ -70,7 +70,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
source1
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
.fold(("R:", 0)) { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) }
.addSink(new SinkFunction[(String, Int)]() {
def invoke(value: (String, Int)) {
WindowFoldITCase.testResults += value.toString
Expand Down Expand Up @@ -117,7 +117,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {

source1
.windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
.fold(("R:", 0)) { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) }
.addSink(new SinkFunction[(String, Int)]() {
def invoke(value: (String, Int)) {
WindowFoldITCase.testResults += value.toString
Expand Down

0 comments on commit 93733cc

Please sign in to comment.