Skip to content

Commit

Permalink
Support different docker images in Kubernetes runtime of Pulsar Funct…
Browse files Browse the repository at this point in the history
…ions (apache#6752)

## Motivation
Provide support to configure different docker images for go, python, java function

## Modifications
Add javaFunctionDockerImageName, pythonFunctionDockerImageName, and goFunctionDockerImageName in KubernetesRuntimeFactoryConfig.
Add unit test case
Update docs of functions-runtime.md
  • Loading branch information
wolfstudy authored Jul 17, 2020
1 parent 742fc5c commit 3c5a423
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 2 deletions.
8 changes: 8 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ functionRuntimeFactoryConfigs:
# jobNamespace:
# # the docker image to run function instance. by default it is `apachepulsar/pulsar`
# pulsarDockerImageName:
# # the docker image to run function instance according to different configurations provided by users.
# # By default it is `apachepulsar/pulsar`.
# # e.g:
# # functionDockerImages:
# # JAVA: JAVA_IMAGE_NAME
# # PYTHON: PYTHON_IMAGE_NAME
# # GO: GO_IMAGE_NAME
# functionDockerImages:
# # the root directory of pulsar home directory in `pulsarDockerImageName`. by default it is `/pulsar`.
# # if you are using your own built image in `pulsarDockerImageName`, you need to set this setting accordingly
# pulsarRootDir:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ public class InstanceConfig {
public String getInstanceName() {
return "" + instanceId;
}

public FunctionDetails getFunctionDetails() {
return functionDetails;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@

import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class KubernetesRuntime implements Runtime {
private InstanceConfig instanceConfig;
private final String jobNamespace;
private final Map<String, String> customLabels;
private final Map<String, String> functionDockerImages;
private final String pulsarDockerImageName;
private final String imagePullPolicy;
private final String pulsarRootDir;
Expand All @@ -156,6 +158,7 @@ public class KubernetesRuntime implements Runtime {
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
String pulsarDockerImageName,
Map<String, String> functionDockerImages,
String imagePullPolicy,
String pulsarRootDir,
InstanceConfig instanceConfig,
Expand Down Expand Up @@ -185,6 +188,7 @@ public class KubernetesRuntime implements Runtime {
this.instanceConfig = instanceConfig;
this.jobNamespace = jobNamespace;
this.customLabels = customLabels;
this.functionDockerImages = functionDockerImages;
this.pulsarDockerImageName = pulsarDockerImageName;
this.imagePullPolicy = imagePullPolicy;
this.pulsarRootDir = pulsarRootDir;
Expand Down Expand Up @@ -977,8 +981,35 @@ private List<V1Toleration> getTolerations() {
V1Container getFunctionContainer(List<String> instanceCommand, Function.Resources resource) {
final V1Container container = new V1Container().name(PULSARFUNCTIONS_CONTAINER_NAME);

// set up the container images
container.setImage(pulsarDockerImageName);
Function.FunctionDetails.Runtime runtime = instanceConfig.getFunctionDetails().getRuntime();

String imageName = null;
if (functionDockerImages != null) {
switch (runtime) {
case JAVA:
if (functionDockerImages.get("JAVA") != null) {
imageName = functionDockerImages.get("JAVA");
break;
}
case PYTHON:
if (functionDockerImages.get("PYTHON") != null) {
imageName = functionDockerImages.get("PYTHON");
break;
}
case GO:
if (functionDockerImages.get("GO") != null) {
imageName = functionDockerImages.get("GO");
break;
}
default:
imageName = pulsarDockerImageName;
break;
}
container.setImage(imageName);
} else {
container.setImage(pulsarDockerImageName);
}

container.setImagePullPolicy(imagePullPolicy);

// set up the container command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private String k8Uri;
private String jobNamespace;
private String pulsarDockerImageName;
private Map<String, String> functionDockerImages;
private String imagePullPolicy;
private String pulsarRootDir;
private String configAdminCLI;
Expand Down Expand Up @@ -143,6 +144,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
} else {
this.pulsarDockerImageName = "apachepulsar/pulsar";
}
this.functionDockerImages = factoryConfig.getFunctionDockerImages();
if (!isEmpty(factoryConfig.getImagePullPolicy())) {
this.imagePullPolicy = factoryConfig.getImagePullPolicy();
} else {
Expand Down Expand Up @@ -272,6 +274,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
pythonDependencyRepository,
pythonExtraDependencyRepository,
pulsarDockerImageName,
functionDockerImages,
imagePullPolicy,
pulsarRootDir,
instanceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class KubernetesRuntimeFactoryConfig {
)
protected String pulsarDockerImageName;

@FieldContext(
doc = "The function docker images used to run function instance according to different "
+ "configurations provided by users. By default it is `apachepulsar/pulsar`"
)
protected Map<String, String> functionDockerImages;

@FieldContext(
doc = "The image pull policy for image used to run function instance. By default it is `IfNotPresent`"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,
kubernetesRuntimeFactoryConfig.setK8Uri(null);
kubernetesRuntimeFactoryConfig.setJobNamespace(null);
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
Expand Down Expand Up @@ -367,10 +368,15 @@ public void testDynamicConfigMapLoading() throws Exception {
private KubernetesRuntimeFactory getKuberentesRuntimeFactory() {
KubernetesRuntimeFactory kubernetesRuntimeFactory = new KubernetesRuntimeFactory();
WorkerConfig workerConfig = new WorkerConfig();
Map<String, String> imageNames = new HashMap<>();
imageNames.put("JAVA", "test-java-function-docker-image");
imageNames.put("PYTHON", "test-python-function-docker-image");
imageNames.put("GO", "test-go-function-docker-image");
KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
kubernetesRuntimeFactoryConfig.setK8Uri("test_k8uri");
kubernetesRuntimeFactoryConfig.setJobNamespace("test_jobNamespace");
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName("test_dockerImage");
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(imageNames);
kubernetesRuntimeFactoryConfig.setImagePullPolicy("test_imagePullPolicy");
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int
kubernetesRuntimeFactoryConfig.setK8Uri(null);
kubernetesRuntimeFactoryConfig.setJobNamespace(null);
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
Expand Down
8 changes: 8 additions & 0 deletions site2/docs/functions-runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ kubernetesContainerFactory:
jobNamespace:
# the docker image to run function instance. by default it is `apachepulsar/pulsar`
pulsarDockerImageName:
# the docker image to run function instance according to different configurations provided by users.
# By default it is `apachepulsar/pulsar`.
# e.g:
# functionDockerImages:
# JAVA: JAVA_IMAGE_NAME
# PYTHON: PYTHON_IMAGE_NAME
# GO: GO_IMAGE_NAME
functionDockerImages:
# the root directory of pulsar home directory in `pulsarDockerImageName`. by default it is `/pulsar`.
# if you are using your own built image in `pulsarDockerImageName`, you need to set this setting accordingly
pulsarRootDir:
Expand Down

0 comments on commit 3c5a423

Please sign in to comment.