Skip to content

Commit

Permalink
[FLINK-21485][sql-client] Simplify the ExecutionContext (apache#15006)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsk119 authored Mar 12, 2021
1 parent 63a6aba commit c998ba4
Show file tree
Hide file tree
Showing 23 changed files with 1,782 additions and 1,660 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,21 @@

package org.apache.flink.table.client;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliClient;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalContextUtils;
import org.apache.flink.table.client.gateway.local.LocalExecutor;

import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;

/**
* SQL Client for submitting SQL statements. The client can be executed in two modes: a gateway and
Expand All @@ -69,8 +59,6 @@ public class SqlClient {
public static final String MODE_EMBEDDED = "embedded";
public static final String MODE_GATEWAY = "gateway";

public static final String DEFAULT_SESSION_ID = "default";

public SqlClient(boolean isEmbedded, CliOptions options) {
this.isEmbedded = isEmbedded;
this.options = options;
Expand All @@ -79,33 +67,13 @@ public SqlClient(boolean isEmbedded, CliOptions options) {
private void start() {
if (isEmbedded) {
// create local executor with default environment
final List<URL> jars;
if (options.getJars() != null) {
jars = options.getJars();
} else {
jars = Collections.emptyList();
}
final List<URL> libDirs;
if (options.getLibraryDirs() != null) {
libDirs = options.getLibraryDirs();
} else {
libDirs = Collections.emptyList();
}
final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
executor.start();

// create CLI client with session environment
final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
appendPythonConfig(sessionEnv, options.getPythonConfiguration());
final SessionContext context;
if (options.getSessionId() == null) {
context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
} else {
context = new SessionContext(options.getSessionId(), sessionEnv);
}
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
final Executor executor = new LocalExecutor(defaultContext);
executor.start();

// Open an new session
String sessionId = executor.openSession(context);
String sessionId = executor.openSession(options.getSessionId());
try {
// add shutdown hook
Runtime.getRuntime()
Expand Down Expand Up @@ -156,32 +124,6 @@ private void openCli(String sessionId, Executor executor) {

// --------------------------------------------------------------------------------------------

private static Environment readSessionEnvironment(URL envUrl) {
// use an empty environment by default
if (envUrl == null) {
System.out.println("No session environment specified.");
return new Environment();
}

System.out.println("Reading session environment from: " + envUrl);
LOG.info("Using session environment file: {}", envUrl);
try {
return Environment.parse(envUrl);
} catch (IOException e) {
throw new SqlClientException(
"Could not read session environment file at: " + envUrl, e);
}
}

private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
Map<String, Object> combinedConfig =
new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
env.setConfiguration(combinedConfig);
}

// --------------------------------------------------------------------------------------------

public static void main(String[] args) {
if (args.length < 1) {
CliOptionsParser.printHelpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public boolean inStreamingMode() {
return properties
.getOptionalString(EXECUTION_TYPE)
.map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
.orElse(false);
.orElse(true);
}

public boolean inBatchMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

Expand All @@ -32,13 +34,13 @@ public interface Executor {
void start() throws SqlExecutionException;

/**
* Open a new session by using the given {@link SessionContext}.
* Open a new session by using the given session id.
*
* @param session context to create new session.
* @return session identifier to track the session.
* @param sessionId session identifier.
* @return used session identifier to track the session.
* @throws SqlExecutionException if any error happen
*/
String openSession(SessionContext session) throws SqlExecutionException;
String openSession(@Nullable String sessionId) throws SqlExecutionException;

/**
* Close the resources of session for given session id.
Expand Down

This file was deleted.

Loading

0 comments on commit c998ba4

Please sign in to comment.