
Commit c6d27ba
[FLINK-26280][table-planner] Add 'table.exec.legacy-transformation-uids' to support old transformation uid generation behaviour

This closes apache#18879.
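
To make the new option concrete, here is a minimal sketch of enabling the legacy behaviour from the Table API. The surrounding setup is illustrative only; the option key and its boolean type come from the diff below.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableLegacyUids {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // Deprecated escape hatch: fall back to topology-based uid generation.
        tEnv.getConfig()
                .getConfiguration()
                .setBoolean("table.exec.legacy-transformation-uids", true);
    }
}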
slinkydeveloper authored and twalthr committed Mar 2, 2022
1 parent 1bb8f9f commit c6d27ba
Showing 7 changed files with 127 additions and 40 deletions.
@@ -478,6 +478,19 @@ public class ExecutionConfigOptions {
                                     + "all changes to downstream just like when the mini-batch is "
                                     + "not enabled.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    @Deprecated
+    public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
+            key("table.exec.legacy-transformation-uids")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "In Flink 1.15 Transformation UIDs are generated deterministically from the metadata available after the planning phase. "
+                                    + "This new behavior allows a safe restore of persisted plans, remapping the plan execution graph to the correct operator state. "
+                                    + "Setting this flag to true restores the previous \"legacy\" behavior, which generates uids from the Transformation graph topology. "
+                                    + "We strongly suggest keeping this flag disabled, as it is going to be removed in the next releases. "
+                                    + "If your pipeline relies on the old behavior, please create a new pipeline and regenerate the operator state.");
+
     // ------------------------------------------------------------------------------------------
     // Enum option types
     // ------------------------------------------------------------------------------------------
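
As background for the option description above: uids are the handle that savepoint restore uses to remap operator state. A hedged DataStream-level illustration of the two behaviours follows; the class, the pipeline, and the uid literal are invented, only the uid()/setUid mechanism is standard Flink.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidIllustration {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Legacy behaviour: no explicit uid, so Flink hashes the graph topology
        // to derive one; adding or removing any upstream node changes the hash
        // and orphans this operator's state on restore.
        env.fromElements("a", "b").map(s -> s).returns(Types.STRING).print();

        // New default: the planner pins a deterministic uid computed from plan
        // metadata, equivalent to setting it explicitly by hand:
        SingleOutputStreamOperator<String> pinned =
                env.fromElements("a", "b").map(s -> s).returns(Types.STRING);
        pinned.uid("calc-1"); // illustrative value, not the planner's format
        pinned.print();

        env.execute("uid-illustration");
    }
}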
@@ -187,7 +187,8 @@ protected String createTransformationDescription(ReadableConfig config) {
 
     protected TransformationMetadata createTransformationMeta(
             String operatorName, ReadableConfig config) {
-        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())) {
+        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())
+                || config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
             return new TransformationMetadata(
                     createTransformationName(config), createTransformationDescription(config));
         } else {
@@ -203,7 +204,8 @@ protected TransformationMetadata createTransformationMeta(
             String operatorName, String detailName, String simplifiedName, ReadableConfig config) {
         final String name = createFormattedTransformationName(detailName, simplifiedName, config);
         final String desc = createFormattedTransformationDescription(detailName, config);
-        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())) {
+        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())
+                || config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
             return new TransformationMetadata(name, desc);
         } else {
             // Only classes supporting metadata util need to set the uid
@@ -441,7 +441,7 @@ private Transformation<?> applySinkProvider(
                             inputTransform, rowtimeFieldIndex, sinkParallelism, config);
             final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
             final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
-            return provider.consumeDataStream(createProviderContext(), dataStream)
+            return provider.consumeDataStream(createProviderContext(config), dataStream)
                     .getTransformation();
         } else if (runtimeProvider instanceof TransformationSinkProvider) {
             final TransformationSinkProvider provider =
@@ -460,7 +460,7 @@ public int getRowtimeIndex() {
 
                         @Override
                         public Optional<String> generateUid(String name) {
-                            return createProviderContext().generateUid(name);
+                            return createProviderContext(config).generateUid(name);
                         }
                     });
         } else if (runtimeProvider instanceof SinkFunctionProvider) {
@@ -513,9 +513,10 @@ public Optional<String> generateUid(String name) {
         }
     }
 
-    private ProviderContext createProviderContext() {
+    private ProviderContext createProviderContext(ReadableConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode) {
+            if (this instanceof StreamExecNode
+                    && !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
                 return Optional.of(createTransformationUid(name));
             }
             return Optional.empty();
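
For connector authors, the consumeDataStream(ProviderContext, DataStream) overload used above is how these planner-generated uids reach custom sinks. A hedged sketch follows; the provider class and the discarding sink are invented for illustration, while the interface method and generateUid signature match the code in this diff.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

public class UidAwareSinkProvider implements DataStreamSinkProvider {
    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        DataStreamSink<RowData> sink = dataStream.addSink(new DiscardingSink<>());
        // The Optional is empty when the node cannot provide a uid or when
        // table.exec.legacy-transformation-uids is set to true (see the
        // createProviderContext(config) lambda above).
        providerContext.generateUid("writer").ifPresent(sink::uid);
        return sink;
    }
}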
@@ -24,11 +24,13 @@
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -131,15 +133,15 @@ protected Transformation<RowData> translateToPlanInternal(
         } else if (provider instanceof DataStreamScanProvider) {
             Transformation<RowData> transformation =
                     ((DataStreamScanProvider) provider)
-                            .produceDataStream(createProviderContext(), env)
+                            .produceDataStream(createProviderContext(config), env)
                             .getTransformation();
             meta.fill(transformation);
             transformation.setOutputType(outputTypeInfo);
             return transformation;
         } else if (provider instanceof TransformationScanProvider) {
             final Transformation<RowData> transformation =
                     ((TransformationScanProvider) provider)
-                            .createTransformation(createProviderContext());
+                            .createTransformation(createProviderContext(config));
             meta.fill(transformation);
             transformation.setOutputType(outputTypeInfo);
             return transformation;
@@ -149,9 +151,10 @@ protected Transformation<RowData> translateToPlanInternal(
         }
     }
 
-    private ProviderContext createProviderContext() {
+    private ProviderContext createProviderContext(ReadableConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode) {
+            if (this instanceof StreamExecNode
+                    && !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
                 return Optional.of(createTransformationUid(name));
             }
             return Optional.empty();
@@ -28,6 +28,7 @@
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -225,6 +226,9 @@ private Transformation<RowData> createNegativeWindowSizeJoin(
             int rightArity,
             InternalTypeInfo<RowData> returnTypeInfo,
             ReadableConfig config) {
+        boolean shouldCreateUid =
+                !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
+
         // We filter all records instead of adding an empty source to preserve the watermarks.
         FilterAllFlatMapFunction allFilter = new FilterAllFlatMapFunction(returnTypeInfo);
 
@@ -244,7 +248,9 @@ private Transformation<RowData> createNegativeWindowSizeJoin(
                         new StreamFlatMap<>(allFilter),
                         returnTypeInfo,
                         leftParallelism);
-        filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION));
+        if (shouldCreateUid) {
+            filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION));
+        }
         filterAllLeftStream.setDescription(
                 createFormattedTransformationDescription(
                         "filter all left input transformation", config));
@@ -259,7 +265,9 @@ private Transformation<RowData> createNegativeWindowSizeJoin(
                         new StreamFlatMap<>(allFilter),
                         returnTypeInfo,
                         rightParallelism);
-        filterAllRightStream.setUid(createTransformationUid(FILTER_RIGHT_TRANSFORMATION));
+        if (shouldCreateUid) {
+            filterAllRightStream.setUid(createTransformationUid(FILTER_RIGHT_TRANSFORMATION));
+        }
         filterAllRightStream.setDescription(
                 createFormattedTransformationDescription(
                         "filter all right input transformation", config));
@@ -274,7 +282,9 @@ private Transformation<RowData> createNegativeWindowSizeJoin(
                         new StreamMap<>(leftPadder),
                         returnTypeInfo,
                         leftParallelism);
-        padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION));
+        if (shouldCreateUid) {
+            padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION));
+        }
         padLeftStream.setDescription(
                 createFormattedTransformationDescription("pad left input transformation", config));
         padLeftStream.setName(
@@ -288,7 +298,9 @@ private Transformation<RowData> createNegativeWindowSizeJoin(
                         new StreamMap<>(rightPadder),
                         returnTypeInfo,
                         rightParallelism);
-        padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION));
+        if (shouldCreateUid) {
+            padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION));
+        }
         padRightStream.setDescription(
                 createFormattedTransformationDescription("pad right input transformation", config));
         padRightStream.setName(
@@ -31,6 +31,7 @@
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.TableConfig;
@@ -52,7 +53,6 @@
 import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
@@ -150,7 +150,7 @@ protected Transformation<RowData> translateToPlanInternal(
         final EventComparator<RowData> eventComparator =
                 createEventComparator(config, inputRowType);
         final Transformation<RowData> timestampedInputTransform =
-                translateOrder(inputTransform, inputRowType);
+                translateOrder(inputTransform, inputRowType, config);
 
         final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
                 translatePattern(
@@ -266,7 +266,7 @@ private EventComparator<RowData> createEventComparator(
     }
 
     private Transformation<RowData> translateOrder(
-            Transformation<RowData> inputTransform, RowType inputRowType) {
+            Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
         SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
         int timeOrderFieldIdx = timeOrderField.getFieldIndex();
         LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
@@ -277,12 +277,13 @@ private Transformation<RowData> translateOrder(
         Transformation<RowData> transform =
                 ExecNodeUtil.createOneInputTransformation(
                         inputTransform,
-                        new TransformationMetadata(
-                                createTransformationUid(TIMESTAMP_INSERTER_TRANSFORMATION),
-                                "StreamRecordTimestampInserter",
+                        createTransformationMeta(
+                                TIMESTAMP_INSERTER_TRANSFORMATION,
                                 String.format(
                                         "StreamRecordTimestampInserter(rowtime field: %s)",
-                                        timeOrderFieldIdx)),
+                                        timeOrderFieldIdx),
+                                "StreamRecordTimestampInserter",
+                                config),
                         new StreamRecordTimestampInserter(timeOrderFieldIdx, precision),
                         inputTransform.getOutputType(),
                         inputTransform.getParallelism());
(The diff for the remaining changed file did not load.)
