Skip to content

Commit

Permalink
[FLINK-32299][gateway] Upload python jar when sql contains python udf…
Browse files Browse the repository at this point in the history
… jar
  • Loading branch information
KarmaGYZ committed Nov 17, 2023
1 parent 873bd13 commit febbbf3
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 38 deletions.
2 changes: 1 addition & 1 deletion docs/themes/book
Submodule book updated 102 files
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ public static DefaultContext buildDefaultContext(CliOptions.EmbeddedCliOptions o
}
Configuration sessionConfig = options.getPythonConfiguration();
sessionConfig.addAll(ConfigurationUtils.createConfiguration(options.getSessionConfig()));
return DefaultContext.load(sessionConfig, discoverDependencies(jars, libDirs), true, true);
return DefaultContext.load(sessionConfig, discoverDependencies(jars, libDirs), true);
}

public static DefaultContext buildDefaultContext(CliOptions.GatewayCliOptions options) {
return DefaultContext.load(
ConfigurationUtils.createConfiguration(options.getSessionConfig()),
Collections.emptyList(),
false,
false);
}
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ static void startSqlGateway(PrintStream stream, String[] args) {
DefaultContext.load(
ConfigurationUtils.createConfiguration(cliOptions.getDynamicConfigs()),
Collections.emptyList(),
true,
true);
SqlGateway gateway =
new SqlGateway(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -137,13 +133,9 @@ private static CustomCommandLine findActiveCommandLine(
* @param dynamicConfig user specified configuration.
* @param dependencies user specified jars
* @param discoverExecutionConfig flag whether to load the execution configuration
* @param discoverPythonJar flag whetehr to load the python jar
*/
public static DefaultContext load(
Configuration dynamicConfig,
List<URL> dependencies,
boolean discoverExecutionConfig,
boolean discoverPythonJar) {
Configuration dynamicConfig, List<URL> dependencies, boolean discoverExecutionConfig) {
// 1. find the configuration directory
String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();

Expand All @@ -159,11 +151,6 @@ public static DefaultContext load(
FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

if (discoverPythonJar) {
dependencies = new ArrayList<>(dependencies);
dependencies.addAll(discoverPythonDependencies());
}

if (discoverExecutionConfig) {
Options commandLineOptions = collectCommandLineOptions(commandLines);

Expand All @@ -184,23 +171,4 @@ public static DefaultContext load(

return new DefaultContext(configuration, dependencies);
}

private static List<URL> discoverPythonDependencies() {
try {
URL location =
Class.forName(
"org.apache.flink.python.PythonFunctionRunner",
false,
Thread.currentThread().getContextClassLoader())
.getProtectionDomain()
.getCodeSource()
.getLocation();
if (Paths.get(location.toURI()).toFile().isFile()) {
return Collections.singletonList(location);
}
} catch (URISyntaxException | ClassNotFoundException e) {
LOG.warn("Failed to find flink-python jar." + e);
}
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void beforeAll(ExtensionContext context) throws Exception {
sessionManager =
sessionManagerCreator.apply(
DefaultContext.load(
new Configuration(), Collections.emptyList(), true, false));
new Configuration(), Collections.emptyList(), true));
} finally {
CommonTestUtils.setEnv(originalEnv);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,9 @@ private FunctionDefinition getFunctionDefinition(String name, CatalogFunction fu
// directly.
return ((InlineCatalogFunction) function).getDefinition();
}
if (function.getFunctionLanguage() == FunctionLanguage.PYTHON) {
resourceManager.registerPythonResources();
}
// If the jar resource of UDF used is not empty, register it to classloader before
// validate.
registerFunctionJarResources(name, function.getFunctionResources());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class ResourceManager implements Closeable {
protected final Map<ResourceUri, URL> resourceInfos;
protected final MutableURLClassLoader userClassLoader;

private boolean containPython = false;

public static ResourceManager createResourceManager(
URL[] urls, ClassLoader parent, ReadableConfig config) {
MutableURLClassLoader mutableURLClassLoader =
Expand Down Expand Up @@ -197,6 +201,13 @@ public void unregisterFunctionResources(List<ResourceUri> resourceUris) {
}
}

public void registerPythonResources() {
if (!containPython) {
registerResources(discoverPythonDependencies(), true);
containPython = true;
}
}

public URLClassLoader getUserClassLoader() {
return userClassLoader;
}
Expand Down Expand Up @@ -425,6 +436,26 @@ private Path getResourceLocalPath(Path remotePath) {
return new Path(localResourceDir, fileNameWithUUID);
}

private static Map<ResourceUri, URL> discoverPythonDependencies() {
try {
URL location =
Class.forName(
"org.apache.flink.python.PythonFunctionRunner",
false,
Thread.currentThread().getContextClassLoader())
.getProtectionDomain()
.getCodeSource()
.getLocation();
if (Paths.get(location.toURI()).toFile().isFile()) {
return Collections.singletonMap(
new ResourceUri(ResourceType.JAR, location.getPath()), location);
}
} catch (URISyntaxException | ClassNotFoundException e) {
LOG.warn("Failed to find flink-python jar." + e);
}
return Collections.emptyMap();
}

private void checkResources(Collection<ResourceUri> resourceUris, ResourceType expectedType)
throws IOException {
// check the resource type
Expand Down

0 comments on commit febbbf3

Please sign in to comment.