Skip to content

Commit

Permalink
[FLINK-2405] [streaming] Simplifies the way the partitioned field is …
Browse files Browse the repository at this point in the history
…set in StatefulFunction

Closes apache#936
  • Loading branch information
tillrohrmann authored and gyfora committed Jul 28, 2015
1 parent 042b2e8 commit 4b44e02
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
override def map(in: T): R = {
applyWithState(in, cleanFun)
}

val partitioned = isStatePartitioned
}

setStatePartitioning(mapper)

map(mapper)
}

Expand Down Expand Up @@ -552,10 +552,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
override def flatMap(in: T, out: Collector[R]): Unit = {
applyWithState(in, cleanFun) foreach out.collect
}

val partitioned = isStatePartitioned
}

setStatePartitioning(flatMapper)

flatMap(flatMapper)
}

Expand Down Expand Up @@ -601,17 +601,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
override def filter(in: T): Boolean = {
applyWithState(in, cleanFun)
}

val partitioned = isStatePartitioned
}

setStatePartitioning(filterFun)

filter(filterFun)
}

private[flink] def setStatePartitioning(fun: StatefulFunction[_, _, _]) = {
if (javaStream.isInstanceOf[KeyedDataStream[T]]) {
fun.partitionStateByKey
}
private[flink] def isStatePartitioned: Boolean = {
javaStream.isInstanceOf[KeyedDataStream[T]]
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ import org.apache.flink.api.common.state.OperatorState
trait StatefulFunction[I, O, S] extends RichFunction {

var state: OperatorState[Option[S]] = _
var partitioned: Boolean = false

def partitionStateByKey = { partitioned = true }
def isPartitioned = partitioned
val partitioned: Boolean

def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
val (o, s) = fun(in, state.value)
Expand All @@ -42,6 +39,6 @@ trait StatefulFunction[I, O, S] extends RichFunction {
}

override def open(c: Configuration) = {
state = getRuntimeContext().getOperatorState("state", None, isPartitioned)
state = getRuntimeContext().getOperatorState("state", None, partitioned)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ class DataStreamTest {
val statefulMap1 = src.mapWithState((in, state: Option[Long]) => (in, None))
assert(getFunctionForDataStream(statefulMap1).isInstanceOf[MapFunction[_,_]])
assert(!getFunctionForDataStream(statefulMap1).
asInstanceOf[StatefulFunction[_,_,_]].isPartitioned)
asInstanceOf[StatefulFunction[_,_,_]].partitioned)

val statefulMap2 = src.keyBy(x=>x).mapWithState(
(in, state: Option[Long]) => (in, None))
assert(getFunctionForDataStream(statefulMap2).
asInstanceOf[StatefulFunction[_,_,_]].isPartitioned)
asInstanceOf[StatefulFunction[_,_,_]].partitioned)

val flatMapFunction = new FlatMapFunction[Long, Int] {
override def flatMap(value: Long, out: Collector[Int]): Unit = {}
Expand All @@ -335,12 +335,12 @@ class DataStreamTest {
val statefulfMap1 = src.flatMapWithState((in, state: Option[Long]) => (List(in), None))
assert(getFunctionForDataStream(statefulfMap1).isInstanceOf[FlatMapFunction[_, _]])
assert(!getFunctionForDataStream(statefulfMap1).
asInstanceOf[StatefulFunction[_, _, _]].isPartitioned)
asInstanceOf[StatefulFunction[_, _, _]].partitioned)

val statefulfMap2 = src.keyBy(x=>x).flatMapWithState(
(in, state: Option[Long]) => (List(in), None))
assert(getFunctionForDataStream(statefulfMap2).
asInstanceOf[StatefulFunction[_, _, _]].isPartitioned)
asInstanceOf[StatefulFunction[_, _, _]].partitioned)

val filterFunction = new FilterFunction[Int] {
override def filter(value: Int): Boolean = false
Expand All @@ -356,12 +356,12 @@ class DataStreamTest {
val statefulFilter1 = src.filterWithState((in, state: Option[Long]) => (true, None))
assert(getFunctionForDataStream(statefulFilter1).isInstanceOf[FilterFunction[_]])
assert(!getFunctionForDataStream(statefulFilter1).
asInstanceOf[StatefulFunction[_, _, _]].isPartitioned)
asInstanceOf[StatefulFunction[_, _, _]].partitioned)

val statefulFilter2 = src.keyBy(x=>x).filterWithState(
(in, state: Option[Long]) => (false, None))
assert(getFunctionForDataStream(statefulFilter2).
asInstanceOf[StatefulFunction[_, _, _]].isPartitioned)
asInstanceOf[StatefulFunction[_, _, _]].partitioned)

try {
streamGraph.getStreamEdge(map.getId, unionFilter.getId)
Expand Down

0 comments on commit 4b44e02

Please sign in to comment.