From b9249d7d421c9916104df5d8623875691076372d Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Tue, 19 Jul 2022 10:35:02 +0200 Subject: [PATCH] [fix][functions] Fix netty.DnsResolverUtil compat issue on JDK9+ for the function Runtimes (#16423) --- .../functions/runtime/RuntimeUtils.java | 9 ++++++- .../kubernetes/KubernetesRuntimeTest.java | 25 +++++++++++++------ .../runtime/process/ProcessRuntimeTest.java | 23 +++++++++++------ 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 1fb7650b1e8fb..a1101e80c95f2 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -41,7 +41,9 @@ import java.util.Map; import javax.management.MalformedObjectNameException; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.JavaVersion; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -255,7 +257,6 @@ public static List getGoInstanceCmd(InstanceConfig instanceConfig, return args; } - public static List getCmd(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, /* extra dependencies for running instances */ @@ -320,6 +321,12 @@ public static List getCmd(InstanceConfig instanceConfig, args.add("-Dio.netty.tryReflectionSetAccessible=true"); + // Needed for netty.DnsResolverUtil on JDK9+ + if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { + args.add("--add-opens"); + args.add("java.base/sun.net=ALL-UNNAMED"); + } + if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) { args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments()); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 88cccfb7b8315..f9a5521b2df57 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -33,6 +33,8 @@ import io.kubernetes.client.openapi.models.V1StatefulSet; import io.kubernetes.client.openapi.models.V1Toleration; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.SystemUtils; +import org.apache.commons.lang3.JavaVersion; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -48,6 +50,7 @@ import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -380,8 +383,13 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s } private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception { - KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); - List args = container.getProcessArgs(); + KubernetesRuntime container; + List args; + try (MockedStatic systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) { + systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true); + container = factory.createContainer(config, userJarFile, userJarFile, 30L); + args = container.getProcessArgs(); + } String classpath = javaInstanceJarFile; String extraDepsEnv; @@ -392,14 +400,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s if (null != depsDir) { extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - totalArgs = 40; - portArg = 26; - metricsPortArg = 28; + totalArgs = 42; + portArg = 28; + metricsPortArg = 30; } else { extraDepsEnv = ""; - portArg = 25; - metricsPortArg = 27; - totalArgs = 39; + portArg = 27; + metricsPortArg = 29; + totalArgs = 41; } if (secretsAttached) { totalArgs += 4; @@ -430,6 +438,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID" + " -Dio.netty.tryReflectionSetAccessible=true -Xmx" + String.valueOf(RESOURCES.getRam()) + + " --add-opens java.base/sun.net=ALL-UNNAMED" + " org.apache.pulsar.functions.instance.JavaInstanceMain" + " --jar " + jarLocation + " --instance_id " + "$SHARD_ID" + " --function_id " + config.getFunctionId() diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index 301e38ee82086..6d80ec871b68c 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -38,6 +38,8 @@ import java.util.Optional; import io.kubernetes.client.openapi.models.V1PodSpec; +import org.apache.commons.lang3.JavaVersion; +import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; @@ -50,6 +52,7 @@ import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -283,14 +286,19 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir) throws Exce } private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webServiceUrl) throws Exception { - ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l); - List args = container.getProcessArgs(); + List args; + try (MockedStatic systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) { + systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true); + ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30L); + args = container.getProcessArgs(); + } + String classpath = javaInstanceJarFile; String extraDepsEnv; int portArg; int metricsPortArg; - int totalArgCount = 42; + int totalArgCount = 44; if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) { totalArgCount += 3; } @@ -298,13 +306,13 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS assertEquals(args.size(), totalArgCount); extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - portArg = 25; - metricsPortArg = 27; + portArg = 27; + metricsPortArg = 29; } else { assertEquals(args.size(), totalArgCount-1); extraDepsEnv = ""; - portArg = 24; - metricsPortArg = 26; + portArg = 26; + metricsPortArg = 28; } if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) { portArg += 3; @@ -321,6 +329,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() + " -Dio.netty.tryReflectionSetAccessible=true" + + " --add-opens java.base/sun.net=ALL-UNNAMED" + " org.apache.pulsar.functions.instance.JavaInstanceMain" + " --jar " + userJarFile + " --instance_id " + config.getInstanceId() + " --function_id " + config.getFunctionId()