From c0f72aa4a4ebade8f47e8de16ee0a2a6d942a797 Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Fri, 8 Jan 2021 10:02:08 -0600 Subject: [PATCH] [hotfix] add scala note to spa docs --- docs/dev/libs/state_processor_api.md | 334 +----------------------- docs/dev/libs/state_processor_api.zh.md | 332 +---------------------- 2 files changed, 9 insertions(+), 657 deletions(-) diff --git a/docs/dev/libs/state_processor_api.md b/docs/dev/libs/state_processor_api.md index fa5ef3a51ee2b..7ed30ecfabf33 100644 --- a/docs/dev/libs/state_processor_api.md +++ b/docs/dev/libs/state_processor_api.md @@ -87,20 +87,11 @@ Since the operator “Snk” does not have any state, its namespace is empty. Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data. The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application. -
-
{% highlight java %} ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend()); {% endhighlight %} -
-
-{% highlight scala %} -val bEnv = ExecutionEnvironment.getExecutionEnvironment -val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend) -{% endhighlight %} -
-
+ ### Operator State @@ -113,24 +104,12 @@ When reading operator state, users specify the operator uid, the state name, and Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `ExistingSavepoint#readListState`. The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. -
-
{% highlight java %} DataSet listState = savepoint.readListState<>( "my-uid", "list-state", Types.INT); {% endhighlight %} -
-
-{% highlight scala %} -val listState = savepoint.readListState( - "my-uid", - "list-state", - Types.INT) -{% endhighlight %} -
-
#### Operator Union List State @@ -138,25 +117,12 @@ Operator state stored in a `CheckpointedFunction` using `getUnionListState` can The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. -
-
{% highlight java %} DataSet listState = savepoint.readUnionState<>( "my-uid", "union-state", Types.INT); {% endhighlight %} -
-
-{% highlight scala %} -val listState = savepoint.readUnionState( - "my-uid", - "union-state", - Types.INT) -{% endhighlight %} -
-
- #### Broadcast State @@ -164,8 +130,6 @@ val listState = savepoint.readUnionState( The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application. The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. -
-
{% highlight java %} DataSet> broadcastState = savepoint.readBroadcastState<>( "my-uid", @@ -173,24 +137,11 @@ DataSet> broadcastState = savepoint.readBroadcastState< Types.INT, Types.INT); {% endhighlight %} -
-
-{% highlight scala %} -val broadcastState = savepoint.readBroadcastState( - "my-uid", - "broadcast-state", - Types.INT, - Types.INT) -{% endhighlight %} -
-
#### Using Custom Serializers Each of the operator state readers support using custom `TypeSerializers` if one was used to define the `StateDescriptor` that wrote out the state. -
-
{% highlight java %} DataSet listState = savepoint.readListState<>( "uid", @@ -198,17 +149,6 @@ DataSet listState = savepoint.readListState<>( Types.INT, new MyCustomIntSerializer()); {% endhighlight %} -
-
-{% highlight scala %} -val listState = savepoint.readListState( - "uid", - "list-state", - Types.INT, - new MyCustomIntSerializer) -{% endhighlight %} -
-
### Keyed State @@ -218,8 +158,6 @@ When reading a keyed state, users specify the operator id and a `KeyedStateReade The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState. This means if an operator contains a stateful process function such as: -
-
{% highlight java %} public class StatefulFunctionWithTime extends KeyedProcessFunction { @@ -243,36 +181,9 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction -
-{% highlight scala %} -class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] { - var state: ValueState[Int] = _ - var updateTimes: ListState[Long] = _ - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int]) - state = getRuntimeContext().getState(stateDescriptor) - - val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long]) - updateTimes = getRuntimeContext().getListState(updateDescriptor) - } - - @throws[Exception] - override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = { - state.update(value + 1) - updateTimes.add(System.currentTimeMillis) - } -} -{% endhighlight %} -
-
Then it can read by defining an output type and corresponding `KeyedStateReaderFunction`. -
-
{% highlight java %} DataSet keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction()); @@ -316,30 +227,6 @@ public class ReaderFunction extends KeyedStateReaderFunction -
-{% highlight scala %} -class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] { - var state: ValueState[Int] = _ - var updateTimes: ListState[Long] = _ - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int]) - state = getRuntimeContext().getState(stateDescriptor) - - val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long]) - updateTimes = getRuntimeContext().getListState(updateDescriptor) - } - - override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedState]): Unit = { - val data = KeyedState(key, state.value, updateTimes.get.asScala.toList) - out.collect(data) - } -} -{% endhighlight %} -
-
Along with reading registered state values, each key has access to a `Context` with metadata such as registered event time and processing time timers. @@ -355,10 +242,7 @@ to a `WindowFunction` or `ProcessWindowFunction`. Suppose a DataStream application that counts the number of clicks per user per minute. -
-
{% highlight java %} - class Click { public String userId; @@ -398,42 +282,9 @@ clicks .addSink(new Sink()); {% endhighlight %} -
-
-{% highlight scala %} - -import java.lang.{Integer => JInteger} - -case class Click(userId: String, time: LocalDateTime) - -class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] { - - override def createAccumulator(): JInteger = 0 - - override def add(value: Click, accumulator: JInteger): JInteger = 1 + accumulator - - override def getResult(accumulator: JInteger): JInteger = accumulator - - override def merge(a: JInteger, b: JInteger): JInteger = a + b -} - -DataStream[Click] clicks = . . . - -clicks - .keyBy(click => click.userId) - .window(TumblingEventTimeWindows.of(Time.minutes(1))) - .aggregate(new ClickCounter()) - .uid("click-window") - .addSink(new Sink()) - -{% endhighlight %} -
-
This state can be read using the code below. -
-
{% highlight java %} class ClickState { @@ -470,44 +321,6 @@ savepoint .print(); {% endhighlight %} -
-
-{% highlight scala %} - -import java.lang.{Integer => JInteger, Long => JLong} -import java.util.{Set => JSet} - -case class ClickState(userId: String, count: JInteger, window: TimeWindow, triggerTimers: JSet[JLong]) - -class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, TimeWindow] { - - override def readWindow( - key: String, - context: Context[TimeWindow], - elements: Iterable[JInteger], - out: Collector[ClickState]): Unit = { - - state = ClickState( - userId = key, - count = elements.iterator().next(), - window = context.window()k - triggerTimers = context.registeredEventTimeTimers()) - - out.collect(state) - } -} - -val batchEnv = ExecutionEnvironment.getExecutionEnvironment() -val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend()) - -savepoint - .window(TumblingEventTimeWindows.of(Time.minutes(1))) - .aggregate("click-window", new ClickCounter(), new ClickReader(), Types.String, Types.INT, Types.INT) - .print() - -{% endhighlight %} -
-
Additionally, trigger state - from `CountTrigger`s or custom triggers - can be read using the method `Context#triggerState` inside the `WindowReaderFunction`. @@ -517,8 +330,10 @@ Additionally, trigger state - from `CountTrigger`s or custom triggers - can be r `Savepoint`'s may also be written, which allows such use cases as bootstrapping state based on historical data. Each savepoint is made up of one or more `BootstrapTransformation`'s (explained below), each of which defines the state for an individual operator. -
-
+**Note** The state processor api does not currently provide a Scala API. As a result +it will always auto-derive serializers using the Java type stack. To bootstrap +a savepoint for the Scala DataStream API please manually pass in all type information. + {% highlight java %} int maxParallelism = 128; @@ -528,19 +343,6 @@ Savepoint .withOperator("uid2", transformation2) .write(savepointPath); {% endhighlight %} -
-
-{% highlight scala %} -val maxParallelism = 128 - -Savepoint - .create(new MemoryStateBackend(), maxParallelism) - .withOperator("uid1", transformation1) - .withOperator("uid2", transformation2) - .write(savepointPath) -{% endhighlight %} -
-
The [UIDs]({% link ops/state/savepoints.md %}#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows what state maps to which operator. @@ -548,8 +350,6 @@ The [UIDs]({% link ops/state/savepoints.md %}#assigning-operator-ids) associated Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`. -
-
{% highlight java %} public class SimpleBootstrapFunction extends StateBootstrapFunction { @@ -577,44 +377,11 @@ BootstrapTransformation transformation = OperatorTransformation .bootstrapWith(data) .transform(new SimpleBootstrapFunction()); {% endhighlight %} -
-
-{% highlight scala %} -class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] { - - var ListState[Integer] state = _ - - @throws[Exception] - override def processElement(value: Integer, ctx: Context): Unit = { - state.add(value) - } - - @throws[Exception] - override def snapshotState(context: FunctionSnapshotContext): Unit = { - } - - @throws[Exception] - override def initializeState(context: FunctionInitializationContext): Unit = { - state = context.getOperatorState().getListState(new ListStateDescriptor("state", Types.INT)) - } -} - -val env = ExecutionEnvironment.getExecutionEnvironment -val data = env.fromElements(1, 2, 3) - -BootstrapTransformation transformation = OperatorTransformation - .bootstrapWith(data) - .transform(new SimpleBootstrapFunction) -{% endhighlight %} -
-
### Broadcast State [BroadcastState]({% link dev/stream/state/broadcast_state.md %}) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory. -
-
{% highlight java %} public class CurrencyRate { public String currency; @@ -640,38 +407,11 @@ BootstrapTransformation broadcastTransformation = OperatorTransfor .bootstrapWith(currencyDataSet) .transform(new CurrencyBootstrapFunction()); {% endhighlight %} -
-
-{% highlight scala %} -case class CurrencyRate(currency: String, rate: Double) - -object CurrencyBootstrapFunction { - val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE) -} - -class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] { - - @throws[Exception] - override processElement(value: CurrencyRate, ctx: Context): Unit = { - ctx.getBroadcastState(descriptor).put(value.currency, value.rate) - } -} - -val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3)) - -val broadcastTransformation = OperatorTransformation - .bootstrapWith(currencyDataSet) - .transform(new CurrencyBootstrapFunction) -{% endhighlight %} -
-
### Keyed State Keyed state for `ProcessFunction`'s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`. -
-
{% highlight java %} public class Account { public int id; @@ -705,37 +445,6 @@ BootstrapTransformation transformation = OperatorTransformation .keyBy(acc -> acc.id) .transform(new AccountBootstrapper()); {% endhighlight %} -
-
-{% highlight scala %} -case class Account(id: Int, amount: Double, timestamp: Long) - -class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] { - var state: ValueState[Double] - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - val descriptor = new ValueStateDescriptor("total",Types.DOUBLE) - state = getRuntimeContext().getState(descriptor) - } - - @throws[Exception] - override def processElement(value: Account, ctx: Context): Unit = { - state.update(value.amount) - } -} - -val bEnv = ExecutionEnvironment.getExecutionEnvironment() - -val accountDataSet = bEnv.fromCollection(accounts) - -val transformation = OperatorTransformation - .bootstrapWith(accountDataSet) - .keyBy(acc => acc.id) - .transform(new AccountBootstrapper) -{% endhighlight %} -
-
The `KeyedStateBootstrapFunction` supports setting event time and processing time timers. The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application. @@ -749,8 +458,6 @@ The state processor api supports writing state for the [window operator]({% link When writing window state, users specify the operator id, window assigner, evictor, optional trigger, and aggregation type. It is important the configurations on the bootstrap transformation match the configurations on the DataStream window. -
-
{% highlight java %} public class Account { public int id; @@ -773,45 +480,14 @@ BootstrapTransformation transformation = OperatorTransformation .window(TumblingEventTimeWindows.of(Time.minutes(5))) .reduce((left, right) -> left + right); {% endhighlight %} -
-
-{% highlight scala %} -case class Account(id: Int, amount: Double, timestamp: Long) - -val bEnv = ExecutionEnvironment.getExecutionEnvironment(); -val accountDataSet = bEnv.fromCollection(accounts); - -val transformation = OperatorTransformation - .bootstrapWith(accountDataSet) - // When using event time windows, its important - // to assign timestamps to each record. - .assignTimestamps(account => account.timestamp) - .keyBy(acc => acc.id) - .window(TumblingEventTimeWindows.of(Time.minutes(5))) - .reduce((left, right) => left + right) -{% endhighlight %} -
-
## Modifying Savepoints Besides creating a savepoint from scratch, you can base one off an existing savepoint such as when bootstrapping a single new operator for an existing job. -
-
{% highlight java %} Savepoint .load(bEnv, new MemoryStateBackend(), oldPath) .withOperator("uid", transformation) .write(newPath); {% endhighlight %} -
-
-{% highlight scala %} -Savepoint - .load(bEnv, new MemoryStateBackend, oldPath) - .withOperator("uid", transformation) - .write(newPath) -{% endhighlight %} -
-
diff --git a/docs/dev/libs/state_processor_api.zh.md b/docs/dev/libs/state_processor_api.zh.md index 581d3eb675b0a..18d505a61ac8c 100644 --- a/docs/dev/libs/state_processor_api.zh.md +++ b/docs/dev/libs/state_processor_api.zh.md @@ -87,20 +87,10 @@ Since the operator “Snk” does not have any state, its namespace is empty. Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data. The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application. -
-
{% highlight java %} ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend()); {% endhighlight %} -
-
-{% highlight scala %} -val bEnv = ExecutionEnvironment.getExecutionEnvironment -val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend) -{% endhighlight %} -
-
### Operator State @@ -113,24 +103,12 @@ When reading operator state, users specify the operator uid, the state name, and Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `ExistingSavepoint#readListState`. The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. -
-
{% highlight java %} DataSet listState = savepoint.readListState<>( "my-uid", "list-state", Types.INT); {% endhighlight %} -
-
-{% highlight scala %} -val listState = savepoint.readListState( - "my-uid", - "list-state", - Types.INT) -{% endhighlight %} -
-
#### Operator Union List State @@ -138,25 +116,12 @@ Operator state stored in a `CheckpointedFunction` using `getUnionListState` can The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. -
-
{% highlight java %} DataSet listState = savepoint.readUnionState<>( "my-uid", "union-state", Types.INT); {% endhighlight %} -
-
-{% highlight scala %} -val listState = savepoint.readUnionState( - "my-uid", - "union-state", - Types.INT) -{% endhighlight %} -
-
- #### Broadcast State @@ -164,8 +129,6 @@ val listState = savepoint.readUnionState( The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application. The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. -
-
{% highlight java %} DataSet> broadcastState = savepoint.readBroadcastState<>( "my-uid", @@ -173,24 +136,11 @@ DataSet> broadcastState = savepoint.readBroadcastState< Types.INT, Types.INT); {% endhighlight %} -
-
-{% highlight scala %} -val broadcastState = savepoint.readBroadcastState( - "my-uid", - "broadcast-state", - Types.INT, - Types.INT) -{% endhighlight %} -
-
#### Using Custom Serializers Each of the operator state readers support using custom `TypeSerializers` if one was used to define the `StateDescriptor` that wrote out the state. -
-
{% highlight java %} DataSet listState = savepoint.readListState<>( "uid", @@ -198,17 +148,6 @@ DataSet listState = savepoint.readListState<>( Types.INT, new MyCustomIntSerializer()); {% endhighlight %} -
-
-{% highlight scala %} -val listState = savepoint.readListState( - "uid", - "list-state", - Types.INT, - new MyCustomIntSerializer) -{% endhighlight %} -
-
### Keyed State @@ -218,8 +157,6 @@ When reading a keyed state, users specify the operator id and a `KeyedStateReade The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState. This means if an operator contains a stateful process function such as: -
-
{% highlight java %} public class StatefulFunctionWithTime extends KeyedProcessFunction { @@ -243,36 +180,9 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction -
-{% highlight scala %} -class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] { - var state: ValueState[Int] = _ - var updateTimes: ListState[Long] = _ - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int]) - state = getRuntimeContext().getState(stateDescriptor) - - val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long]) - updateTimes = getRuntimeContext().getListState(updateDescriptor) - } - - @throws[Exception] - override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = { - state.update(value + 1) - updateTimes.add(System.currentTimeMillis) - } -} -{% endhighlight %} -
-
Then it can read by defining an output type and corresponding `KeyedStateReaderFunction`. -
-
{% highlight java %} DataSet keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction()); @@ -316,30 +226,6 @@ public class ReaderFunction extends KeyedStateReaderFunction -
-{% highlight scala %} -class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] { - var state: ValueState[Int] = _ - var updateTimes: ListState[Long] = _ - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int]) - state = getRuntimeContext().getState(stateDescriptor) - - val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long]) - updateTimes = getRuntimeContext().getListState(updateDescriptor) - } - - override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedState]): Unit = { - val data = KeyedState(key, state.value, updateTimes.get.asScala.toList) - out.collect(data) - } -} -{% endhighlight %} -
-
Along with reading registered state values, each key has access to a `Context` with metadata such as registered event time and processing time timers. @@ -355,10 +241,7 @@ to a `WindowFunction` or `ProcessWindowFunction`. Suppose a DataStream application that counts the number of clicks per user per minute. -
-
{% highlight java %} - class Click { public String userId; @@ -398,42 +281,9 @@ clicks .addSink(new Sink()); {% endhighlight %} -
-
-{% highlight scala %} - -import java.lang.{Integer => JInteger} - -case class Click(userId: String, time: LocalDateTime) - -class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] { - - override def createAccumulator(): JInteger = 0 - - override def add(value: Click, accumulator: JInteger): JInteger = 1 + accumulator - - override def getResult(accumulator: JInteger): JInteger = accumulator - - override def merge(a: JInteger, b: JInteger): JInteger = a + b -} - -DataStream[Click] clicks = . . . - -clicks - .keyBy(click => click.userId) - .window(TumblingEventTimeWindows.of(Time.minutes(1))) - .aggregate(new ClickCounter()) - .uid("click-window") - .addSink(new Sink()) - -{% endhighlight %} -
-
This state can be read using the code below. -
-
{% highlight java %} class ClickState { @@ -470,44 +320,6 @@ savepoint .print(); {% endhighlight %} -
-
-{% highlight scala %} - -import java.lang.{Integer => JInteger, Long => JLong} -import java.util.{Set => JSet} - -case class ClickState(userId: String, count: JInteger, window: TimeWindow, triggerTimers: JSet[JLong]) - -class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, TimeWindow] { - - override def readWindow( - key: String, - context: Context[TimeWindow], - elements: Iterable[JInteger], - out: Collector[ClickState]): Unit = { - - state = ClickState( - userId = key, - count = elements.iterator().next(), - window = context.window()k - triggerTimers = context.registeredEventTimeTimers()) - - out.collect(state) - } -} - -val batchEnv = ExecutionEnvironment.getExecutionEnvironment() -val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend()) - -savepoint - .window(TumblingEventTimeWindows.of(Time.minutes(1))) - .aggregate("click-window", new ClickCounter(), new ClickReader(), Types.String, Types.INT, Types.INT) - .print() - -{% endhighlight %} -
-
Additionally, trigger state - from `CountTrigger`s or custom triggers - can be read using the method `Context#triggerState` inside the `WindowReaderFunction`. @@ -517,8 +329,10 @@ Additionally, trigger state - from `CountTrigger`s or custom triggers - can be r `Savepoint`'s may also be written, which allows such use cases as bootstrapping state based on historical data. Each savepoint is made up of one or more `BootstrapTransformation`'s (explained below), each of which defines the state for an individual operator. -
-
+**Note** The state processor api does not currently provide a Scala API. As a result +it will always auto-derive serializers using the Java type stack. To bootstrap +a savepoint for the Scala DataStream API please manually pass in all type information. + {% highlight java %} int maxParallelism = 128; @@ -528,19 +342,7 @@ Savepoint .withOperator("uid2", transformation2) .write(savepointPath); {% endhighlight %} -
-
-{% highlight scala %} -val maxParallelism = 128 -Savepoint - .create(new MemoryStateBackend(), maxParallelism) - .withOperator("uid1", transformation1) - .withOperator("uid2", transformation2) - .write(savepointPath) -{% endhighlight %} -
-
The [UIDs]({% link ops/state/savepoints.zh.md %}#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows what state maps to which operator. @@ -548,8 +350,6 @@ The [UIDs]({% link ops/state/savepoints.zh.md %}#assigning-operator-ids) associa Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`. -
-
{% highlight java %} public class SimpleBootstrapFunction extends StateBootstrapFunction { @@ -577,44 +377,11 @@ BootstrapTransformation transformation = OperatorTransformation .bootstrapWith(data) .transform(new SimpleBootstrapFunction()); {% endhighlight %} -
-
-{% highlight scala %} -class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] { - - var ListState[Integer] state = _ - - @throws[Exception] - override def processElement(value: Integer, ctx: Context): Unit = { - state.add(value) - } - - @throws[Exception] - override def snapshotState(context: FunctionSnapshotContext): Unit = { - } - - @throws[Exception] - override def initializeState(context: FunctionInitializationContext): Unit = { - state = context.getOperatorState().getListState(new ListStateDescriptor("state", Types.INT)) - } -} - -val env = ExecutionEnvironment.getExecutionEnvironment -val data = env.fromElements(1, 2, 3) - -BootstrapTransformation transformation = OperatorTransformation - .bootstrapWith(data) - .transform(new SimpleBootstrapFunction) -{% endhighlight %} -
-
### Broadcast State [BroadcastState]({% link dev/stream/state/broadcast_state.zh.md %}) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory. -
-
{% highlight java %} public class CurrencyRate { public String currency; @@ -640,38 +407,11 @@ BootstrapTransformation broadcastTransformation = OperatorTransfor .bootstrapWith(currencyDataSet) .transform(new CurrencyBootstrapFunction()); {% endhighlight %} -
-
-{% highlight scala %} -case class CurrencyRate(currency: String, rate: Double) - -object CurrencyBootstrapFunction { - val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE) -} - -class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] { - - @throws[Exception] - override processElement(value: CurrencyRate, ctx: Context): Unit = { - ctx.getBroadcastState(descriptor).put(value.currency, value.rate) - } -} - -val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3)) - -val broadcastTransformation = OperatorTransformation - .bootstrapWith(currencyDataSet) - .transform(new CurrencyBootstrapFunction) -{% endhighlight %} -
-
### Keyed State Keyed state for `ProcessFunction`'s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`. -
-
{% highlight java %} public class Account { public int id; @@ -705,37 +445,6 @@ BootstrapTransformation transformation = OperatorTransformation .keyBy(acc -> acc.id) .transform(new AccountBootstrapper()); {% endhighlight %} -
-
-{% highlight scala %} -case class Account(id: Int, amount: Double, timestamp: Long) - -class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] { - var state: ValueState[Double] - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - val descriptor = new ValueStateDescriptor("total",Types.DOUBLE) - state = getRuntimeContext().getState(descriptor) - } - - @throws[Exception] - override def processElement(value: Account, ctx: Context): Unit = { - state.update(value.amount) - } -} - -val bEnv = ExecutionEnvironment.getExecutionEnvironment() - -val accountDataSet = bEnv.fromCollection(accounts) - -val transformation = OperatorTransformation - .bootstrapWith(accountDataSet) - .keyBy(acc => acc.id) - .transform(new AccountBootstrapper) -{% endhighlight %} -
-
The `KeyedStateBootstrapFunction` supports setting event time and processing time timers. The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application. @@ -749,8 +458,6 @@ The state processor api supports writing state for the [window operator]({% link When writing window state, users specify the operator id, window assigner, evictor, optional trigger, and aggregation type. It is important the configurations on the bootstrap transformation match the configurations on the DataStream window. -
-
{% highlight java %} public class Account { public int id; @@ -773,45 +480,14 @@ BootstrapTransformation transformation = OperatorTransformation .window(TumblingEventTimeWindows.of(Time.minutes(5))) .reduce((left, right) -> left + right); {% endhighlight %} -
-
-{% highlight scala %} -case class Account(id: Int, amount: Double, timestamp: Long) - -val bEnv = ExecutionEnvironment.getExecutionEnvironment(); -val accountDataSet = bEnv.fromCollection(accounts); - -val transformation = OperatorTransformation - .bootstrapWith(accountDataSet) - // When using event time windows, its important - // to assign timestamps to each record. - .assignTimestamps(account => account.timestamp) - .keyBy(acc => acc.id) - .window(TumblingEventTimeWindows.of(Time.minutes(5))) - .reduce((left, right) => left + right) -{% endhighlight %} -
-
## Modifying Savepoints Besides creating a savepoint from scratch, you can base one off an existing savepoint such as when bootstrapping a single new operator for an existing job. -
-
{% highlight java %} Savepoint .load(bEnv, new MemoryStateBackend(), oldPath) .withOperator("uid", transformation) .write(newPath); {% endhighlight %} -
-
-{% highlight scala %} -Savepoint - .load(bEnv, new MemoryStateBackend, oldPath) - .withOperator("uid", transformation) - .write(newPath) -{% endhighlight %} -
-