Skip to content

Commit

Permalink
[Issue 7742][functions] Allow kubernetes runtime to customize functio…
Browse files Browse the repository at this point in the history
…n instance class path (apache#7844)

Fixes apache#1338


### Motivation


Currently, the function worker is using the function worker's classpath to configure the function instance (runner)'s classpath. So when the broker (function worker) is using an image that is different from the function instance (runner) for kubernetes runtime, the classpath will be wrong and the function instance is not able to load the instance classes.


### Modifications

Adding an function instance class path entry into the kubernetes runtime config. And construct the function launch command accordingly.


### Verifying this change

- [X] Make sure that the change passes the CI checks.


This change is already covered by existing tests, such as KubernetesRuntimeTest.


### Does this pull request potentially affect one of the following parts:

  No

### Documentation

  - Does this pull request introduce a new feature? No


Co-authored-by: Yong Zhang <[email protected]>
  • Loading branch information
nlu90 and zymap authored Oct 29, 2020
1 parent 4c7f83b commit 7285380
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 15 deletions.
3 changes: 3 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ functionRuntimeFactoryConfigs:
# # The ratio memory request and memory limit to be set for a function/source/sink.
# # The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio
# memoryOverCommitRatio: 1.0
# # The function instance class path to be set if it's different from the
# # broker/function-worker provided class path
# functionInstanceClassPath:

## A set of the minimum amount of resources functions must request.
## Support for this depends on function runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort,
String narExtractionDirectory) throws Exception {
String narExtractionDirectory,
String functionInstanceClassPath) throws Exception {

final List<String> cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir);

Expand All @@ -80,7 +81,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, metricsPort, narExtractionDirectory));
pythonExtraDependencyRepository, metricsPort, narExtractionDirectory, functionInstanceClassPath));
return cmd;
}

Expand Down Expand Up @@ -250,7 +251,8 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort,
String narExtractionDirectory) throws Exception {
String narExtractionDirectory,
String functionInstanceClassPath) throws Exception {
final List<String> args = new LinkedList<>();

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
Expand All @@ -272,15 +274,18 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
}

// add complete classpath for broker/worker so that the function instance can load
// the functions instance dependencies separately from user code dependencies
String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
if (functionInstanceClasspath == null) {
log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH);
functionInstanceClasspath = System.getProperty("java.class.path");
if (StringUtils.isNotEmpty(functionInstanceClassPath)) {
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath));
} else {
// add complete classpath for broker/worker so that the function instance can load
// the functions instance dependencies separately from user code dependencies
String systemFunctionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
if (systemFunctionInstanceClasspath == null) {
log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH);
systemFunctionInstanceClasspath = System.getProperty("java.class.path");
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath));
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClasspath));

args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public class KubernetesRuntime implements Runtime {
private Integer metricsPort;
private String narExtractionDirectory;
private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
private String functionInstanceClassPath;

KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
Expand Down Expand Up @@ -182,7 +183,8 @@ public class KubernetesRuntime implements Runtime {
Integer grpcPort,
Integer metricsPort,
String narExtractionDirectory,
Optional<KubernetesManifestCustomizer> manifestCustomizer) throws Exception {
Optional<KubernetesManifestCustomizer> manifestCustomizer,
String functinoInstanceClassPath) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
Expand All @@ -202,6 +204,7 @@ public class KubernetesRuntime implements Runtime {
this.memoryOverCommitRatio = memoryOverCommitRatio;
this.authenticationEnabled = authenticationEnabled;
this.manifestCustomizer = manifestCustomizer;
this.functionInstanceClassPath = functinoInstanceClassPath;
String logConfigFile = null;
String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
String secretsProviderConfig = null;
Expand Down Expand Up @@ -253,7 +256,8 @@ public class KubernetesRuntime implements Runtime {
pythonDependencyRepository,
pythonExtraDependencyRepository,
metricsPort,
narExtractionDirectory));
narExtractionDirectory,
functinoInstanceClassPath));

doChecks(instanceConfig.getFunctionDetails());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private Integer grpcPort;
private Integer metricsPort;
private String narExtractionDirectory;
private String functionInstanceClassPath;

@ToString.Exclude
@EqualsAndHashCode.Exclude
Expand Down Expand Up @@ -233,6 +234,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
this.grpcPort = factoryConfig.getGrpcPort();
this.metricsPort = factoryConfig.getMetricsPort();
this.narExtractionDirectory = factoryConfig.getNarExtractionDirectory();
this.functionInstanceClassPath = factoryConfig.getFunctionInstanceClassPath();
}

@Override
Expand Down Expand Up @@ -298,7 +300,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
grpcPort,
metricsPort,
narExtractionDirectory,
manifestCustomizer);
manifestCustomizer,
functionInstanceClassPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,9 @@ public class KubernetesRuntimeFactoryConfig {
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;

@FieldContext(
doc = "The classpath where function instance files stored"
)
private String functionInstanceClassPath = "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ class ProcessRuntime implements Runtime {
false,
null,
null,
this.metricsPort, narExtractionDirectory);
this.metricsPort,
narExtractionDirectory,
null);
}

/**
Expand Down

0 comments on commit 7285380

Please sign in to comment.