diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index cf01344e3d328..51d6f571f0059 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -175,6 +175,9 @@ functionRuntimeFactoryConfigs: # # The ratio memory request and memory limit to be set for a function/source/sink. # # The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio # memoryOverCommitRatio: 1.0 +# # The function instance class path to be set if it's different from the +# # broker/function-worker provided class path +# functionInstanceClassPath: ## A set of the minimum amount of resources functions must request. ## Support for this depends on function runtime. diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index be1fcbefb3d54..4108f78e69549 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -71,7 +71,8 @@ public static List composeCmd(InstanceConfig instanceConfig, String pythonDependencyRepository, String pythonExtraDependencyRepository, int metricsPort, - String narExtractionDirectory) throws Exception { + String narExtractionDirectory, + String functionInstanceClassPath) throws Exception { final List cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir); @@ -80,7 +81,7 @@ public static List composeCmd(InstanceConfig instanceConfig, authConfig, shardId, grpcPort, expectedHealthCheckInterval, logConfigFile, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, - pythonExtraDependencyRepository, metricsPort, narExtractionDirectory)); + pythonExtraDependencyRepository, metricsPort, narExtractionDirectory, functionInstanceClassPath)); return cmd; } @@ -250,7 +251,8 @@ public static List getCmd(InstanceConfig instanceConfig, String pythonDependencyRepository, String pythonExtraDependencyRepository, int metricsPort, - String narExtractionDirectory) throws Exception { + String narExtractionDirectory, + String functionInstanceClassPath) throws Exception { final List args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) { @@ -272,15 +274,18 @@ public static List getCmd(InstanceConfig instanceConfig, args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir)); } - // add complete classpath for broker/worker so that the function instance can load - // the functions instance dependencies separately from user code dependencies - String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH); - if (functionInstanceClasspath == null) { - log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH); - functionInstanceClasspath = System.getProperty("java.class.path"); + if (StringUtils.isNotEmpty(functionInstanceClassPath)) { + args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath)); + } else { + // add complete classpath for broker/worker so that the function instance can load + // the functions instance dependencies separately from user code dependencies + String systemFunctionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH); + if (systemFunctionInstanceClasspath == null) { + log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH); + systemFunctionInstanceClasspath = System.getProperty("java.class.path"); + } + args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath)); } - args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClasspath)); - args.add("-Dlog4j.configurationFile=" + logConfigFile); args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig)); args.add("-Dpulsar.function.log.file=" + String.format( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 3c117a7369166..6ae3d3be93b35 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -149,6 +149,7 @@ public class KubernetesRuntime implements Runtime { private Integer metricsPort; private String narExtractionDirectory; private final Optional manifestCustomizer; + private String functionInstanceClassPath; KubernetesRuntime(AppsV1Api appsClient, CoreV1Api coreClient, @@ -182,7 +183,8 @@ public class KubernetesRuntime implements Runtime { Integer grpcPort, Integer metricsPort, String narExtractionDirectory, - Optional manifestCustomizer) throws Exception { + Optional manifestCustomizer, + String functinoInstanceClassPath) throws Exception { this.appsClient = appsClient; this.coreClient = coreClient; this.instanceConfig = instanceConfig; @@ -202,6 +204,7 @@ public class KubernetesRuntime implements Runtime { this.memoryOverCommitRatio = memoryOverCommitRatio; this.authenticationEnabled = authenticationEnabled; this.manifestCustomizer = manifestCustomizer; + this.functionInstanceClassPath = functinoInstanceClassPath; String logConfigFile = null; String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); String secretsProviderConfig = null; @@ -253,7 +256,8 @@ public class KubernetesRuntime implements Runtime { pythonDependencyRepository, pythonExtraDependencyRepository, metricsPort, - narExtractionDirectory)); + narExtractionDirectory, + functinoInstanceClassPath)); doChecks(instanceConfig.getFunctionDetails()); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index 5a0a54a34b640..31d8d3966fdae 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -95,6 +95,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private Integer grpcPort; private Integer metricsPort; private String narExtractionDirectory; + private String functionInstanceClassPath; @ToString.Exclude @EqualsAndHashCode.Exclude @@ -233,6 +234,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic this.grpcPort = factoryConfig.getGrpcPort(); this.metricsPort = factoryConfig.getMetricsPort(); this.narExtractionDirectory = factoryConfig.getNarExtractionDirectory(); + this.functionInstanceClassPath = factoryConfig.getFunctionInstanceClassPath(); } @Override @@ -298,7 +300,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c grpcPort, metricsPort, narExtractionDirectory, - manifestCustomizer); + manifestCustomizer, + functionInstanceClassPath); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java index 9f506a476b9a3..fafdaa3240b44 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java @@ -152,4 +152,9 @@ public class KubernetesRuntimeFactoryConfig { ) private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @FieldContext( + doc = "The classpath where function instance files stored" + ) + private String functionInstanceClassPath = ""; + } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java index a4fcf739f2a25..876b91afb3161 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java @@ -136,7 +136,9 @@ class ProcessRuntime implements Runtime { false, null, null, - this.metricsPort, narExtractionDirectory); + this.metricsPort, + narExtractionDirectory, + null); } /**