From 58e1dff4e83ec95d9f0805b1072f34bf5250e989 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 15 Dec 2021 01:03:36 +0100 Subject: [PATCH] Function: add possibility to pass additional JVM arguments to the function JVM (additionalJavaRuntimeArguments) (#13282) ### Motivation Sometimes it would be useful to be able to tune the JVM of the functions by passing additional command line arguments, like "-XX:+ExitOnOutOfMemoryError" or "-Dlog4j2.formatMsgNoLookups". Those settings are not per-function, but they are to be applied to every function process. ### Modifications Add a new configuration parameter additionalJavaRuntimeArguments in functions_worker.conf. This is a list of strings to be added to the command line of the Java process. --- conf/functions_worker.yml | 4 ++ .../functions/instance/InstanceConfig.java | 4 ++ .../functions/runtime/RuntimeUtils.java | 4 ++ .../pulsar/functions/worker/WorkerConfig.java | 7 +++ .../functions/runtime/RuntimeUtilsTest.java | 52 +++++++++++++++++++ .../functions/worker/FunctionActioner.java | 3 ++ 6 files changed, 74 insertions(+) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 9ca5f7bc923e5..c3eb826d1dba7 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -126,6 +126,10 @@ functionRuntimeFactoryConfigs: # change the extra dependencies location: extraFunctionDependenciesDir: +#### Additional JVM tuning (only process and Kubernetes runtime) #### +# This arguments will be added to the command line execution of 'java' +#additionalJavaRuntimeArguments: ['-XX:+ExitOnOutOfMemoryError'] + #### Thread Runtime #### # Pulsar function instances are run as threads diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java index ddf437c192463..98f602abfb9e1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java @@ -23,6 +23,9 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import java.util.Collections; +import java.util.List; + /** * This is the config passed to the Java Instance. Contains all the information * passed to run functions. @@ -44,6 +47,7 @@ public class InstanceConfig { @Getter private boolean exposePulsarAdminClientEnabled = false; private int metricsPort; + private List additionalJavaRuntimeArguments = Collections.emptyList(); /** * Get the string representation of {@link #getInstanceId()}. diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 9e4dea8b110f0..f6d0a12165e31 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -320,6 +320,10 @@ public static List getCmd(InstanceConfig instanceConfig, args.add("-Dio.netty.tryReflectionSetAccessible=true"); + if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) { + args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments()); + } + if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) { for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) { args.add(runtimeFlagArg); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index eb40ae3c27210..46f9108760e1d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -33,6 +33,8 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -539,6 +541,11 @@ public String getFunctionAuthProviderClassName() { ) private boolean forwardSourceMessageProperty = true; + @FieldContext( + doc = "Additional arguments to pass to the Java command line for Java functions" + ) + private List additionalJavaRuntimeArguments = new ArrayList<>(); + public String getFunctionMetadataTopic() { return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java index bc00776c78ebd..1651a7ba1861e 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java @@ -20,6 +20,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.jose4j.json.internal.json_simple.JSONObject; @@ -28,9 +30,13 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; +import static org.testng.AssertJUnit.assertTrue; + +@Slf4j public class RuntimeUtilsTest { @Test @@ -167,4 +173,50 @@ public static Object[][] k8sRuntimeFlag() { } }; } + + @Test(dataProvider = "k8sRuntime") + public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Exception { + + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setClusterName("kluster"); + instanceConfig.setInstanceId(3000); + instanceConfig.setFunctionId("func-7734"); + instanceConfig.setFunctionVersion("1.0.0"); + instanceConfig.setMaxBufferedTuples(5); + instanceConfig.setPort(1337); + instanceConfig.setFunctionDetails(Function.FunctionDetails.newBuilder().build()); + instanceConfig.setAdditionalJavaRuntimeArguments(Arrays.asList("-XX:+ExitOnOutOfMemoryError")); + + List cmd = RuntimeUtils.getCmd(instanceConfig, "instanceFile", + "extraDependenciesDir", /* extra dependencies for running instances */ + "logDirectory", + "originalCodeFileName", + "pulsarServiceUrl", + "stateStorageServiceUrl", + AuthenticationConfig.builder().build(), + "shardId", + 23, + 1234L, + "logConfigFile", + "secretsProviderClassName", + "secretsProviderConfig", + false, + null, + null, + "narExtractionDirectory", + "functionInstanceClassPath", + false, + ""); + + log.info("cmd {}", cmd); + + assertTrue(cmd.contains("-XX:+ExitOnOutOfMemoryError")); + + // verify that the additional runtime arguments are passed before the Java class + int indexJavaClass = cmd.indexOf("org.apache.pulsar.functions.instance.JavaInstanceMain"); + int indexAdditionalArguments = cmd.indexOf("-XX:+ExitOnOutOfMemoryError"); + assertTrue(indexJavaClass > 0); + assertTrue(indexAdditionalArguments > 0); + assertTrue(indexAdditionalArguments < indexJavaClass); + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 5fa554097257a..52fcf8e52a3d1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -189,6 +189,9 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec); instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests()); instanceConfig.setExposePulsarAdminClientEnabled(workerConfig.isExposeAdminClientEnabled()); + if (workerConfig.getAdditionalJavaRuntimeArguments() != null) { + instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments()); + } return instanceConfig; }