Skip to content

Commit

Permalink
[Issue 8012][pulsar_functions] For the Kubernetes function worker run…
Browse files Browse the repository at this point in the history
…time, allow customization of the pulsar function "jobName" (apache#8452)

Fixes apache#8012 

### Motivation

Currently (as of 2.6.x), the Pulsar KubernetesRuntime class hardcodes the jobName (the name assigned to the StatefulSet used to create the function pods) to the format "pf-[tenant]-[namespace]-[function][-optional 8 char hash]." While the intent of this name format was no doubt both to provide a human readable name for the k8s objects and ensure uniqueness within k8s, we've found it -- when combined with the 55 character size restriction imposed by KubernetesRuntime -- to be unnecessarily limiting. In our environment, we ensure that Pulsar functions under a particular Pulsar tenant deploy into a kubernetes namespace dedicated to that tenant; hence, for us the [tenant] portion of the function name is redundant. Further, the "pf-" prefix is unnecessary, as we're able to distinguish the function pods from other pods based on the function name alone. These issues may seem minor, but they consume precious characters against the 55 character max!
  • Loading branch information
jdbeck authored Nov 7, 2020
1 parent de7da89 commit db6afd5
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
@NoArgsConstructor
static private class RuntimeOpts {
private String jobNamespace;
private String jobName;
private Map<String, String> extraLabels;
private Map<String, String> extraAnnotations;
private Map<String, String> nodeSelectorLabels;
Expand All @@ -70,7 +71,17 @@ public String customizeNamespace(Function.FunctionDetails funcDetails, String cu
return currentNamespace;
}
}


@Override
public String customizeName(Function.FunctionDetails funcDetails, String currentName) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
if (!StringUtils.isEmpty(opts.getJobName())) {
return opts.getJobName();
} else {
return currentName;
}
}

@Override
public V1Service customizeService(Function.FunctionDetails funcDetails, V1Service service) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ default V1Service customizeService(Function.FunctionDetails funcDetails, V1Servi
default String customizeNamespace(Function.FunctionDetails funcDetails, String currentNamespace) {
return currentNamespace;
}

default String customizeName(Function.FunctionDetails funcDetails, String currentName) {
return currentName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class KubernetesRuntime implements Runtime {
private InstanceControlGrpc.InstanceControlFutureStub[] stub;
private InstanceConfig instanceConfig;
private final String jobNamespace;
private final String jobName;
private final Map<String, String> customLabels;
private final Map<String, String> functionDockerImages;
private final String pulsarDockerImageName;
Expand All @@ -154,6 +155,7 @@ public class KubernetesRuntime implements Runtime {
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
String jobNamespace,
String jobName,
Map<String, String> customLabels,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
Expand Down Expand Up @@ -189,6 +191,7 @@ public class KubernetesRuntime implements Runtime {
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
this.jobNamespace = jobNamespace;
this.jobName = jobName;
this.customLabels = customLabels;
this.functionDockerImages = functionDockerImages;
this.pulsarDockerImageName = pulsarDockerImageName;
Expand Down Expand Up @@ -259,7 +262,7 @@ public class KubernetesRuntime implements Runtime {
narExtractionDirectory,
functinoInstanceClassPath));

doChecks(instanceConfig.getFunctionDetails());
doChecks(instanceConfig.getFunctionDetails(), this.jobName);
}

/**
Expand Down Expand Up @@ -294,7 +297,7 @@ private synchronized void setupGrpcChannelIfNeeded() {
channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];

String jobName = createJobName(instanceConfig.getFunctionDetails());
String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) {
String address = getServiceUrl(jobName, jobNamespace, i);
channel[i] = ManagedChannelBuilder.forAddress(address, grpcPort)
Expand Down Expand Up @@ -460,7 +463,7 @@ private void submitService() throws Exception {

@VisibleForTesting
V1Service createService() {
final String jobName = createJobName(instanceConfig.getFunctionDetails());
final String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);

final V1Service service = new V1Service();

Expand Down Expand Up @@ -545,7 +548,7 @@ private void submitStatefulSet() throws Exception {


public void deleteStatefulSet() throws InterruptedException {
String statefulSetName = createJobName(instanceConfig.getFunctionDetails());
String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
final V1DeleteOptions options = new V1DeleteOptions();
options.setGracePeriodSeconds(5L);
options.setPropagationPolicy("Foreground");
Expand Down Expand Up @@ -700,7 +703,7 @@ public void deleteService() throws InterruptedException {
options.setGracePeriodSeconds(0L);
options.setPropagationPolicy("Foreground");
String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
String serviceName = createJobName(instanceConfig.getFunctionDetails());
String serviceName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);

Actions.Action deleteService = Actions.Action.builder()
.actionName(String.format("Deleting service for function %s", fqfn))
Expand Down Expand Up @@ -861,7 +864,7 @@ private static String setShardIdEnvironmentVariableCommand() {

@VisibleForTesting
V1StatefulSet createStatefulSet() {
final String jobName = createJobName(instanceConfig.getFunctionDetails());
final String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);

final V1StatefulSet statefulSet = new V1StatefulSet();

Expand Down Expand Up @@ -1082,40 +1085,48 @@ private List<V1ContainerPort> getPrometheusContainerPorts() {
return ports;
}

public static String createJobName(Function.FunctionDetails functionDetails) {
return createJobName(functionDetails.getTenant(),
public static String createJobName(Function.FunctionDetails functionDetails, String jobName) {
return jobName == null ? createJobName(functionDetails.getTenant(),
functionDetails.getNamespace(),
functionDetails.getName());
functionDetails.getName()) : createJobName(jobName);
}

private static String toValidPodName(String ori) {
return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
}

private static String createJobName(String tenant, String namespace, String functionName) {
final String jobNameContent = String.format("%s-%s-%s", tenant, namespace,functionName);
final String jobName = "pf-" + jobNameContent;
final String convertedJobName = toValidPodName(jobName);

private static String validateName(String jobName) {
final String convertedJobName = toValidPodName(jobName);
if (jobName.equals(convertedJobName)) {
return jobName;
}
// toValidPodName may cause naming collisions, add a short hash here to avoid it
final String shortHash = DigestUtils.sha1Hex(jobNameContent).toLowerCase().substring(0, 8);
final String shortHash = DigestUtils.sha1Hex(jobName.replaceFirst("pf-", "")).toLowerCase().substring(0, 8);
return convertedJobName + "-" + shortHash;
}

private static String createJobName(String jobName) {
return validateName(jobName);
}

private static String createJobName(String tenant, String namespace, String functionName) {
final String jobName = "pf-" + String.format("%s-%s-%s", tenant, namespace, functionName);
return validateName(jobName);
}

private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, instanceId, jobName, jobNamespace);
}

public static void doChecks(Function.FunctionDetails functionDetails) {
final String jobName = createJobName(functionDetails);
public static void doChecks(Function.FunctionDetails functionDetails, String overridenJobName) {
final String jobName = createJobName(functionDetails, overridenJobName);
if (!jobName.equals(jobName.toLowerCase())) {
throw new RuntimeException("Kubernetes does not allow upper case jobNames.");
}
final Matcher matcher = VALID_POD_NAME_REGEX.matcher(jobName);
if (!matcher.matches()) {
throw new RuntimeException("Kubernetes only admits lower case and numbers.");
throw new RuntimeException("Kubernetes only admits lower case and numbers. " +
"(jobName=" + jobName + ")");
}
if (jobName.length() > maxJobNameSize) {
throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {

private String k8Uri;
private String jobNamespace;
private String jobName;
private String pulsarDockerImageName;
private Map<String, String> functionDockerImages;
private String imagePullPolicy;
Expand Down Expand Up @@ -140,6 +141,11 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
} else {
this.jobNamespace = "default";
}
if (!isEmpty(factoryConfig.getJobName())) {
this.jobName = factoryConfig.getJobName();
} else {
this.jobName = null;
}
if (!isEmpty(factoryConfig.getPulsarDockerImageName())) {
this.pulsarDockerImageName = factoryConfig.getPulsarDockerImageName();
} else {
Expand Down Expand Up @@ -265,12 +271,14 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c

Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
String overriddenNamespace = manifestCustomizer.map((customizer) -> customizer.customizeNamespace(instanceConfig.getFunctionDetails(), jobNamespace)).orElse(jobNamespace);
String overriddenName = manifestCustomizer.map((customizer) -> customizer.customizeName(instanceConfig.getFunctionDetails(), jobName)).orElse(jobName);

return new KubernetesRuntime(
appsClient,
coreClient,
// get the namespace for this function
overriddenNamespace,
overriddenName,
customLabels,
installUserCodeDependencies,
pythonDependencyRepository,
Expand Down Expand Up @@ -310,9 +318,11 @@ public void close() {

@Override
public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
KubernetesRuntime.doChecks(functionDetails);
final String overriddenJobName = getOverriddenName(functionDetails);
KubernetesRuntime.doChecks(functionDetails, overriddenJobName);
validateMinResourcesRequired(functionDetails);
secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, getOverriddenNamespace(functionDetails), functionDetails);
secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient,
getOverriddenNamespace(functionDetails), overriddenJobName, functionDetails);
}

@VisibleForTesting
Expand Down Expand Up @@ -420,4 +430,9 @@ private String getOverriddenNamespace(Function.FunctionDetails funcDetails) {
Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
return manifestCustomizer.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace)).orElse(jobNamespace);
}

private String getOverriddenName(Function.FunctionDetails funcDetails) {
Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
return manifestCustomizer.map((customizer) -> customizer.customizeName(funcDetails, jobName)).orElse(jobName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class KubernetesRuntimeFactoryConfig {
+ " if this setting is left to be empty"
)
protected String jobNamespace;
@FieldContext(
doc = "The Kubernetes pod name to run the function instances. It is set to"
+ "`pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left to be empty"
)
protected String jobName;
@FieldContext(
doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Type getSecretObjectType() {
}

@Override
public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {

}
}
Expand Down Expand Up @@ -152,6 +152,7 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,
KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
kubernetesRuntimeFactoryConfig.setK8Uri(null);
kubernetesRuntimeFactoryConfig.setJobNamespace(null);
kubernetesRuntimeFactoryConfig.setJobName(null);
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
Expand Down Expand Up @@ -375,6 +376,7 @@ private KubernetesRuntimeFactory getKuberentesRuntimeFactory() {
KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
kubernetesRuntimeFactoryConfig.setK8Uri("test_k8uri");
kubernetesRuntimeFactoryConfig.setJobNamespace("test_jobNamespace");
kubernetesRuntimeFactoryConfig.setJobName("test_jobName");
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName("test_dockerImage");
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(imageNames);
kubernetesRuntimeFactoryConfig.setImagePullPolicy("test_imagePullPolicy");
Expand Down
Loading

0 comments on commit db6afd5

Please sign in to comment.