Skip to content

Commit

Permalink
Support graceful shutdown Kubernetes (apache#3628)
Browse files Browse the repository at this point in the history
* allow for graceful shutdown when running functions in kubernetes

* cleaning up

* fix unit tests
  • Loading branch information
jerrypeng authored Feb 20, 2019
1 parent 2fd283e commit d1fefad
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -201,8 +198,7 @@ public void start() throws Exception {
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
try {
server.shutdown();
runtimeSpawner.close();
close();
} catch (Exception ex) {
System.err.println(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,25 +185,33 @@ public class KubernetesRuntime implements Runtime {
logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
break;
}
this.processArgs = RuntimeUtils.composeArgs(
instanceConfig,
instanceFile,
extraDependenciesDir,
logDirectory,
this.originalCodeFileName,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
"$" + ENV_SHARD_ID,
GRPC_PORT,
-1l,
logConfigFile,
secretsProviderClassName,
secretsProviderConfig,
installUserCodeDependencies,
pythonDependencyRepository,
pythonExtraDependencyRepository,
METRICS_PORT);

this.processArgs = new LinkedList<>();
this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
// use exec to to launch function so that it gets launched in the foreground with the same PID as shell
// so that when we kill the pod, the signal will get propagated to the function code
this.processArgs.add("exec");
this.processArgs.addAll(
RuntimeUtils.getCmd(
instanceConfig,
instanceFile,
extraDependenciesDir,
logDirectory,
this.originalCodeFileName,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
"$" + ENV_SHARD_ID,
GRPC_PORT,
-1l,
logConfigFile,
secretsProviderClassName,
secretsProviderConfig,
installUserCodeDependencies,
pythonDependencyRepository,
pythonExtraDependencyRepository,
METRICS_PORT));

doChecks(instanceConfig.getFunctionDetails());
}

Expand Down Expand Up @@ -467,7 +475,7 @@ private void submitStatefulSet() throws Exception {
public void deleteStatefulSet() throws InterruptedException {
String statefulSetName = createJobName(instanceConfig.getFunctionDetails());
final V1DeleteOptions options = new V1DeleteOptions();
options.setGracePeriodSeconds(0L);
options.setGracePeriodSeconds(5L);
options.setPropagationPolicy("Foreground");

String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Expand Down Expand Up @@ -521,8 +529,9 @@ public void deleteStatefulSet() throws InterruptedException {

RuntimeUtils.Actions.Action waitForStatefulSetDeletion = RuntimeUtils.Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
// set retry period to be about 2x the graceshutdown time
.numRetries(NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS* 2)
.supplier(() -> {
V1StatefulSet response;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,12 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Utils;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -108,7 +104,7 @@ class ProcessRuntime implements Runtime {
break;
}
this.extraDependenciesDir = extraDependenciesDir;
this.processArgs = RuntimeUtils.composeArgs(
this.processArgs = RuntimeUtils.composeCmd(
instanceConfig,
instanceFile,
// DONT SET extra dependencies here (for python runtime),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,69 @@ public class RuntimeUtils {

private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";

public static List<String> composeArgs(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
public static List<String> composeCmd(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
String logDirectory,
String originalCodeFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
String shardId,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort) throws Exception {

final List<String> cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir);

cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory,
originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, metricsPort));
return cmd;
}

public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, String extraDependenciesDir) {

final List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
//no-op
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
// add `extraDependenciesDir` to python package searching path
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
}
}

return args;
}

public static List<String> getCmd(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
String logDirectory,
String originalCodeFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
String shardId,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort) throws Exception {
String originalCodeFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
String shardId,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort) throws Exception {
final List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
Expand Down Expand Up @@ -105,10 +150,6 @@ public static List<String> composeArgs(InstanceConfig instanceConfig,
args.add("--jar");
args.add(originalCodeFileName);
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
// add `extraDependenciesDir` to python package searching path
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
}
args.add("python");
args.add(instanceFile);
args.add("--py");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
totalArgs = 34;
portArg = 25;
metricsPortArg = 27;
totalArgs = 35;
portArg = 26;
metricsPortArg = 28;
} else {
extraDepsEnv = "";
portArg = 24;
metricsPortArg = 26;
totalArgs = 33;
portArg = 25;
metricsPortArg = 27;
totalArgs = 34;
}
if (secretsAttached) {
totalArgs += 4;
Expand All @@ -287,7 +287,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
assertEquals(args.size(), totalArgs,
"Actual args : " + StringUtils.join(args, " "));

String expectedArgs = "java -cp " + classpath
String expectedArgs = "exec java -cp " + classpath
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
Expand Down Expand Up @@ -352,16 +352,16 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, bo
int configArg;
int metricsPortArg;
if (null == extraDepsDir) {
totalArgs = 36;
portArg = 29;
configArg = 9;
pythonPath = "";
metricsPortArg = 31;
} else {
totalArgs = 39;
totalArgs = 37;
portArg = 30;
configArg = 10;
pythonPath = "";
metricsPortArg = 32;
} else {
totalArgs = 40;
portArg = 31;
configArg = 11;
metricsPortArg = 33;
pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
}
if (secretsAttached) {
Expand All @@ -370,7 +370,7 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, bo

assertEquals(args.size(), totalArgs,
"Actual args : " + StringUtils.join(args, " "));
String expectedArgs = pythonPath + "python " + pythonInstanceFile
String expectedArgs = pythonPath + "exec python " + pythonInstanceFile
+ " --py " + pulsarRootDir + "/" + userJarFile
+ " --logging_directory " + logDirectory
+ " --logging_file " + config.getFunctionDetails().getName()
Expand Down

0 comments on commit d1fefad

Please sign in to comment.