[FLINK-22832][sql-client] Drop usages of legacy planner in SQL Client
This closes apache#16052.
twalthr committed Jun 2, 2021
1 parent a81c720 commit d51f057
Showing 20 changed files with 87 additions and 244 deletions.
5 changes: 1 addition & 4 deletions docs/content.zh/docs/dev/table/sqlClient.md
@@ -339,7 +339,6 @@ CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;
 
 -- Properties that change the fundamental execution behavior of a table program.
 
-SET 'table.planner' = 'blink'; -- planner: either 'blink' (default) or 'old'
 SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
 SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
 SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
@@ -362,7 +361,7 @@ This configuration:
 - defines a table `MyTableSource` that can read data from a CSV file,
 - defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name,
-- uses the blink planner in streaming mode for running statements and a parallelism of 1,
+- uses streaming mode for running statements and a parallelism of 1,
 - runs exploratory queries in the `table` result mode,
 - and makes some planner adjustments around join reordering and spilling via configuration options.

@@ -688,8 +687,6 @@ To distinguish the deprecated key, the sql client use the '[DEPRECATED]' as the
 Flink SQL>SET;
 execution.runtime-mode=batch
 sql-client.execution.result-mode=table
-table.planner=blink
-[DEPRECATED] execution.planner=blink
 [DEPRECATED] execution.result-mode=table
 [DEPRECATED] execution.type=batch
 ```
5 changes: 1 addition & 4 deletions docs/content/docs/dev/table/sqlClient.md
@@ -345,7 +345,6 @@ CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;
 
 -- Properties that change the fundamental execution behavior of a table program.
 
-SET 'table.planner' = 'blink'; -- planner: either 'blink' (default) or 'old'
 SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
 SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
 SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
@@ -368,7 +367,7 @@ This configuration:
 - defines a table `MyTableSource` that can read data from a CSV file,
 - defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name,
-- uses the blink planner in streaming mode for running statements and a parallelism of 1,
+- uses streaming mode for running statements and a parallelism of 1,
 - runs exploratory queries in the `table` result mode,
 - and makes some planner adjustments around join reordering and spilling via configuration options.

@@ -694,8 +693,6 @@ To distinguish the deprecated key, the sql client use the '[DEPRECATED]' as the
 Flink SQL>SET;
 execution.runtime-mode=batch
 sql-client.execution.result-mode=table
-table.planner=blink
-[DEPRECATED] execution.planner=blink
 [DEPRECATED] execution.result-mode=table
 [DEPRECATED] execution.type=batch
 ```
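For context: with `table.planner` gone from the session options, the SQL Client always runs on the Blink planner, matching the Table API defaults in Flink 1.13. A minimal Java sketch of the equivalent programmatic setup (illustrative only, not part of this commit; the class name `PlannerDefaultsSketch` is made up, the builder calls are the public 1.13 API):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PlannerDefaultsSketch {
    public static void main(String[] args) {
        // The Blink planner is the only remaining choice; 'table.planner'
        // can no longer be switched to 'old' from the SQL Client.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner() // the default in 1.13, shown for clarity
                        .inStreamingMode() // mirrors SET 'execution.runtime-mode' = 'streaming'
                        .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.executeSql("SHOW TABLES").print();
    }
}
```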
3 changes: 1 addition & 2 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -216,8 +216,7 @@ fi
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh hashmap" "skip_check_exceptions"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
 
-run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
 
 run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
 run_test "TPC-DS end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpcds.sh"
5 changes: 0 additions & 5 deletions flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -19,8 +19,6 @@
 
 set -Eeuo pipefail
 
-PLANNER="${1:-old}"
-
 KAFKA_VERSION="2.2.2"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
@@ -213,9 +211,6 @@ functions:
   - name: RegReplace
     from: class
     class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
-
-execution:
-  planner: "$PLANNER"
 EOF
 
 # submit SQL statements
6 changes: 0 additions & 6 deletions flink-table/flink-sql-client/pom.xml
@@ -78,12 +78,6 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
ExecutionEntry.java (flink-table/flink-sql-client)
@@ -28,6 +28,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -54,8 +55,6 @@ public class ExecutionEntry extends ConfigEntry {
 
     public static final String EXECUTION_PLANNER = "planner";
 
-    public static final String EXECUTION_PLANNER_VALUE_OLD = "old";
-
     public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";
 
     public static final String EXECUTION_TYPE = "type";
@@ -117,9 +116,7 @@ private ExecutionEntry(DescriptorProperties properties) {
     @Override
     protected void validate(DescriptorProperties properties) {
         properties.validateEnumValues(
-                EXECUTION_PLANNER,
-                true,
-                Arrays.asList(EXECUTION_PLANNER_VALUE_OLD, EXECUTION_PLANNER_VALUE_BLINK));
+                EXECUTION_PLANNER, true, Collections.singletonList(EXECUTION_PLANNER_VALUE_BLINK));
         properties.validateEnumValues(
                 EXECUTION_TYPE,
                 true,
@@ -166,35 +163,6 @@ public boolean inBatchMode() {
                 .orElse(false);
     }
 
-    public boolean isStreamingPlanner() {
-        final String planner =
-                properties
-                        .getOptionalString(EXECUTION_PLANNER)
-                        .orElse(EXECUTION_PLANNER_VALUE_BLINK);
-
-        // Blink planner is a streaming planner
-        if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
-            return true;
-        }
-        // Old planner can be a streaming or batch planner
-        else if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
-            return inStreamingMode();
-        }
-
-        return false;
-    }
-
-    public boolean isBlinkPlanner() {
-        final String planner =
-                properties
-                        .getOptionalString(EXECUTION_PLANNER)
-                        .orElse(EXECUTION_PLANNER_VALUE_BLINK);
-        if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
-            return false;
-        }
-        return true;
-    }
-
     public Optional<Integer> getParallelism() {
         return properties.getOptionalInt(EXECUTION_PARALLELISM);
     }
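For context: with `EXECUTION_PLANNER_VALUE_OLD` removed, the legacy YAML `planner` property still exists but now validates against a single allowed value. A standalone sketch of that behavior (illustrative only, not part of this commit; the class name `PlannerValidationSketch` is made up, `DescriptorProperties` is the deprecated descriptor utility the entry already uses):

```java
import org.apache.flink.table.descriptors.DescriptorProperties;

import java.util.Collections;

public class PlannerValidationSketch {
    public static void main(String[] args) {
        DescriptorProperties properties = new DescriptorProperties(true);
        properties.putString("planner", "blink"); // 'old' would now fail validation

        // Mirrors the rewritten validate(): the key stays optional, but the
        // only accepted value is 'blink'.
        properties.validateEnumValues(
                "planner", true, Collections.singletonList("blink"));
    }
}
```

Setting the property to 'old' instead would make `validateEnumValues` throw a `ValidationException`, which is how the dropped constant surfaces to users of legacy YAML environment files.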
ExecutionContext.java (flink-table/flink-sql-client)
@@ -18,14 +18,13 @@
 
 package org.apache.flink.table.client.gateway.context;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -57,7 +56,7 @@ public class ExecutionContext {
     private final SessionState sessionState;
     private final URLClassLoader classLoader;
 
-    private final TableEnvironment tableEnv;
+    private final StreamTableEnvironment tableEnv;
 
     public ExecutionContext(
             Configuration flinkConfig, URLClassLoader classLoader, SessionState sessionState) {
@@ -78,7 +77,7 @@ public ExecutionContext(ExecutionContext context) {
         this.flinkConfig = context.flinkConfig;
         this.sessionState = context.sessionState;
         this.classLoader = context.classLoader;
-        // create a new table env
+
         this.tableEnv = createTableEnvironment();
     }

@@ -91,41 +90,42 @@ public <R> R wrapClassLoader(Supplier<R> supplier) {
         }
     }
 
-    public TableEnvironment getTableEnvironment() {
+    public StreamTableEnvironment getTableEnvironment() {
         return tableEnv;
     }

     // ------------------------------------------------------------------------------------------------------------------
     // Helper to create Table Environment
     // ------------------------------------------------------------------------------------------------------------------
 
-    private TableEnvironment createTableEnvironment() {
-        // check the value of TABLE_PLANNER and RUNTIME_MODE
+    private StreamTableEnvironment createTableEnvironment() {
+        // checks the value of RUNTIME_MODE
         EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig);
 
+        if (!settings.isBlinkPlanner()) {
+            throw new TableException(
+                    "The old planner is not supported anymore. Please update to new default planner.");
+        }
+
         TableConfig config = new TableConfig();
         config.addConfiguration(flinkConfig);
-        if (!settings.isStreamingMode() && !settings.isBlinkPlanner()) {
-            ExecutionEnvironment execEnv = createExecutionEnvironment();
-            return new BatchTableEnvironmentImpl(
-                    execEnv, config, sessionState.catalogManager, sessionState.moduleManager);
-        } else {
-            StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
-
-            final Map<String, String> executorProperties = settings.toExecutorProperties();
-            Executor executor = lookupExecutor(executorProperties, streamExecEnv);
-            return createStreamTableEnvironment(
-                    streamExecEnv,
-                    settings,
-                    config,
-                    executor,
-                    sessionState.catalogManager,
-                    sessionState.moduleManager,
-                    sessionState.functionCatalog,
-                    classLoader);
-        }
+
+        StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();
+
+        final Map<String, String> executorProperties = settings.toExecutorProperties();
+        Executor executor = lookupExecutor(executorProperties, streamExecEnv);
+        return createStreamTableEnvironment(
+                streamExecEnv,
+                settings,
+                config,
+                executor,
+                sessionState.catalogManager,
+                sessionState.moduleManager,
+                sessionState.functionCatalog,
+                classLoader);
     }

-    private TableEnvironment createStreamTableEnvironment(
+    private StreamTableEnvironment createStreamTableEnvironment(
             StreamExecutionEnvironment env,
             EnvironmentSettings settings,
             TableConfig config,
@@ -184,10 +184,4 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
         // This requires StreamExecutionEnvironment to have a full flink configuration.
         return new StreamExecutionEnvironment(new Configuration(flinkConfig), classLoader);
     }
-
-    private ExecutionEnvironment createExecutionEnvironment() {
-        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
-        execEnv.getConfiguration().addAll(flinkConfig);
-        return execEnv;
-    }
 }
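For context: `createTableEnvironment()` now derives all settings from the configuration and fails fast instead of falling back to a batch environment. A minimal sketch of that guard in isolation (illustrative only, not part of this commit; the class name `PlannerGuardSketch` is made up, the calls mirror those visible in the diff above and assume Flink 1.13 APIs):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableException;

public class PlannerGuardSketch {
    public static void main(String[] args) {
        // A session configuration that still carries the removed legacy value.
        Configuration flinkConfig = new Configuration();
        flinkConfig.setString("table.planner", "OLD");

        // Settings are now derived purely from configuration, as in the
        // rewritten createTableEnvironment().
        EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig);

        // Instead of constructing a BatchTableEnvironmentImpl, the SQL Client
        // now rejects the old planner outright.
        if (!settings.isBlinkPlanner()) {
            throw new TableException(
                    "The old planner is not supported anymore. Please update to new default planner.");
        }
    }
}
```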
