From 7285380564d0137c7ebfa83ded72362b858ef9c2 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Thu, 29 Oct 2020 10:09:20 -0400 Subject: [PATCH] [Issue 7742][functions] Allow kubernetes runtime to customize function instance class path (#7844) Fixes #1338 ### Motivation Currently, the function worker is using the function worker's classpath to configure the function instance (runner)'s classpath. So when the broker (function worker) is using an image that is different from the function instance (runner) for kubernetes runtime, the classpath will be wrong and the function instance is not able to load the instance classes. ### Modifications Adding an function instance class path entry into the kubernetes runtime config. And construct the function launch command accordingly. ### Verifying this change - [X] Make sure that the change passes the CI checks. This change is already covered by existing tests, such as KubernetesRuntimeTest. ### Does this pull request potentially affect one of the following parts: No ### Documentation - Does this pull request introduce a new feature? No Co-authored-by: Yong Zhang --- conf/functions_worker.yml | 3 +++ .../functions/runtime/RuntimeUtils.java | 27 +++++++++++-------- .../runtime/kubernetes/KubernetesRuntime.java | 8 ++++-- .../kubernetes/KubernetesRuntimeFactory.java | 5 +++- .../KubernetesRuntimeFactoryConfig.java | 5 ++++ .../runtime/process/ProcessRuntime.java | 4 ++- 6 files changed, 37 insertions(+), 15 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index cf01344e3d328..51d6f571f0059 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -175,6 +175,9 @@ functionRuntimeFactoryConfigs: # # The ratio memory request and memory limit to be set for a function/source/sink. # # The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio # memoryOverCommitRatio: 1.0 +# # The function instance class path to be set if it's different from the +# # broker/function-worker provided class path +# functionInstanceClassPath: ## A set of the minimum amount of resources functions must request. ## Support for this depends on function runtime. diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index be1fcbefb3d54..4108f78e69549 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -71,7 +71,8 @@ public static List composeCmd(InstanceConfig instanceConfig, String pythonDependencyRepository, String pythonExtraDependencyRepository, int metricsPort, - String narExtractionDirectory) throws Exception { + String narExtractionDirectory, + String functionInstanceClassPath) throws Exception { final List cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir); @@ -80,7 +81,7 @@ public static List composeCmd(InstanceConfig instanceConfig, authConfig, shardId, grpcPort, expectedHealthCheckInterval, logConfigFile, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, - pythonExtraDependencyRepository, metricsPort, narExtractionDirectory)); + pythonExtraDependencyRepository, metricsPort, narExtractionDirectory, functionInstanceClassPath)); return cmd; } @@ -250,7 +251,8 @@ public static List getCmd(InstanceConfig instanceConfig, String pythonDependencyRepository, String pythonExtraDependencyRepository, int metricsPort, - String narExtractionDirectory) throws Exception { + String narExtractionDirectory, + String functionInstanceClassPath) throws Exception { final List args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) { @@ -272,15 +274,18 @@ public static List getCmd(InstanceConfig instanceConfig, args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir)); } - // add complete classpath for broker/worker so that the function instance can load - // the functions instance dependencies separately from user code dependencies - String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH); - if (functionInstanceClasspath == null) { - log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH); - functionInstanceClasspath = System.getProperty("java.class.path"); + if (StringUtils.isNotEmpty(functionInstanceClassPath)) { + args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath)); + } else { + // add complete classpath for broker/worker so that the function instance can load + // the functions instance dependencies separately from user code dependencies + String systemFunctionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH); + if (systemFunctionInstanceClasspath == null) { + log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH); + systemFunctionInstanceClasspath = System.getProperty("java.class.path"); + } + args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath)); } - args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClasspath)); - args.add("-Dlog4j.configurationFile=" + logConfigFile); args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig)); args.add("-Dpulsar.function.log.file=" + String.format( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 3c117a7369166..6ae3d3be93b35 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -149,6 +149,7 @@ public class KubernetesRuntime implements Runtime { private Integer metricsPort; private String narExtractionDirectory; private final Optional manifestCustomizer; + private String functionInstanceClassPath; KubernetesRuntime(AppsV1Api appsClient, CoreV1Api coreClient, @@ -182,7 +183,8 @@ public class KubernetesRuntime implements Runtime { Integer grpcPort, Integer metricsPort, String narExtractionDirectory, - Optional manifestCustomizer) throws Exception { + Optional manifestCustomizer, + String functinoInstanceClassPath) throws Exception { this.appsClient = appsClient; this.coreClient = coreClient; this.instanceConfig = instanceConfig; @@ -202,6 +204,7 @@ public class KubernetesRuntime implements Runtime { this.memoryOverCommitRatio = memoryOverCommitRatio; this.authenticationEnabled = authenticationEnabled; this.manifestCustomizer = manifestCustomizer; + this.functionInstanceClassPath = functinoInstanceClassPath; String logConfigFile = null; String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); String secretsProviderConfig = null; @@ -253,7 +256,8 @@ public class KubernetesRuntime implements Runtime { pythonDependencyRepository, pythonExtraDependencyRepository, metricsPort, - narExtractionDirectory)); + narExtractionDirectory, + functinoInstanceClassPath)); doChecks(instanceConfig.getFunctionDetails()); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index 5a0a54a34b640..31d8d3966fdae 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -95,6 +95,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private Integer grpcPort; private Integer metricsPort; private String narExtractionDirectory; + private String functionInstanceClassPath; @ToString.Exclude @EqualsAndHashCode.Exclude @@ -233,6 +234,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic this.grpcPort = factoryConfig.getGrpcPort(); this.metricsPort = factoryConfig.getMetricsPort(); this.narExtractionDirectory = factoryConfig.getNarExtractionDirectory(); + this.functionInstanceClassPath = factoryConfig.getFunctionInstanceClassPath(); } @Override @@ -298,7 +300,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c grpcPort, metricsPort, narExtractionDirectory, - manifestCustomizer); + manifestCustomizer, + functionInstanceClassPath); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java index 9f506a476b9a3..fafdaa3240b44 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java @@ -152,4 +152,9 @@ public class KubernetesRuntimeFactoryConfig { ) private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @FieldContext( + doc = "The classpath where function instance files stored" + ) + private String functionInstanceClassPath = ""; + } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java index a4fcf739f2a25..876b91afb3161 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java @@ -136,7 +136,9 @@ class ProcessRuntime implements Runtime { false, null, null, - this.metricsPort, narExtractionDirectory); + this.metricsPort, + narExtractionDirectory, + null); } /**