Skip to content

Commit

Permalink
[FLINK-21464][sql-client] Support ADD JAR in SQL Client (apache#15925)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsk119 authored May 27, 2021
1 parent 8e6f152 commit 34ff753
Show file tree
Hide file tree
Showing 22 changed files with 508 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.operations.ShowCreateTableOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ClearOperation;
import org.apache.flink.table.operations.command.HelpOperation;
import org.apache.flink.table.operations.command.QuitOperation;
Expand Down Expand Up @@ -342,7 +343,8 @@ private void validate(Operation operation, ExecutionMode executionMode) {
&& !(operation instanceof UseOperation)
&& !(operation instanceof AlterOperation)
&& !(operation instanceof LoadModuleOperation)
&& !(operation instanceof UnloadModuleOperation)) {
&& !(operation instanceof UnloadModuleOperation)
&& !(operation instanceof AddJarOperation)) {
throw new SqlExecutionException(
"Unsupported operation in sql init file: " + operation.asSummaryString());
}
Expand Down Expand Up @@ -420,6 +422,9 @@ private void callOperation(Operation operation, ExecutionMode mode) {
} else if (operation instanceof EndStatementSetOperation) {
// END
callEndStatementSet();
} else if (operation instanceof AddJarOperation) {
// ADD JAR
callAddJar((AddJarOperation) operation);
} else if (operation instanceof ShowCreateTableOperation) {
// SHOW CREATE TABLE
callShowCreateTable((ShowCreateTableOperation) operation);
Expand All @@ -429,6 +434,12 @@ private void callOperation(Operation operation, ExecutionMode mode) {
}
}

private void callAddJar(AddJarOperation operation) {
String jarPath = operation.getPath();
executor.addJar(sessionId, jarPath);
printInfo(CliStrings.MESSAGE_ADD_JAR_STATEMENT);
}

private void callQuit() {
printInfo(CliStrings.MESSAGE_QUIT);
isRunning = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ private CliStrings() {

public static final String MESSAGE_EXECUTE_STATEMENT = "Execute statement succeed.";

public static final String MESSAGE_ADD_JAR_STATEMENT =
"The specified jar is added into session classloader.";

// --------------------------------------------------------------------------------------------

public static final String RESULT_TITLE = "SQL Query Result";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,7 @@ TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageS
* has been sent to cluster.
*/
void cancelQuery(String sessionId, String resultId) throws SqlExecutionException;

/** Add the JAR resource to into the classloader with specified session. */
void addJar(String sessionId, String jarPath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

package org.apache.flink.table.client.gateway.context;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
Expand All @@ -32,15 +37,22 @@
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.util.JarUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Context describing a session, it's mainly used for user to open a new session in the backend. If
Expand All @@ -58,7 +70,10 @@ public class SessionContext {
private final Configuration sessionConfiguration;

private final SessionState sessionState;
private final URLClassLoader classLoader;
// SafetyNetWrapperClassLoader doesn't override the getURL therefore we need to maintain the
// dependencies by ourselves.
private Set<URL> dependencies;
private URLClassLoader classLoader;
private ExecutionContext executionContext;

private SessionContext(
Expand All @@ -74,6 +89,7 @@ private SessionContext(
this.classLoader = classLoader;
this.sessionState = sessionState;
this.executionContext = executionContext;
this.dependencies = new HashSet<>(defaultContext.getDependencies());
}

// --------------------------------------------------------------------------------------------
Expand All @@ -96,6 +112,11 @@ public Map<String, String> getConfigMap() {
return sessionConfiguration.toMap();
}

@VisibleForTesting
Set<URL> getDependencies() {
return dependencies;
}

// --------------------------------------------------------------------------------------------
// Method to execute commands
// --------------------------------------------------------------------------------------------
Expand All @@ -111,7 +132,10 @@ public void reset() {
// If rebuild a new Configuration, it loses control of the SessionState if users wants to
// modify the configuration
resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
this.executionContext = new ExecutionContext(executionContext);
// Reset configuration will revert the `pipeline.jars`. To make the current classloader
// still work, add the maintained dependencies into the configuration.
updateClassLoaderAndDependencies(dependencies);
executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
}

/**
Expand Down Expand Up @@ -244,6 +268,20 @@ public static SessionContext create(DefaultContext defaultContext, String sessio
executionContext);
}

public void addJar(String jarPath) {
URL jarURL = getURLFromPath(jarPath);
if (dependencies.contains(jarURL)) {
return;
}

Set<URL> newDependencies = new HashSet<>(dependencies);
newDependencies.add(jarURL);
updateClassLoaderAndDependencies(newDependencies);

// renew the execution context
executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
}

// --------------------------------------------------------------------------------------------
// Inner class
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -275,4 +313,56 @@ private void resetSessionConfigurationToDefault(Configuration defaultConf) {
}
sessionConfiguration.addAll(defaultConf);
}

private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
// merge the jar in config with the jar maintained in session
Set<URL> jarsInConfig;
try {
jarsInConfig =
new HashSet<>(
ConfigUtils.decodeListFromConfig(
sessionConfiguration, PipelineOptions.JARS, URL::new));
} catch (MalformedURLException e) {
throw new SqlExecutionException(
"Failed to parse the option `pipeline.jars` in configuration.", e);
}
jarsInConfig.addAll(newDependencies);
ConfigUtils.encodeCollectionToConfig(
sessionConfiguration,
PipelineOptions.JARS,
new ArrayList<>(jarsInConfig),
URL::toString);

// TODO: update the the classloader in CatalogManager.
classLoader =
ClientUtils.buildUserCodeClassLoader(
new ArrayList<>(newDependencies),
Collections.emptyList(),
SessionContext.class.getClassLoader(),
sessionConfiguration);
dependencies = new HashSet<>(newDependencies);
}

private URL getURLFromPath(String jarPath) {
Path path = new Path(jarPath);
String scheme = path.toUri().getScheme();
if (scheme != null && !scheme.equals("file")) {
throw new SqlExecutionException("SQL Client only supports to add local jars.");
}

Path qualifiedPath = path.makeQualified(FileSystem.getLocalFileSystem());

try {
URL jarURL = qualifiedPath.toUri().toURL();
JarUtils.checkJarFile(jarURL);
return jarURL;
} catch (MalformedURLException e) {
throw new SqlExecutionException(
String.format("Failed to parse the input jar path: %s", jarPath), e);
} catch (IOException e) {
throw new SqlExecutionException(
String.format("Failed to get the jar file with specified path: %s", jarPath),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,10 @@ public void cancelQuery(String sessionId, String resultId) throws SqlExecutionEx
}
resultStore.removeResult(resultId);
}

@Override
public void addJar(String sessionId, String jarUrl) {
final SessionContext context = getSessionContext(sessionId);
context.addJar(jarUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@
@RunWith(Parameterized.class)
public class CliClientITCase extends AbstractTestBase {

// a generated UDF jar used for testing classloading of dependencies
private static URL udfDependency;
private static Path historyPath;
private static Map<String, String> replaceVars;

Expand Down Expand Up @@ -102,11 +100,12 @@ public static void setup() throws IOException {
File udfJar =
TestUserClassLoaderJar.createJarFile(
tempFolder.newFolder("test-jar"), "test-classloader-udf.jar");
udfDependency = udfJar.toURI().toURL();
URL udfDependency = udfJar.toURI().toURL();
historyPath = tempFolder.newFile("history").toPath();

replaceVars = new HashMap<>();
replaceVars.put("$VAR_PIPELINE_JARS", udfDependency.toString());
replaceVars.put("$VAR_UDF_JAR_PATH", udfDependency.getPath());
replaceVars.put("$VAR_PIPELINE_JARS_URL", udfDependency.toString());
replaceVars.put(
"$VAR_REST_PORT",
miniClusterResource.getClientConfiguration().get(PORT).toString());
Expand Down Expand Up @@ -145,7 +144,7 @@ private List<Result> runSqlStatements(List<String> statements) throws IOExceptio
DefaultContext defaultContext =
new DefaultContext(
new Environment(),
Collections.singletonList(udfDependency),
Collections.emptyList(),
new Configuration(miniClusterResource.getClientConfiguration()),
Collections.singletonList(new DefaultCLI()));
final Executor executor = new LocalExecutor(defaultContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,5 +520,10 @@ public List<Row> retrieveResultPage(String resultId, int page)
public void cancelQuery(String sessionId, String resultId) throws SqlExecutionException {
// nothing to do
}

@Override
public void addJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ public List<Row> retrieveResultPage(String resultId, int page)
public void cancelQuery(String sessionId, String resultId) throws SqlExecutionException {
cancellationCounter.countDown();
}

@Override
public void addJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}
}

private static final class TestingCliResultView implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public void cancelQuery(String sessionId, String resultId) throws SqlExecutionEx
numCancelCalls++;
}

@Override
public void addJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId)
throws SqlExecutionException {
Expand Down
Loading

0 comments on commit 34ff753

Please sign in to comment.