Commit
[FLINK-36273][table] Remove all deprecated Table/SQL configuration in 2.0 in flink-table-api module
xuyangzhong authored and lincoln-lil committed Sep 25, 2024
1 parent 2ca6c2f commit 499df42
Showing 9 changed files with 8 additions and 184 deletions.
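
Each option removed below names its replacement in its @deprecated note. As a minimal migration sketch (illustrative only, not part of this commit; the class name and chosen values are assumptions), a job that used the old keys could set the documented replacements instead:

// Migration sketch (illustrative, not part of this commit): the removed options
// point to these replacements in their @deprecated notes.
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class DeprecatedOptionMigrationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Was: table.exec.shuffle-mode (TABLE_EXEC_SHUFFLE_MODE, removed below).
        tEnv.getConfig()
                .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);

        // Was: table.exec.legacy-transformation-uids (removed below).
        tEnv.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION,
                        ExecutionConfigOptions.UidGeneration.PLAN_ONLY);
    }
}
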
6 changes: 6 additions & 0 deletions flink-end-to-end-tests/flink-tpcds-test/pom.xml
@@ -29,6 +29,12 @@ under the License.
<name>Flink : E2E Tests : TPCDS</name>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>

@@ -20,12 +20,10 @@

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
@@ -138,11 +136,6 @@ private static TableEnvironment prepareTableEnv(String sourceTablePath, Boolean
TableEnvironment tEnv = TableEnvironment.create(environmentSettings);

// config Optimizer parameters
// TODO use the default shuffle mode of batch runtime mode once FLINK-23470 is implemented
tEnv.getConfig()
.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
tEnv.getConfig()
.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,

@@ -24,15 +24,13 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;

import java.time.Duration;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/**
@@ -499,43 +497,6 @@ public class ExecutionConfigOptions {
.withDescription(
"If true, multiple physical operators will be compiled into a single operator by planner which can improve the performance.");

/** @deprecated Use {@link ExecutionOptions#BATCH_SHUFFLE_MODE} instead. */
@Deprecated
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
public static final ConfigOption<String> TABLE_EXEC_SHUFFLE_MODE =
key("table.exec.shuffle-mode")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text("Sets exec shuffle mode.")
.linebreak()
.text("Accepted values are:")
.list(
text(
"%s: All edges will use blocking shuffle.",
code("ALL_EDGES_BLOCKING")),
text(
"%s: Forward edges will use pipelined shuffle, others blocking.",
code("FORWARD_EDGES_PIPELINED")),
text(
"%s: Pointwise edges will use pipelined shuffle, others blocking. "
+ "Pointwise edges include forward and rescale edges.",
code("POINTWISE_EDGES_PIPELINED")),
text(
"%s: All edges will use pipelined shuffle.",
code("ALL_EDGES_PIPELINED")),
text(
"%s: the same as %s. Deprecated.",
code("batch"), code("ALL_EDGES_BLOCKING")),
text(
"%s: the same as %s. Deprecated.",
code("pipelined"), code("ALL_EDGES_PIPELINED")))
.text(
"Note: Blocking shuffle means data will be fully produced before sent to consumer tasks. "
+ "Pipelined shuffle means data will be sent to consumer tasks once produced.")
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
key("table.exec.legacy-cast-behaviour")
@@ -550,7 +511,6 @@ public class ExecutionConfigOptions {
ConfigOptions.key("table.exec.rank.topn-cache-size")
.longType()
.defaultValue(10000L)
.withDeprecatedKeys("table.exec.topn-cache-size")
.withDescription(
"Rank operators have a cache which caches partial state contents "
+ "to reduce state access. Cache size is the number of records "
@@ -570,8 +530,6 @@
key("table.exec.deduplicate.insert-update-after-sensitive-enabled")
.booleanType()
.defaultValue(true)
.withDeprecatedKeys(
"table.exec.deduplicate.insert-and-updateafter-sensitive.enabled")
.withDescription(
"Set whether the job (especially the sinks) is sensitive to "
+ "INSERT messages and UPDATE_AFTER messages. "
@@ -587,8 +545,6 @@
ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes-enabled")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys(
"table.exec.deduplicate.mini-batch.compact-changes.enabled")
.withDescription(
"Set whether to compact the changes sent downstream in row-time "
+ "mini-batch. If true, Flink will compact changes and send "
@@ -598,19 +554,6 @@
+ "all changes to downstream just like when the mini-batch is "
+ "not enabled.");

/** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
@Deprecated
public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
key("table.exec.legacy-transformation-uids")
.booleanType()
.defaultValue(false)
.withDescription(
"This flag has been replaced by table.exec.uid.generation. Use the enum "
+ "value DISABLED to restore legacy behavior. However, the new "
+ "default value should be sufficient for most use cases as "
+ "only pipelines from compiled plans get UIDs assigned.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
key("table.exec.uid.generation")
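
The hunks above also drop deprecated fallback keys from options that remain, so only the canonical key strings still resolve. A small sketch of those surviving keys set on a plain Configuration (illustrative only, not part of this commit; class name and values are assumptions):

// Illustrative only: canonical keys kept by this commit. The removed fallback keys
// (e.g. "table.exec.topn-cache-size") no longer map to these options.
import org.apache.flink.configuration.Configuration;

public class CanonicalKeySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("table.exec.rank.topn-cache-size", "20000");
        conf.setString("table.exec.deduplicate.insert-update-after-sensitive-enabled", "true");
        conf.setString("table.exec.deduplicate.mini-batch.compact-changes-enabled", "false");
        System.out.println(conf);
    }
}
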

@@ -245,7 +245,6 @@ public class OptimizerConfigOptions {
key("table.optimizer.sql2rel.project-merge.enabled")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys("table.optimizer.sql-to-rel.project.merge.enabled")
.withDescription(
Description.builder()
.text(

@@ -109,8 +109,7 @@ public boolean shouldSetUid() {
final UidGeneration uidGeneration = get(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION);
switch (uidGeneration) {
case PLAN_ONLY:
return isCompiled
&& !get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
return isCompiled;
case ALWAYS:
return true;
case DISABLED:
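
With the legacy flag removed, UID assignment depends only on table.exec.uid.generation and on whether the pipeline was restored from a compiled plan. A standalone restatement of the simplified decision (class and method names are illustrative; the DISABLED branch mirrors the behaviour checked in the UID generation test further down):

// Illustrative restatement of the simplified shouldSetUid() decision shown above;
// not the production code.
import org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration;

public class UidDecisionSketch {

    static boolean shouldSetUid(UidGeneration uidGeneration, boolean isCompiled) {
        switch (uidGeneration) {
            case PLAN_ONLY:
                // Only pipelines coming from a compiled plan get UIDs.
                return isCompiled;
            case ALWAYS:
                return true;
            case DISABLED:
                return false;
            default:
                throw new IllegalStateException("Unknown UID generation mode: " + uidGeneration);
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldSetUid(UidGeneration.PLAN_ONLY, true));  // true
        System.out.println(shouldSetUid(UidGeneration.PLAN_ONLY, false)); // false
        System.out.println(shouldSetUid(UidGeneration.DISABLED, true));   // false
    }
}
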

@@ -243,8 +243,7 @@ private Transformation<RowData> createNegativeWindowSizeJoin(
int rightArity,
InternalTypeInfo<RowData> returnTypeInfo,
ExecNodeConfig config) {
boolean shouldCreateUid =
config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
boolean shouldCreateUid = config.isCompiled();

// We filter all records instead of adding an empty source to preserve the watermarks.
FilterAllFlatMapFunction allFilter = new FilterAllFlatMapFunction(returnTypeInfo);

@@ -23,11 +23,7 @@
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Optional;

/** Utility class to load job-wide exchange mode. */
@Internal
@@ -43,12 +39,6 @@ public static StreamExchangeMode getBatchStreamExchangeMode(
return StreamExchangeMode.BATCH;
}

final GlobalStreamExchangeMode globalExchangeMode =
getGlobalStreamExchangeMode(config).orElse(null);
if (globalExchangeMode == GlobalStreamExchangeMode.ALL_EDGES_BLOCKING) {
return StreamExchangeMode.BATCH;
}

final BatchShuffleMode shuffleMode = config.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
return StreamExchangeMode.BATCH;
@@ -61,29 +51,6 @@ public static StreamExchangeMode getBatchStreamExchangeMode(
return StreamExchangeMode.UNDEFINED;
}

/**
* The {@link GlobalStreamExchangeMode} should be determined by the {@link StreamGraphGenerator}
* in the future.
*/
@Deprecated
static Optional<GlobalStreamExchangeMode> getGlobalStreamExchangeMode(ReadableConfig config) {
return config.getOptional(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)
.map(
value -> {
try {
return GlobalStreamExchangeMode.valueOf(
convertLegacyShuffleMode(value).toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
"Unsupported value %s for config %s.",
value,
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE
.key()));
}
});
}

private static String convertLegacyShuffleMode(final String shuffleMode) {
switch (shuffleMode.toLowerCase()) {
case ALL_EDGES_BLOCKING_LEGACY:
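
After this removal, the batch exchange mode is derived solely from execution.batch-shuffle-mode. A usage sketch of the surviving path (illustrative; StreamExchangeModeUtils is @Internal planner code, so calling it directly here is an assumption for demonstration only):

// Illustrative usage of the remaining code path: only ExecutionOptions.BATCH_SHUFFLE_MODE
// is consulted now that table.exec.shuffle-mode is gone.
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.planner.utils.StreamExchangeModeUtils;

public class ExchangeModeSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);

        StreamExchangeMode mode = StreamExchangeModeUtils.getBatchStreamExchangeMode(conf, null);
        System.out.println(mode); // expected: BATCH
    }
}
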

@@ -52,7 +52,6 @@
import java.util.stream.IntStream;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.ALWAYS;
@@ -147,13 +146,6 @@ public void testUidGeneration() {
checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY), true, false);
checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, ALWAYS), true, true);
checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, DISABLED), false, false);
checkUids(
c -> {
c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY);
c.set(TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
},
false,
false);
}

private static void checkUids(

@@ -21,16 +21,12 @@
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import org.junit.jupiter.api.Test;

import static org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getBatchStreamExchangeMode;
import static org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getGlobalStreamExchangeMode;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link StreamExchangeModeUtils}. */
class StreamExchangeModeUtilsTest {
@@ -68,74 +64,4 @@ void testBatchStreamExchangeMode() {
assertThat(getBatchStreamExchangeMode(configuration, StreamExchangeMode.BATCH))
.isEqualTo(StreamExchangeMode.BATCH);
}

@Test
void testBatchStreamExchangeModeLegacyPrecedence() {
final Configuration configuration = new Configuration();

configuration.set(
ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED);
configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalStreamExchangeMode.ALL_EDGES_BLOCKING.toString());

assertThat(getBatchStreamExchangeMode(configuration, null))
.isEqualTo(StreamExchangeMode.BATCH);
}

@Test
void testLegacyShuffleMode() {
final Configuration configuration = new Configuration();

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalStreamExchangeMode.ALL_EDGES_BLOCKING.toString());
assertThat(getGlobalStreamExchangeMode(configuration).orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED.toString());
assertThat(getGlobalStreamExchangeMode(configuration).orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
assertThat(getGlobalStreamExchangeMode(configuration).orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED);

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalStreamExchangeMode.ALL_EDGES_PIPELINED.toString());
assertThat(getGlobalStreamExchangeMode(configuration).orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
StreamExchangeModeUtils.ALL_EDGES_BLOCKING_LEGACY);
assertThat(getGlobalStreamExchangeMode(configuration).orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
StreamExchangeModeUtils.ALL_EDGES_PIPELINED_LEGACY);
assertThat(getGlobalStreamExchangeMode(configuration).orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);

configuration.set(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, "Forward_edges_PIPELINED");
assertThat(
StreamExchangeModeUtils.getGlobalStreamExchangeMode(configuration)
.orElseThrow(AssertionError::new))
.isEqualTo(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);
}

@Test
void testInvalidLegacyShuffleMode() {
final Configuration configuration = new Configuration();
configuration.set(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, "invalid-value");
assertThatThrownBy(() -> StreamExchangeModeUtils.getGlobalStreamExchangeMode(configuration))
.isInstanceOf(IllegalArgumentException.class);
}
}
