Skip to content

Commit

Permalink
[fix][functions] Fix netty.DnsResolverUtil compat issue on JDK9+ for …
Browse files Browse the repository at this point in the history
…the function Runtimes (apache#16423)
  • Loading branch information
cbornet authored Jul 19, 2022
1 parent 3d4fa00 commit b9249d7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import java.util.Map;
import javax.management.MalformedObjectNameException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand Down Expand Up @@ -255,7 +257,6 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
return args;
}


public static List<String> getCmd(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
Expand Down Expand Up @@ -320,6 +321,12 @@ public static List<String> getCmd(InstanceConfig instanceConfig,

args.add("-Dio.netty.tryReflectionSetAccessible=true");

// Needed for netty.DnsResolverUtil on JDK9+
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
args.add("--add-opens");
args.add("java.base/sun.net=ALL-UNNAMED");
}

if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1Toleration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.JavaVersion;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand All @@ -48,6 +50,7 @@
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -380,8 +383,13 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
}

private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
KubernetesRuntime container;
List<String> args;
try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
container = factory.createContainer(config, userJarFile, userJarFile, 30L);
args = container.getProcessArgs();
}

String classpath = javaInstanceJarFile;
String extraDepsEnv;
Expand All @@ -392,14 +400,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
totalArgs = 40;
portArg = 26;
metricsPortArg = 28;
totalArgs = 42;
portArg = 28;
metricsPortArg = 30;
} else {
extraDepsEnv = "";
portArg = 25;
metricsPortArg = 27;
totalArgs = 39;
portArg = 27;
metricsPortArg = 29;
totalArgs = 41;
}
if (secretsAttached) {
totalArgs += 4;
Expand Down Expand Up @@ -430,6 +438,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Dio.netty.tryReflectionSetAccessible=true -Xmx" + String.valueOf(RESOURCES.getRam())
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + jarLocation + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.Optional;

import io.kubernetes.client.openapi.models.V1PodSpec;
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
Expand All @@ -50,6 +52,7 @@
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -283,28 +286,33 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir) throws Exce
}

private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webServiceUrl) throws Exception {
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
List<String> args;
try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30L);
args = container.getProcessArgs();
}


String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
int metricsPortArg;
int totalArgCount = 42;
int totalArgCount = 44;
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 3;
}
if (null != depsDir) {
assertEquals(args.size(), totalArgCount);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
portArg = 25;
metricsPortArg = 27;
portArg = 27;
metricsPortArg = 29;
} else {
assertEquals(args.size(), totalArgCount-1);
extraDepsEnv = "";
portArg = 24;
metricsPortArg = 26;
portArg = 26;
metricsPortArg = 28;
}
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
portArg += 3;
Expand All @@ -321,6 +329,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " -Dio.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
+ config.getInstanceId() + " --function_id " + config.getFunctionId()
Expand Down

0 comments on commit b9249d7

Please sign in to comment.