Skip to content

Commit

Permalink
Allow configuring extra dependencies directory for functions (apache#…
Browse files Browse the repository at this point in the history
…2923)

* Allow configuring extra dependencies directory for functions

*Motivation*

Functions offer interfaces (e.g. SecretsProvider) for users to provide their own implementations.
In order for the runtime to load those customized implementations, we should allow users to configure extra dependencies.

*Changes*

Add an extra dependencies directory setting to the function worker config so that users can configure the location from which their customized implementations are loaded.
  • Loading branch information
sijie authored Nov 4, 2018
1 parent 7351b24 commit 150f2a0
Show file tree
Hide file tree
Showing 13 changed files with 396 additions and 47 deletions.
3 changes: 3 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto

Expand Down Expand Up @@ -273,6 +275,7 @@ OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"

ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*"

Expand Down
5 changes: 5 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ processContainerFactory:
javaInstanceJarLocation:
# change the python instance location only when you put the python instance jar in a different location
pythonInstanceLocation:
# change the extra dependencies location:
extraFunctionDependenciesDir:
#kubernetesContainerFactory:
# # uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in function worker
# k8Uri:
Expand All @@ -101,6 +103,9 @@ processContainerFactory:
# pulsarAdminUrl:
# # the custom labels that function worker uses to select the nodes for pods
# customLabels:
# # the directory for dropping extra function dependencies
# # if it is not an absolute path, it is relative to `pulsarRootDir`
# extraFunctionDependenciesDir:

############################################
# security settings for worker service
Expand Down
8 changes: 8 additions & 0 deletions distribution/server/src/assemble/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@
<fileSet>
<directory>${basedir}/licenses</directory>
</fileSet>
<!-- create the `instances/deps` directory -->
<fileSet>
<directory>.</directory>
<outputDirectory>instances/deps</outputDirectory>
<excludes>
<exclude>*/**</exclude>
</excludes>
</fileSet>
<fileSet>
<directory>${basedir}/../../pulsar-functions/instance/target/python-instance</directory>
<outputDirectory>instances/python-instance</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.Empty;
import com.google.protobuf.util.JsonFormat;
import com.squareup.okhttp.Response;
Expand All @@ -32,22 +31,41 @@
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1ContainerPort;
import io.kubernetes.client.models.V1DeleteOptions;
import io.kubernetes.client.models.V1EnvVar;
import io.kubernetes.client.models.V1EnvVarSource;
import io.kubernetes.client.models.V1LabelSelector;
import io.kubernetes.client.models.V1ObjectFieldSelector;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1PodTemplateSpec;
import io.kubernetes.client.models.V1ResourceRequirements;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1ServicePort;
import io.kubernetes.client.models.V1ServiceSpec;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;

import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -119,6 +137,7 @@ class KubernetesRuntime implements Runtime {
String pulsarRootDir,
InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir,
String prometheusMetricsServerJarFile,
String logDirectory,
String userCodePkgUrl,
Expand Down Expand Up @@ -154,9 +173,24 @@ class KubernetesRuntime implements Runtime {
logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
break;
}
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
this.processArgs = RuntimeUtils.composeArgs(
instanceConfig,
instanceFile,
extraDependenciesDir,
logDirectory,
this.originalCodeFileName,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
"$" + ENV_SHARD_ID,
GRPC_PORT,
-1l,
logConfigFile,
secretsProviderClassName,
secretsProviderConfig,
installUserCodeDependencies,
pythonDependencyRepository,
pythonExtraDependencyRepository);
this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
running = false;
doChecks(instanceConfig.getFunctionDetails());
Expand Down Expand Up @@ -444,7 +478,6 @@ private static String setShardIdEnvironmentVariableCommand() {
return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
}


private V1StatefulSet createStatefulSet() {
final String jobName = createJobName(instanceConfig.getFunctionDetails());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.util.Config;
import java.nio.file.Paths;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
Expand Down Expand Up @@ -64,6 +65,7 @@ class KubernetesInfo {
private String pulsarServiceUrl;
private String pythonDependencyRepository;
private String pythonExtraDependencyRepository;
private String extraDependenciesDir;
private String changeConfigMap;
private String changeConfigMapNamespace;
}
Expand All @@ -76,6 +78,7 @@ class KubernetesInfo {
private final AuthenticationConfig authConfig;
private final String javaInstanceJarFile;
private final String pythonInstanceFile;
private final String extraDependenciesDir;
private final String prometheusMetricsServerJarFile;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private final String logDirectory = "logs/functions";
Expand All @@ -92,6 +95,7 @@ public KubernetesRuntimeFactory(String k8Uri,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
String extraDependenciesDir,
Map<String, String> customLabels,
String pulsarServiceUri,
String pulsarAdminUri,
Expand All @@ -118,6 +122,17 @@ public KubernetesRuntimeFactory(String k8Uri,
} else {
this.kubernetesInfo.setPulsarRootDir("/pulsar");
}
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
if (Paths.get(extraDependenciesDir).isAbsolute()) {
this.extraDependenciesDir = extraDependenciesDir;
} else {
this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir()
+ "/" + extraDependenciesDir;
}
} else {
this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir() + "/instances/deps";
}
this.kubernetesInfo.setExtraDependenciesDir(extraDependenciesDir);
this.kubernetesInfo.setPythonDependencyRepository(pythonDependencyRepository);
this.kubernetesInfo.setPythonExtraDependencyRepository(pythonExtraDependencyRepository);
this.kubernetesInfo.setPulsarServiceUrl(pulsarServiceUri);
Expand Down Expand Up @@ -169,6 +184,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
this.kubernetesInfo.getPulsarRootDir(),
instanceConfig,
instanceFile,
extraDependenciesDir,
prometheusMetricsServerJarFile,
logDirectory,
codePkgUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,15 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
serviceUrl = brokerServiceUrl;
}

try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
null, new DefaultSecretsProviderConfigurator())) {
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
serviceUrl,
stateStorageServiceUrl,
authConfig,
null, /* java instance jar file */
null, /* python instance file */
null, /* log directory */
null, /* extra dependencies dir */
new DefaultSecretsProviderConfigurator())) {
List<RuntimeSpawner> spawners = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class ProcessRuntime implements Runtime {

ProcessRuntime(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir,
String logDirectory,
String codeFile,
String pulsarServiceUrl,
Expand All @@ -94,9 +95,24 @@ class ProcessRuntime implements Runtime {
logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
break;
}
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig, false, null, null);
this.processArgs = RuntimeUtils.composeArgs(
instanceConfig,
instanceFile,
extraDependenciesDir,
logDirectory,
codeFile,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
instanceConfig.getInstanceName(),
instanceConfig.getPort(),
expectedHealthCheckInterval,
logConfigFile,
secretsProviderClassName,
secretsProviderConfig,
false,
null,
null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
private String javaInstanceJarFile;
private String pythonInstanceFile;
private String logDirectory;
private String extraDependenciesDir;

@VisibleForTesting
public ProcessRuntimeFactory(String pulsarServiceUrl,
Expand All @@ -50,13 +51,15 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
String javaInstanceJarFile,
String pythonInstanceFile,
String logDirectory,
String extraDependenciesDir,
SecretsProviderConfigurator secretsProviderConfigurator) {
this.pulsarServiceUrl = pulsarServiceUrl;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.authConfig = authConfig;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.javaInstanceJarFile = javaInstanceJarFile;
this.pythonInstanceFile = pythonInstanceFile;
this.extraDependenciesDir = extraDependenciesDir;
this.logDirectory = logDirectory;

// if things are not specified, try to figure out by env properties
Expand Down Expand Up @@ -92,6 +95,19 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
}
}
this.logDirectory = this.logDirectory + "/functions";

if (this.extraDependenciesDir == null) {
String envProcessContainerExtraDependenciesDir =
System.getProperty("pulsar.functions.extra.dependencies.dir");
if (null != envProcessContainerExtraDependenciesDir) {
log.info("Extra dependencies location is not defined using"
+ " the location defined in system environment : {}", envProcessContainerExtraDependenciesDir);
this.extraDependenciesDir = envProcessContainerExtraDependenciesDir;
} else {
log.info("No extra dependencies location is defined in either"
+ " function worker config or system environment");
}
}
}

@Override
Expand All @@ -112,6 +128,7 @@ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String code
return new ProcessRuntime(
instanceConfig,
instanceFile,
extraDependenciesDir,
logDirectory,
codeFile,
pulsarServiceUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.pulsar.functions.runtime;

import com.google.protobuf.util.JsonFormat;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
Expand All @@ -28,8 +30,6 @@
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;

import java.util.*;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

Expand All @@ -39,8 +39,11 @@
@Slf4j
class RuntimeUtils {

private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";

public static List<String> composeArgs(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
String logDirectory,
String originalCodeFileName,
String pulsarServiceUrl,
Expand All @@ -59,11 +62,19 @@ public static List<String> composeArgs(InstanceConfig instanceConfig,
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
args.add("-cp");
args.add(instanceFile);

String classpath = instanceFile;
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
classpath = classpath + ":" + extraDependenciesDir + "/*";
}
args.add(classpath);

// Keep the same env property pointing to the Java instance file so that it can be picked up
// by the child process and manually added to classpath
args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
}
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + String.format(
"%s/%s",
Expand All @@ -83,6 +94,10 @@ public static List<String> composeArgs(InstanceConfig instanceConfig,
args.add("--jar");
args.add(originalCodeFileName);
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
// add `extraDependenciesDir` to python package searching path
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
}
args.add("python");
args.add(instanceFile);
args.add("--py");
Expand Down
Loading

0 comments on commit 150f2a0

Please sign in to comment.