Skip to content

Commit

Permalink
[FLINK-28861][table] Make UID generation behavior configurable and pl…
Browse files Browse the repository at this point in the history
…an-only by default

Before this commit, due to changes for FLIP-190, every operator generated by the planner
got a UID assigned. However, the UID is based on a static counter that might return different
results depending on the environment. Thus, UIDs are not deterministic and make stateful
restores impossible e.g. when going from 1.15.0 -> 1.15.1. This PR restores the old pre-1.15
behavior for regular Table API. It only adds UIDs if the operator has been created from a
compiled plan. A compiled plan makes the UIDs static and thus deterministic.

table.exec.uid.generation=ALWAYS exists for backwards compatibility and could make stateful
upgrades possible even with invalid UIDs on best effort basis.
  • Loading branch information
twalthr committed Aug 15, 2022
1 parent cb4ead7 commit b142860
Show file tree
Hide file tree
Showing 28 changed files with 347 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@
<td>Duration</td>
<td>Specifies a minimum time interval for how long idle state (i.e. state which was not updated), will be retained. State will never be cleared until it was idle for less than the minimum time, and will be cleared at some time after it was idle. Default is never clean-up the state. NOTE: Cleaning up state requires additional overhead for bookkeeping. Default value is 0, which means that it will never clean up state.</td>
</tr>
<tr>
<td><h5>table.exec.uid.generation</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">PLAN_ONLY</td>
<td><p>Enum</p></td>
<td>In order to remap state to operators during a restore, it is required that the pipeline's streaming transformations get a UID assigned.<br />The planner can generate and assign explicit UIDs. If no UIDs have been set by the planner, the UIDs will be auto-generated by lower layers that can take the complete topology into account for uniqueness of the IDs. See the DataStream API for more information.<br />This configuration option is for experts only and the default should be sufficient for most use cases. By default, only pipelines created from a persisted compiled plan will get UIDs assigned explicitly. Thus, these pipelines can be arbitrarily moved around within the same topology without affecting the stable UIDs.<br /><br />Possible values:<ul><li>"PLAN_ONLY": Sets UIDs on streaming transformations if and only if the pipeline definition comes from a compiled plan. Pipelines that have been constructed in the API without a compilation step will not set an explicit UID as it might not be stable across multiple translations.</li><li>"ALWAYS": Always sets UIDs on streaming transformations. This strategy is for experts only! Pipelines that have been constructed in the API without a compilation step might not be able to be restored properly. The UID generation depends on previously declared pipelines (potentially across jobs if the same JVM is used). Thus, a stable environment must be ensured. Pipeline definitions that come from a compiled plan are safe to use.</li><li>"DISABLED": No explicit UIDs will be set.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">100000</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ public static <T> DataStream<PartitionCommitInfo> writer(
Configuration conf) {
StreamingFileWriter<T> fileWriter =
new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, partitionKeys, conf);
return inputStream
.transform(
StreamingFileWriter.class.getSimpleName(),
TypeInformation.of(PartitionCommitInfo.class),
fileWriter)
.uid(providerContext.generateUid("streaming-writer").get())
.setParallelism(parallelism);
SingleOutputStreamOperator<PartitionCommitInfo> writerStream =
inputStream
.transform(
StreamingFileWriter.class.getSimpleName(),
TypeInformation.of(PartitionCommitInfo.class),
fileWriter)
.setParallelism(parallelism);
providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
return writerStream;
}

/**
Expand All @@ -104,21 +106,24 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(

CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);

SingleOutputStreamOperator<CoordinatorOutput> coordinatorOp =
SingleOutputStreamOperator<CoordinatorInput> writerStream =
inputStream
.transform(
"streaming-writer",
TypeInformation.of(CoordinatorInput.class),
writer)
.uid(providerContext.generateUid("streaming-writer").get())
.setParallelism(parallelism)
.setParallelism(parallelism);
providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);

SingleOutputStreamOperator<CoordinatorOutput> coordinatorStream =
writerStream
.transform(
"compact-coordinator",
TypeInformation.of(CoordinatorOutput.class),
coordinator)
.uid(providerContext.generateUid("compact-coordinator").get())
.setParallelism(1)
.setMaxParallelism(1);
providerContext.generateUid("compact-coordinator").ifPresent(coordinatorStream::uid);

CompactWriter.Factory<T> writerFactory =
CompactBucketWriter.factory(
Expand All @@ -128,14 +133,17 @@ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
CompactOperator<T> compacter =
new CompactOperator<>(fsSupplier, readFactory, writerFactory);

return coordinatorOp
.broadcast()
.transform(
"compact-operator",
TypeInformation.of(PartitionCommitInfo.class),
compacter)
.uid(providerContext.generateUid("compact-operator").get())
.setParallelism(parallelism);
SingleOutputStreamOperator<PartitionCommitInfo> operatorStream =
coordinatorStream
.broadcast()
.transform(
"compact-operator",
TypeInformation.of(PartitionCommitInfo.class),
compacter)
.setParallelism(parallelism);
providerContext.generateUid("compact-operator").ifPresent(operatorStream::uid);

return operatorStream;
}

/**
Expand All @@ -156,17 +164,18 @@ public static DataStreamSink<?> sink(
PartitionCommitter committer =
new PartitionCommitter(
locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
stream =
SingleOutputStreamOperator<Void> committerStream =
writer.transform(
PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
.uid(providerContext.generateUid("partition-committer").get())
.setParallelism(1)
.setMaxParallelism(1);
providerContext.generateUid("partition-committer").ifPresent(committerStream::uid);
stream = committerStream;
}

return stream.addSink(new DiscardingSink<>())
.uid(providerContext.generateUid("discarding-sink").get())
.name("end")
.setParallelism(1);
DataStreamSink<?> discardingSink =
stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
providerContext.generateUid("discarding-sink").ifPresent(discardingSink::uid);
return discardingSink;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ private void testParallelismSettingTranslateAndAssert(
PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
ExecNode<?> execNode =
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
.getRootNodes()
.get(0);
Transformation<?> transformation = execNode.translateToPlan(planner);
Expand Down Expand Up @@ -550,7 +550,7 @@ public void testParallelismOnLimitPushDown() throws Exception {
PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
ExecNode<?> execNode =
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
.getRootNodes()
.get(0);
Transformation<?> transformation =
Expand Down Expand Up @@ -584,7 +584,7 @@ public void testParallelismWithoutParallelismInfer() throws Exception {
PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
ExecNode<?> execNode =
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
.getRootNodes()
.get(0);
Transformation<?> transformation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public interface ProviderContext {
* The {@code name} must be unique within the provider implementation. The framework will make
* sure that the name is unique for the entire topology.
*
* <p>This method returns empty if an identifier cannot be generated, i.e. because the job is in
* batch mode.
* <p>This method returns empty if an identifier cannot be generated, i.e., because the job is
* in batch mode, or UIDs cannot be guaranteed to be unique.
*/
Optional<String> generateUid(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,18 +488,46 @@ public class ExecutionConfigOptions {
+ "all changes to downstream just like when the mini-batch is "
+ "not enabled.");

/** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
@Deprecated
public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
key("table.exec.legacy-transformation-uids")
.booleanType()
.defaultValue(false)
.withDescription(
"In Flink 1.15 Transformation UIDs are generated deterministically starting from the metadata available after the planning phase. "
+ "This new behaviour allows a safe restore of persisted plan, remapping the plan execution graph to the correct operators state. "
+ "Setting this flag to true enables the previous \"legacy\" behavior, which is generating uids from the Transformation graph topology. "
+ "We strongly suggest to keep this flag disabled, as this flag is going to be removed in the next releases. "
+ "If you have a pipeline relying on the old behavior, please create a new pipeline and regenerate the operators state.");
"This flag has been replaced by table.exec.uid.generation. Use the enum "
+ "value DISABLED to restore legacy behavior. However, the new "
+ "default value should be sufficient for most use cases as "
+ "only pipelines from compiled plans get UIDs assigned.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
key("table.exec.uid.generation")
.enumType(UidGeneration.class)
.defaultValue(UidGeneration.PLAN_ONLY)
.withDescription(
Description.builder()
.text(
"In order to remap state to operators during a restore, "
+ "it is required that the pipeline's streaming "
+ "transformations get a UID assigned.")
.linebreak()
.text(
"The planner can generate and assign explicit UIDs. If no "
+ "UIDs have been set by the planner, the UIDs will "
+ "be auto-generated by lower layers that can take "
+ "the complete topology into account for uniqueness "
+ "of the IDs. See the DataStream API for more information.")
.linebreak()
.text(
"This configuration option is for experts only and the default "
+ "should be sufficient for most use cases. By default, "
+ "only pipelines created from a persisted compiled plan will "
+ "get UIDs assigned explicitly. Thus, these pipelines can "
+ "be arbitrarily moved around within the same topology without "
+ "affecting the stable UIDs.")
.build());

// ------------------------------------------------------------------------------------------
// Enum option types
Expand Down Expand Up @@ -622,4 +650,39 @@ public boolean isEnabled() {
return enabled;
}
}

/**
* Strategy for generating transformation UIDs for remapping state to operators during restore.
*/
@PublicEvolving
public enum UidGeneration implements DescribedEnum {
PLAN_ONLY(
text(
"Sets UIDs on streaming transformations if and only if the pipeline definition "
+ "comes from a compiled plan. Pipelines that have been constructed in "
+ "the API without a compilation step will not set an explicit UID as "
+ "it might not be stable across multiple translations.")),
ALWAYS(
text(
"Always sets UIDs on streaming transformations. This strategy is for experts only! "
+ "Pipelines that have been constructed in the API without a compilation "
+ "step might not be able to be restored properly. The UID generation "
+ "depends on previously declared pipelines (potentially across jobs "
+ "if the same JVM is used). Thus, a stable environment must be ensured. "
+ "Pipeline definitions that come from a compiled plan are safe to use.")),

DISABLED(text("No explicit UIDs will be set."));

private final InlineElement description;

UidGeneration(InlineElement description) {
this.description = description;
}

@Internal
@Override
public InlineElement getDescription() {
return description;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,10 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {
* @param visitor ExecNodeVisitor.
*/
void accept(ExecNodeVisitor visitor);

/**
* Declares whether the node has been created as part of a plan compilation. Some translation
* properties might be impacted by this (e.g. UID generation for transformations).
*/
void setCompiled(boolean isCompiled);
}
Loading

0 comments on commit b142860

Please sign in to comment.