Commit bc11450
[FLINK-26075][table-planner] Persist node configuration to JSON plan
- Add a new JSON field to the plan called "configuration", which
  includes all the options used by the ExecNode along with their
  values (see the example snippet below).
- Use the previously introduced ExecNodeConfig class to merge the
  configuration deserialized from the JSON plan with the Planner
  configuration.
matriv committed Mar 25, 2022
1 parent b8cdb8d commit bc11450
Showing 191 changed files with 1,317 additions and 329 deletions.
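For illustration, a serialized ExecNode in a compiled JSON plan might now carry the new field roughly like this (a hypothetical snippet: ids, node type, and option values are invented, and each node persists only the options listed in its ExecNodeMetadata consumedOptions; note that "configuration" is serialized right after "id" and "type", per the @JsonProperty index declaration below):

{
  "id": 5,
  "type": "stream-exec-sink_1",
  "configuration": {
    "table.exec.sink.not-null-enforcer": "ERROR",
    "table.exec.sink.upsert-materialize": "AUTO"
  },
  "inputProperties": [ ... ],
  "outputType": "ROW<`id` BIGINT>",
  "description": "Sink(table=[default_catalog.default_database.t])"
}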
ExecNode.java
@@ -50,6 +50,7 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {

String FIELD_NAME_ID = "id";
String FIELD_NAME_TYPE = "type";
String FIELD_NAME_CONFIGURATION = "configuration";
String FIELD_NAME_DESCRIPTION = "description";
String FIELD_NAME_INPUT_PROPERTIES = "inputProperties";
String FIELD_NAME_OUTPUT_TYPE = "outputType";
ExecNodeBase.java
@@ -24,13 +24,15 @@
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.serde.ConfigurationJsonSerializerFilter;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
@@ -70,13 +72,30 @@ protected final ExecNodeContext getContextFromAnnotation() {
return ExecNodeContext.newContext(this.getClass()).withId(getId());
}

@JsonProperty(value = FIELD_NAME_CONFIGURATION, access = JsonProperty.Access.WRITE_ONLY)
private final ReadableConfig persistedConfig;

@JsonProperty(
value = FIELD_NAME_CONFIGURATION,
access = JsonProperty.Access.READ_ONLY,
index = 2)
// Custom filter to exclude node configuration if no consumed options are used
@JsonInclude(
value = JsonInclude.Include.CUSTOM,
valueFilter = ConfigurationJsonSerializerFilter.class)
public ReadableConfig getPersistedConfig() {
return persistedConfig;
}

protected ExecNodeBase(
int id,
ExecNodeContext context,
ReadableConfig persistedConfig,
List<InputProperty> inputProperties,
LogicalType outputType,
String description) {
this.context = checkNotNull(context).withId(id);
this.persistedConfig = persistedConfig == null ? new Configuration() : persistedConfig;
this.inputProperties = checkNotNull(inputProperties);
this.outputType = checkNotNull(outputType);
this.description = checkNotNull(description);
@@ -129,7 +148,7 @@ public final Transformation<T> translateToPlan(Planner planner) {
translateToPlanInternal(
(PlannerBase) planner,
new ExecNodeConfig(
((PlannerBase) planner).getTableConfig(), new Configuration()));
((PlannerBase) planner).getTableConfig(), persistedConfig));
if (this instanceof SingleTransformationTranslator) {
if (inputsContainSingleton()) {
transformation.setParallelism(1);
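A note on the custom inclusion filter registered above: with JsonInclude.Include.CUSTOM, Jackson instantiates the valueFilter class and calls its equals() method with the value about to be serialized; the property is omitted when equals() returns true. A minimal sketch of such a filter, assuming the persisted config is an org.apache.flink.configuration.Configuration whose option map is empty when the node consumed no options (the actual ConfigurationJsonSerializerFilter may differ):

import org.apache.flink.configuration.Configuration;

public class ConfigurationJsonSerializerFilter {

    // Jackson calls equals() with the candidate "configuration" value;
    // returning true excludes the field from the serialized plan.
    @Override
    public boolean equals(Object value) {
        if (!(value instanceof Configuration)) {
            return false; // serialize unrecognized values as usual
        }
        // Skip the field entirely when the node consumed no options.
        return ((Configuration) value).toMap().isEmpty();
    }

    // Keep equals()/hashCode() consistent.
    @Override
    public int hashCode() {
        return getClass().hashCode();
    }
}

This keeps plans minimal: a node that consumed no options serializes exactly as it did before this commit.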
ExecNodeConfig.java
@@ -31,16 +31,16 @@

/**
* Configuration view which is used to combine the {@link PlannerBase#getTableConfig()} with the {@link
* ExecNodeBase#getNodeConfig()} configuration. The persisted configuration of the {@link ExecNode}
* which is deserialized from the JSON plan has precedence over the {@link
* ExecNodeBase#getPersistedConfig()} configuration. The persisted configuration of the {@link
* ExecNode} which is deserialized from the JSON plan has precedence over the {@link
* PlannerBase#getTableConfig()}.
*/
@Internal
public final class ExecNodeConfig implements ReadableConfig {

// See https://issues.apache.org/jira/browse/FLINK-26190
// Used only for the deprecated getMaxIdleStateRetentionTime to also satisfy tests which
// manipulate maxIdleStateRetentionTime, like OverAggregateHarnessTest.
// Used only by CommonPythonUtil#getMergedConfig(StreamExecutionEnvironment, TableConfig);
// otherwise it can be changed to ReadableConfig.
private final TableConfig tableConfig;

private final ReadableConfig nodeConfig;
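Given that precedence rule, the read path of the merged view is an ordered lookup: consult the node's persisted options first and fall back to the planner's configuration otherwise. A plausible method-level sketch inside ExecNodeConfig (illustrative only, not the verbatim Flink implementation; requires java.util.Optional and org.apache.flink.configuration.ConfigOption):

@Override
public <T> T get(ConfigOption<T> option) {
    // An option persisted with the node in the JSON plan takes
    // precedence over the planner's TableConfig.
    return nodeConfig
            .getOptional(option)
            .orElseGet(() -> tableConfig.getConfiguration().get(option));
}

@Override
public <T> Optional<T> getOptional(ConfigOption<T> option) {
    final Optional<T> nodeValue = nodeConfig.getOptional(option);
    return nodeValue.isPresent()
            ? nodeValue
            : tableConfig.getConfiguration().getOptional(option);
}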
ExecNodeContext.java
@@ -20,6 +20,8 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.types.logical.LogicalType;
@@ -33,6 +35,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -81,10 +84,11 @@ private ExecNodeContext(String name, Integer version) {
* @param id The unique id of the {@link ExecNode}. See {@link ExecNode#getId()}. It can be null
* initially and then later set by using {@link #withId(int)} which creates a new instance
* of {@link ExecNodeContext} since it's immutable. This way we can satisfy both the {@link
* ExecNodeBase#ExecNodeBase(int, ExecNodeContext, List, LogicalType, String)} ctor, which
* is used for the {@link JsonCreator} ctors, where the {@code id} and the {@code context}
* are read separately, and the {@link ExecNodeBase#getContextFromAnnotation()} which
* creates a new context with a new id provided by: {@link #newNodeId()}.
* ExecNodeBase#ExecNodeBase(int, ExecNodeContext, ReadableConfig, List, LogicalType,
* String)} ctor, which is used for the {@link JsonCreator} ctors, where the {@code id} and
* the {@code context} are read separately, and the {@link
* ExecNodeBase#getContextFromAnnotation()} which creates a new context with a new id
* provided by: {@link #newNodeId()}.
* @param name The name of the {@link ExecNode}. See {@link ExecNodeMetadata#name()}.
* @param version The version of the {@link ExecNode}. See {@link ExecNodeMetadata#version()}.
*/
@@ -176,4 +180,22 @@ public static <T extends ExecNode<?>> ExecNodeContext newContext(Class<T> execNo
}
return new ExecNodeContext(metadata.name(), metadata.version());
}

/**
* Create a configuration for the {@link ExecNode}, ready to be persisted to a JSON plan.
*
* @param execNodeClass The {@link ExecNode} class.
* @param tableConfig The planner configuration (including the {@link TableConfig}).
* @return The {@link ExecNode} configuration, which contains the consumed options for the node,
* defined by {@link ExecNodeMetadata#consumedOptions()}, along with their values.
*/
public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig(
Class<T> execNodeClass, ReadableConfig tableConfig) {
return ExecNodeMetadataUtil.newPersistedConfig(
execNodeClass,
tableConfig,
Stream.concat(
ExecNodeMetadataUtil.TABLE_CONFIG_OPTIONS.stream(),
ExecNodeMetadataUtil.EXECUTION_CONFIG_OPTIONS.stream()));
}
}
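ExecNodeMetadataUtil.newPersistedConfig is not part of this excerpt; conceptually it must resolve the keys declared in ExecNodeMetadata#consumedOptions() against the supplied pool of known options and copy the current values out of the planner configuration. A rough, hedged sketch of that idea (method and helper names are illustrative assumptions, not the actual Flink code):

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;

static ReadableConfig persistedConfigSketch(
        ExecNodeMetadata metadata,
        ReadableConfig tableConfig,
        Stream<ConfigOption<?>> knownOptions) {
    // Index the supported options (TableConfigOptions and
    // ExecutionConfigOptions) by their string key.
    final Map<String, ConfigOption<?>> byKey =
            knownOptions.collect(Collectors.toMap(ConfigOption::key, o -> o));

    final Configuration persisted = new Configuration();
    for (String key : metadata.consumedOptions()) {
        final ConfigOption<?> option = byKey.get(key);
        if (option == null) {
            throw new TableException("Unsupported consumed option: " + key);
        }
        copyOption(tableConfig, option, persisted);
    }
    return persisted;
}

// Copies the current value of a single option, preserving its type.
private static <T> void copyOption(
        ReadableConfig from, ConfigOption<T> option, Configuration to) {
    to.set(option, from.get(option));
}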
BatchExecBoundedStreamScan.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
@@ -48,6 +49,7 @@ public class BatchExecBoundedStreamScan extends ExecNodeBase<RowData>
private final List<String> qualifiedName;

public BatchExecBoundedStreamScan(
ReadableConfig tableConfig,
DataStream<?> dataStream,
DataType sourceType,
int[] fieldIndexes,
@@ -57,6 +59,7 @@ public BatchExecBoundedStreamScan(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecBoundedStreamScan.class),
ExecNodeContext.newPersistedConfig(BatchExecBoundedStreamScan.class, tableConfig),
Collections.emptyList(),
outputType,
description);
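The same mechanical change repeats across every ExecNode touched by this commit: the constructor gains a leading ReadableConfig tableConfig parameter, and the super() call derives the node's persisted config from it via ExecNodeContext.newPersistedConfig. An illustrative call site for the BatchExecCalc shown next (variable names are assumptions; in the planner the config comes from PlannerBase#getTableConfig()):

final ReadableConfig plannerConfig = planner.getTableConfig();
final BatchExecCalc calc =
        new BatchExecCalc(
                plannerConfig, // used to snapshot the node's consumed options
                projection,    // List<RexNode>
                condition,     // @Nullable RexNode
                inputProperty,
                outputRowType,
                "Calc(select=[a, b])");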
BatchExecCalc.java
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -37,6 +38,7 @@
public class BatchExecCalc extends CommonExecCalc implements BatchExecNode<RowData> {

public BatchExecCalc(
ReadableConfig tableConfig,
List<RexNode> projection,
@Nullable RexNode condition,
InputProperty inputProperty,
@@ -45,6 +47,7 @@ public BatchExecCalc(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecCalc.class),
ExecNodeContext.newPersistedConfig(BatchExecCalc.class, tableConfig),
projection,
condition,
TableStreamOperator.class,
BatchExecCorrelate.java
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -37,6 +38,7 @@
public class BatchExecCorrelate extends CommonExecCorrelate implements BatchExecNode<RowData> {

public BatchExecCorrelate(
ReadableConfig tableConfig,
FlinkJoinType joinType,
RexCall invocation,
@Nullable RexNode condition,
@@ -46,6 +48,7 @@ public BatchExecCorrelate(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecCorrelate.class),
ExecNodeContext.newPersistedConfig(BatchExecCorrelate.class, tableConfig),
joinType,
invocation,
condition,
BatchExecExchange.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
@@ -66,10 +67,15 @@ public class BatchExecExchange extends CommonExecExchange implements BatchExecNo
// if it's null, use the value from configuration
@Nullable private StreamExchangeMode requiredExchangeMode;

public BatchExecExchange(InputProperty inputProperty, RowType outputType, String description) {
public BatchExecExchange(
ReadableConfig tableConfig,
InputProperty inputProperty,
RowType outputType,
String description) {
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecExchange.class),
ExecNodeContext.newPersistedConfig(BatchExecExchange.class, tableConfig),
Collections.singletonList(inputProperty),
outputType,
description);
BatchExecExpand.java
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -34,13 +35,15 @@
public class BatchExecExpand extends CommonExecExpand implements BatchExecNode<RowData> {

public BatchExecExpand(
ReadableConfig tableConfig,
List<List<RexNode>> projects,
InputProperty inputProperty,
RowType outputType,
String description) {
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecExpand.class),
ExecNodeContext.newPersistedConfig(BatchExecExpand.class, tableConfig),
projects,
false, // retainHeader
Collections.singletonList(inputProperty),
BatchExecHashAggregate.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
@@ -59,6 +60,7 @@ public class BatchExecHashAggregate extends ExecNodeBase<RowData>
private final boolean isFinal;

public BatchExecHashAggregate(
ReadableConfig tableConfig,
int[] grouping,
int[] auxGrouping,
AggregateCall[] aggCalls,
@@ -71,6 +73,7 @@ public BatchExecHashAggregate(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecHashAggregate.class),
ExecNodeContext.newPersistedConfig(BatchExecHashAggregate.class, tableConfig),
Collections.singletonList(inputProperty),
outputType,
description);
BatchExecHashJoin.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -61,6 +62,7 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
private final boolean tryDistinctBuildRow;

public BatchExecHashJoin(
ReadableConfig tableConfig,
JoinSpec joinSpec,
int estimatedLeftAvgRowSize,
int estimatedRightAvgRowSize,
@@ -75,6 +77,7 @@ public BatchExecHashJoin(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecHashJoin.class),
ExecNodeContext.newPersistedConfig(BatchExecHashJoin.class, tableConfig),
Arrays.asList(leftInputProperty, rightInputProperty),
outputType,
description);
BatchExecHashWindowAggregate.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
@@ -67,6 +68,7 @@ public class BatchExecHashWindowAggregate extends ExecNodeBase<RowData>
private final boolean isFinal;

public BatchExecHashWindowAggregate(
ReadableConfig tableConfig,
int[] grouping,
int[] auxGrouping,
AggregateCall[] aggCalls,
@@ -84,6 +86,7 @@ public BatchExecHashWindowAggregate(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecHashWindowAggregate.class),
ExecNodeContext.newPersistedConfig(BatchExecHashWindowAggregate.class, tableConfig),
Collections.singletonList(inputProperty),
outputType,
description);
BatchExecLegacySink.java
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -41,6 +42,7 @@
public class BatchExecLegacySink<T> extends CommonExecLegacySink<T> implements BatchExecNode<T> {

public BatchExecLegacySink(
ReadableConfig tableConfig,
TableSink<T> tableSink,
@Nullable String[] upsertKeys,
InputProperty inputProperty,
@@ -49,6 +51,7 @@ public BatchExecLegacySink(
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecLegacySink.class),
ExecNodeContext.newPersistedConfig(BatchExecLegacySink.class, tableConfig),
tableSink,
upsertKeys,
false, // needRetraction
BatchExecLegacyTableSourceScan.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
@@ -55,13 +56,16 @@ public class BatchExecLegacyTableSourceScan extends CommonExecLegacyTableSourceS
implements BatchExecNode<RowData> {

public BatchExecLegacyTableSourceScan(
ReadableConfig tableConfig,
TableSource<?> tableSource,
List<String> qualifiedName,
RowType outputType,
String description) {
super(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(BatchExecLegacyTableSourceScan.class),
ExecNodeContext.newPersistedConfig(
BatchExecLegacyTableSourceScan.class, tableConfig),
tableSource,
qualifiedName,
outputType,
[Diff truncated: the remaining changed files of this commit are not shown in this excerpt.]