Skip to content

Commit

Permalink
[FLINK-25388][table-planner] Add consumedOptions to ExecNodeMetadata
Browse files Browse the repository at this point in the history
Scanned all annotated StreamExecNodes and added the `consumedOptions` param
to their `@ExecNodeMetadata` annotation, denoting which configuration options
each ExecNode is using that might influence the topology and thus stateful
restores.

This closes apache#18624.
  • Loading branch information
matriv authored and twalthr committed Feb 14, 2022
1 parent 16ed0f4 commit 1ce16c5
Show file tree
Hide file tree
Showing 24 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
Expand Down Expand Up @@ -80,6 +81,13 @@
*
* <p>Restore can verify whether the restored ExecNode config map contains only options of the
* given keys.
*
* <p>Common options used for all {@link StreamExecNode}s:
*
* <ul>
* <li>{@link ExecutionConfigOptions#TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED}
* <li>{@link ExecutionConfigOptions#TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM}
* </ul>
*/
String[] consumedOptions() default {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
@ExecNodeMetadata(
name = "stream-exec-changelog-normalize",
version = 1,
consumedOptions = {
"table.exec.state.ttl",
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size",
},
producedTransformations = StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
@ExecNodeMetadata(
name = "stream-exec-deduplicate",
version = 1,
consumedOptions = {
"table.exec.state.ttl",
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size",
"table.exec.deduplicate.insert-update-after-sensitive-enabled",
"table.exec.deduplicate.mini-batch.compact-changes-enabled"
},
producedTransformations = StreamExecDeduplicate.DEDUPLICATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
@ExecNodeMetadata(
name = "stream-exec-global-group-aggregate",
version = 1,
consumedOptions = {
"table.exec.state.ttl",
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size"
},
producedTransformations =
StreamExecGlobalGroupAggregate.GLOBAL_GROUP_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
@ExecNodeMetadata(
name = "stream-exec-global-window-aggregate",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations =
StreamExecGlobalWindowAggregate.GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
@ExecNodeMetadata(
name = "stream-exec-group-aggregate",
version = 1,
consumedOptions = {
"table.exec.state.ttl",
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size"
},
producedTransformations = StreamExecGroupAggregate.GROUP_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@
@ExecNodeMetadata(
name = "stream-exec-group-window-aggregate",
version = 1,
consumedOptions = {
"table.local-time-zone",
"table.exec.state.ttl",
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size"
},
producedTransformations = StreamExecGroupWindowAggregate.GROUP_WINDOW_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
@ExecNodeMetadata(
name = "stream-exec-incremental-group-aggregate",
version = 1,
consumedOptions = {
"table.exec.state.ttl",
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size"
},
producedTransformations =
StreamExecIncrementalGroupAggregate.INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
@ExecNodeMetadata(
name = "stream-exec-limit",
version = 1,
consumedOptions = {"table.exec.rank.topn-cache-size", "table.exec.state.ttl"},
producedTransformations = StreamExecRank.RANK_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
@ExecNodeMetadata(
name = "stream-exec-local-group-aggregate",
version = 1,
consumedOptions = {
"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size",
},
producedTransformations =
StreamExecLocalGroupAggregate.LOCAL_GROUP_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
@ExecNodeMetadata(
name = "stream-exec-local-window-aggregate",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations =
StreamExecLocalWindowAggregate.LOCAL_WINDOW_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
@ExecNodeMetadata(
name = "stream-exec-over-aggregate",
version = 1,
consumedOptions = {"table.exec.state.ttl"},
producedTransformations = StreamExecOverAggregate.OVER_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
@ExecNodeMetadata(
name = "stream-exec-python-group-window-aggregate",
version = 1,
producedTransformations = StreamExecGroupWindowAggregate.GROUP_WINDOW_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
@ExecNodeMetadata(
name = "stream-exec-rank",
version = 1,
consumedOptions = {"table.exec.state.ttl", "table.exec.rank.topn-cache-size"},
producedTransformations = StreamExecRank.RANK_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@
@ExecNodeMetadata(
name = "stream-exec-sink",
version = 1,
consumedOptions = {
"table.exec.state.ttl",
"table.exec.sink.not-null-enforcer",
"table.exec.sink.type-length-enforcer",
"table.exec.sink.upsert-materialize",
"table.exec.sink.keyed-shuffle"
},
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
@ExecNodeMetadata(
name = "stream-exec-sort-limit",
version = 1,
consumedOptions = {"table.exec.state.ttl", "table.exec.rank.topn-cache-size"},
producedTransformations = StreamExecRank.RANK_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
@ExecNodeMetadata(
name = "stream-exec-temporal-join",
version = 1,
consumedOptions = "table.exec.state.ttl",
producedTransformations = StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
@ExecNodeMetadata(
name = "stream-exec-watermark-assigner",
version = 1,
consumedOptions = "table.exec.source.idle-timeout",
producedTransformations = StreamExecWatermarkAssigner.WATERMARK_ASSIGNER_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
@ExecNodeMetadata(
name = "stream-exec-window-aggregate",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations = StreamExecWindowAggregate.WINDOW_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
@ExecNodeMetadata(
name = "stream-exec-window-deduplicate",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations = StreamExecWindowDeduplicate.WINDOW_DEDUPLICATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
@ExecNodeMetadata(
name = "stream-exec-window-join",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations = StreamExecWindowJoin.WINDOW_JOIN_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
@ExecNodeMetadata(
name = "stream-exec-window-rank",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations = StreamExecWindowRank.WINDOW_RANK_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
@ExecNodeMetadata(
name = "stream-exec-window-table-function",
version = 1,
consumedOptions = "table.local-time-zone",
producedTransformations = CommonExecWindowTableFunction.WINDOW_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static boolean isPythonWorkerUsingManagedMemory(Configuration config) {

@SuppressWarnings("unchecked")
public static boolean isPythonWorkerInProcessMode(Configuration config) {
Class clazz = loadClass("org.apache.flink.python.PythonOptions");
Class clazz = loadClass(PYTHON_OPTIONS_CLASS);
try {
return config.getString(
(ConfigOption<String>)
Expand Down

0 comments on commit 1ce16c5

Please sign in to comment.