diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 43467ae2b921b..f480b2e67c831 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -46,10 +46,7 @@ import java.lang.reflect.Type; import java.net.InetSocketAddress; import java.util.Map; -import java.util.TimerTask; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -201,8 +198,7 @@ public void start() throws Exception { public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. try { - server.shutdown(); - runtimeSpawner.close(); + close(); } catch (Exception ex) { System.err.println(ex); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 2e2c63d1db04d..3fc0c697ad5df 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -185,25 +185,33 @@ public class KubernetesRuntime implements Runtime { logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini"; break; } - this.processArgs = RuntimeUtils.composeArgs( - instanceConfig, - instanceFile, - extraDependenciesDir, - logDirectory, - this.originalCodeFileName, - pulsarServiceUrl, - stateStorageServiceUrl, - authConfig, - "$" + ENV_SHARD_ID, - GRPC_PORT, - -1l, - logConfigFile, - secretsProviderClassName, - secretsProviderConfig, - installUserCodeDependencies, - pythonDependencyRepository, - pythonExtraDependencyRepository, - METRICS_PORT); + + this.processArgs = new LinkedList<>(); + this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir)); + // use exec to to launch function so that it gets launched in the foreground with the same PID as shell + // so that when we kill the pod, the signal will get propagated to the function code + this.processArgs.add("exec"); + this.processArgs.addAll( + RuntimeUtils.getCmd( + instanceConfig, + instanceFile, + extraDependenciesDir, + logDirectory, + this.originalCodeFileName, + pulsarServiceUrl, + stateStorageServiceUrl, + authConfig, + "$" + ENV_SHARD_ID, + GRPC_PORT, + -1l, + logConfigFile, + secretsProviderClassName, + secretsProviderConfig, + installUserCodeDependencies, + pythonDependencyRepository, + pythonExtraDependencyRepository, + METRICS_PORT)); + doChecks(instanceConfig.getFunctionDetails()); } @@ -467,7 +475,7 @@ private void submitStatefulSet() throws Exception { public void deleteStatefulSet() throws InterruptedException { String statefulSetName = createJobName(instanceConfig.getFunctionDetails()); final V1DeleteOptions options = new V1DeleteOptions(); - options.setGracePeriodSeconds(0L); + options.setGracePeriodSeconds(5L); options.setPropagationPolicy("Foreground"); String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()); @@ -521,8 +529,9 @@ public void deleteStatefulSet() throws InterruptedException { RuntimeUtils.Actions.Action waitForStatefulSetDeletion = RuntimeUtils.Actions.Action.builder() .actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn)) - .numRetries(NUM_RETRIES) - .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) + // set retry period to be about 2x the graceshutdown time + .numRetries(NUM_RETRIES * 2) + .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS* 2) .supplier(() -> { V1StatefulSet response; try { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 87017a6c0a848..7cc6efbc4141b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -39,16 +39,12 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.Utils; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; -import java.util.TimerTask; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -108,7 +104,7 @@ class ProcessRuntime implements Runtime { break; } this.extraDependenciesDir = extraDependenciesDir; - this.processArgs = RuntimeUtils.composeArgs( + this.processArgs = RuntimeUtils.composeCmd( instanceConfig, instanceFile, // DONT SET extra dependencies here (for python runtime), diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 77e1f8f812bea..9862b0a228386 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -54,24 +54,69 @@ public class RuntimeUtils { private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir"; - public static List composeArgs(InstanceConfig instanceConfig, - String instanceFile, - String extraDependenciesDir, /* extra dependencies for running instances */ + public static List composeCmd(InstanceConfig instanceConfig, + String instanceFile, + String extraDependenciesDir, /* extra dependencies for running instances */ + String logDirectory, + String originalCodeFileName, + String pulsarServiceUrl, + String stateStorageServiceUrl, + AuthenticationConfig authConfig, + String shardId, + Integer grpcPort, + Long expectedHealthCheckInterval, + String logConfigFile, + String secretsProviderClassName, + String secretsProviderConfig, + Boolean installUserCodeDependencies, + String pythonDependencyRepository, + String pythonExtraDependencyRepository, + int metricsPort) throws Exception { + + final List cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir); + + cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory, + originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, + authConfig, shardId, grpcPort, expectedHealthCheckInterval, + logConfigFile, secretsProviderClassName, secretsProviderConfig, + installUserCodeDependencies, pythonDependencyRepository, + pythonExtraDependencyRepository, metricsPort)); + return cmd; + } + + public static List getArgsBeforeCmd(InstanceConfig instanceConfig, String extraDependenciesDir) { + + final List args = new LinkedList<>(); + if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { + //no-op + } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) { + // add `extraDependenciesDir` to python package searching path + if (StringUtils.isNotEmpty(extraDependenciesDir)) { + args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir); + } + } + + return args; + } + + public static List getCmd(InstanceConfig instanceConfig, + String instanceFile, + String extraDependenciesDir, /* extra dependencies for running instances */ String logDirectory, - String originalCodeFileName, - String pulsarServiceUrl, - String stateStorageServiceUrl, - AuthenticationConfig authConfig, - String shardId, - Integer grpcPort, - Long expectedHealthCheckInterval, - String logConfigFile, - String secretsProviderClassName, - String secretsProviderConfig, - Boolean installUserCodeDependencies, - String pythonDependencyRepository, - String pythonExtraDependencyRepository, - int metricsPort) throws Exception { + String originalCodeFileName, + String pulsarServiceUrl, + String stateStorageServiceUrl, + AuthenticationConfig authConfig, + String shardId, + Integer grpcPort, + Long expectedHealthCheckInterval, + String logConfigFile, + String secretsProviderClassName, + String secretsProviderConfig, + Boolean installUserCodeDependencies, + String pythonDependencyRepository, + String pythonExtraDependencyRepository, + int metricsPort) throws Exception { final List args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { args.add("java"); @@ -105,10 +150,6 @@ public static List composeArgs(InstanceConfig instanceConfig, args.add("--jar"); args.add(originalCodeFileName); } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) { - // add `extraDependenciesDir` to python package searching path - if (StringUtils.isNotEmpty(extraDependenciesDir)) { - args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir); - } args.add("python"); args.add(instanceFile); args.add("--py"); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index 7b5a879942a40..c7a98324026d7 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -271,14 +271,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s if (null != depsDir) { extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - totalArgs = 34; - portArg = 25; - metricsPortArg = 27; + totalArgs = 35; + portArg = 26; + metricsPortArg = 28; } else { extraDepsEnv = ""; - portArg = 24; - metricsPortArg = 26; - totalArgs = 33; + portArg = 25; + metricsPortArg = 27; + totalArgs = 34; } if (secretsAttached) { totalArgs += 4; @@ -287,7 +287,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s assertEquals(args.size(), totalArgs, "Actual args : " + StringUtils.join(args, " ")); - String expectedArgs = "java -cp " + classpath + String expectedArgs = "exec java -cp " + classpath + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + extraDepsEnv + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml " @@ -352,16 +352,16 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, bo int configArg; int metricsPortArg; if (null == extraDepsDir) { - totalArgs = 36; - portArg = 29; - configArg = 9; - pythonPath = ""; - metricsPortArg = 31; - } else { - totalArgs = 39; + totalArgs = 37; portArg = 30; configArg = 10; + pythonPath = ""; metricsPortArg = 32; + } else { + totalArgs = 40; + portArg = 31; + configArg = 11; + metricsPortArg = 33; pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " "; } if (secretsAttached) { @@ -370,7 +370,7 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, bo assertEquals(args.size(), totalArgs, "Actual args : " + StringUtils.join(args, " ")); - String expectedArgs = pythonPath + "python " + pythonInstanceFile + String expectedArgs = pythonPath + "exec python " + pythonInstanceFile + " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory " + logDirectory + " --logging_file " + config.getFunctionDetails().getName()