Skip to content

Commit

Permalink
Function: add possibility to pass additional JVM arguments to the fun…
Browse files Browse the repository at this point in the history
…ction JVM (additionalJavaRuntimeArguments) (apache#13282)

### Motivation

Sometimes it would be useful to be able to tune the JVM of the functions by passing additional command line arguments, 
like "-XX:+ExitOnOutOfMemoryError" or "-Dlog4j2.formatMsgNoLookups".

Those settings are not per-function, but they are to be applied to every function process.

### Modifications

Add a new configuration parameter additionalJavaRuntimeArguments in functions_worker.conf.
This is a list of strings to be added to the command line of the Java process.
  • Loading branch information
eolivelli authored Dec 15, 2021
1 parent d38c88a commit 58e1dff
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 0 deletions.
4 changes: 4 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ functionRuntimeFactoryConfigs:
# change the extra dependencies location:
extraFunctionDependenciesDir:

#### Additional JVM tuning (only process and Kubernetes runtime) ####
# This arguments will be added to the command line execution of 'java'
#additionalJavaRuntimeArguments: ['-XX:+ExitOnOutOfMemoryError']

#### Thread Runtime ####
# Pulsar function instances are run as threads

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;

import java.util.Collections;
import java.util.List;

/**
* This is the config passed to the Java Instance. Contains all the information
* passed to run functions.
Expand All @@ -44,6 +47,7 @@ public class InstanceConfig {
@Getter
private boolean exposePulsarAdminClientEnabled = false;
private int metricsPort;
private List<String> additionalJavaRuntimeArguments = Collections.emptyList();

/**
* Get the string representation of {@link #getInstanceId()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ public static List<String> getCmd(InstanceConfig instanceConfig,

args.add("-Dio.netty.tryReflectionSetAccessible=true");

if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments());
}

if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
args.add(runtimeFlagArg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -539,6 +541,11 @@ public String getFunctionAuthProviderClassName() {
)
private boolean forwardSourceMessageProperty = true;

@FieldContext(
doc = "Additional arguments to pass to the Java command line for Java functions"
)
private List<String> additionalJavaRuntimeArguments = new ArrayList<>();

public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.jose4j.json.internal.json_simple.JSONObject;
Expand All @@ -28,9 +30,13 @@
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import static org.testng.AssertJUnit.assertTrue;

@Slf4j
public class RuntimeUtilsTest {

@Test
Expand Down Expand Up @@ -167,4 +173,50 @@ public static Object[][] k8sRuntimeFlag() {
}
};
}

@Test(dataProvider = "k8sRuntime")
public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Exception {

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setClusterName("kluster");
instanceConfig.setInstanceId(3000);
instanceConfig.setFunctionId("func-7734");
instanceConfig.setFunctionVersion("1.0.0");
instanceConfig.setMaxBufferedTuples(5);
instanceConfig.setPort(1337);
instanceConfig.setFunctionDetails(Function.FunctionDetails.newBuilder().build());
instanceConfig.setAdditionalJavaRuntimeArguments(Arrays.asList("-XX:+ExitOnOutOfMemoryError"));

List<String> cmd = RuntimeUtils.getCmd(instanceConfig, "instanceFile",
"extraDependenciesDir", /* extra dependencies for running instances */
"logDirectory",
"originalCodeFileName",
"pulsarServiceUrl",
"stateStorageServiceUrl",
AuthenticationConfig.builder().build(),
"shardId",
23,
1234L,
"logConfigFile",
"secretsProviderClassName",
"secretsProviderConfig",
false,
null,
null,
"narExtractionDirectory",
"functionInstanceClassPath",
false,
"");

log.info("cmd {}", cmd);

assertTrue(cmd.contains("-XX:+ExitOnOutOfMemoryError"));

// verify that the additional runtime arguments are passed before the Java class
int indexJavaClass = cmd.indexOf("org.apache.pulsar.functions.instance.JavaInstanceMain");
int indexAdditionalArguments = cmd.indexOf("-XX:+ExitOnOutOfMemoryError");
assertTrue(indexJavaClass > 0);
assertTrue(indexAdditionalArguments > 0);
assertTrue(indexAdditionalArguments < indexJavaClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests());
instanceConfig.setExposePulsarAdminClientEnabled(workerConfig.isExposeAdminClientEnabled());
if (workerConfig.getAdditionalJavaRuntimeArguments() != null) {
instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments());
}
return instanceConfig;
}

Expand Down

0 comments on commit 58e1dff

Please sign in to comment.