diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index e12cb6b2ec62b..0693452b98873 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -1087,32 +1087,35 @@ private List getPrometheusContainerPorts() { public static String createJobName(Function.FunctionDetails functionDetails, String jobName) { return jobName == null ? createJobName(functionDetails.getTenant(), - functionDetails.getNamespace(), - functionDetails.getName()) : createJobName(jobName); + functionDetails.getNamespace(), functionDetails.getName()) : + createJobName(jobName, functionDetails.getTenant(), + functionDetails.getNamespace(), functionDetails.getName()); } private static String toValidPodName(String ori) { return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-"); } - private static String validateName(String jobName) { + private static String createJobName(String jobName, String tenant, String namespace, String functionName) { final String convertedJobName = toValidPodName(jobName); + // use of customRuntimeOptions 'jobName' may cause naming collisions, + // add a short hash here to avoid it + final String hashName = String.format("%s-%s-%s-%s", jobName, tenant, namespace, functionName); + final String shortHash = DigestUtils.sha1Hex(hashName).toLowerCase().substring(0, 8); + return convertedJobName + "-" + shortHash; + } + + private static String createJobName(String tenant, String namespace, String functionName) { + final String jobNameBase = String.format("%s-%s-%s", tenant, namespace, functionName); + final String jobName = "pf-" + jobNameBase; + final String convertedJobName = toValidPodName(jobName); if (jobName.equals(convertedJobName)) { return jobName; } // toValidPodName may cause naming collisions, add a short hash here to avoid it - final String shortHash = DigestUtils.sha1Hex(jobName.replaceFirst("pf-", "")).toLowerCase().substring(0, 8); + final String shortHash = DigestUtils.sha1Hex(jobNameBase).toLowerCase().substring(0, 8); return convertedJobName + "-" + shortHash; } - - private static String createJobName(String jobName) { - return validateName(jobName); - } - - private static String createJobName(String tenant, String namespace, String functionName) { - final String jobName = "pf-" + String.format("%s-%s-%s", tenant, namespace, functionName); - return validateName(jobName); - } private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) { return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, instanceId, jobName, jobNamespace); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index a06ba8999fcde..a34ef8f31abcb 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -58,6 +58,7 @@ import static org.powermock.api.mockito.PowerMockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertThrows; /** * Unit test of {@link ThreadRuntime}. @@ -503,8 +504,10 @@ public void testCreateJobName() throws Exception { verifyCreateJobNameWithInvalidMarksFunctionName(); verifyCreateJobNameWithCollisionalFunctionName(); verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName(); + verifyCreateJobNameWithOverriddenK8sPodNameNoCollisionWithSameName(); verifyCreateJobNameWithOverriddenK8sPodName(); verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks(); + verifyCreateJobNameWithNameOverMaxCharLimit(); } FunctionDetails createFunctionDetails(final String functionName) { @@ -575,16 +578,54 @@ private void verifyCreateJobNameWithInvalidMarksFunctionName() throws Exception private void verifyCreateJobNameWithOverriddenK8sPodName() throws Exception { final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction"); final String jobName = KubernetesRuntime.createJobName(functionDetails, "custom-k8s-pod-name"); - assertEquals(jobName, "custom-k8s-pod-name"); + assertEquals(jobName, "custom-k8s-pod-name-dedfc7cf"); KubernetesRuntime.doChecks(functionDetails, "custom-k8s-pod-name"); } private void verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks() throws Exception { final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction"); final String jobName = KubernetesRuntime.createJobName(functionDetails, "invalid_pod*name"); - assertEquals(jobName, "invalid-pod-name-04d0e74a"); + assertEquals(jobName, "invalid-pod-name-af8c3a6c"); KubernetesRuntime.doChecks(functionDetails, "invalid_pod*name"); } + + private void verifyCreateJobNameWithOverriddenK8sPodNameNoCollisionWithSameName() throws Exception { + final String CUSTOM_JOB_NAME = "custom-name"; + final String FUNCTION_NAME = "clazz.testfunction"; + + final FunctionDetails functionDetails1 = createFunctionDetails(FUNCTION_NAME); + final String jobName1 = KubernetesRuntime.createJobName(functionDetails1, CUSTOM_JOB_NAME); + + // create a second function with the same name, but in different tenant/namespace to make sure collision does not + // happen. If tenant, namespace, and function name are the same kubernetes handles collision issues + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + functionDetailsBuilder.setTenant("tenantA"); + functionDetailsBuilder.setNamespace("nsA"); + functionDetailsBuilder.setName(FUNCTION_NAME); + final FunctionDetails functionDetails2 = functionDetailsBuilder.build(); + final String jobName2 = KubernetesRuntime.createJobName(functionDetails2, CUSTOM_JOB_NAME); + + // create a third function with different name but in same tenant/namespace to make sure + // collision does not happen. If tenant, namespace, and function name are the same kubernetes handles collision issues + final FunctionDetails functionDetails3 = createFunctionDetails(FUNCTION_NAME + "-extra"); + final String jobName3 = KubernetesRuntime.createJobName(functionDetails3, CUSTOM_JOB_NAME); + + assertEquals(jobName1, CUSTOM_JOB_NAME + "-85ac54b0"); + KubernetesRuntime.doChecks(functionDetails1, CUSTOM_JOB_NAME); + + assertEquals(jobName2, CUSTOM_JOB_NAME + "-c66edfe1"); + KubernetesRuntime.doChecks(functionDetails2, CUSTOM_JOB_NAME); + + assertEquals(jobName3, CUSTOM_JOB_NAME + "-0fc9c728"); + KubernetesRuntime.doChecks(functionDetails3, CUSTOM_JOB_NAME); + } + + private void verifyCreateJobNameWithNameOverMaxCharLimit() throws Exception { + final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction"); + assertThrows(RuntimeException.class, () -> KubernetesRuntime.doChecks(functionDetails, + "custom-k8s-pod-name-over-kuberenetes-max-character-limit-123456789")); + } private void verifyCreateJobNameWithCollisionalFunctionName() throws Exception { final FunctionDetails functionDetail1 = createFunctionDetails("testfunction"); @@ -687,7 +728,7 @@ public void testBasicKubernetesManifestCustomizer() throws Exception { V1Service serviceSpec = container.createService(); assertEquals(serviceSpec.getMetadata().getNamespace(), "custom-ns"); - assertEquals(serviceSpec.getMetadata().getName(), "custom-name"); + assertEquals(serviceSpec.getMetadata().getName(), "custom-name-2deb2c2b"); assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"), "test"); assertEquals(serviceSpec.getMetadata().getLabels().get("label"), "test");