Skip to content

Commit

Permalink
[cleanup][functions] Remove unused code (apache#16472)
Browse files Browse the repository at this point in the history
packageUrl is not set anymore in FunctionDetails so code that depend on it can be removed.
In the case of KubernetesRuntime we currently download the function itself and not the package.
This change doesn't modify this behaviour.
  • Loading branch information
cbornet authored Aug 16, 2022
1 parent 56c41f8 commit fe2eb6a
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.packages.management.core.common.PackageType;

/**
* Kubernetes based runtime for running functions.
Expand Down Expand Up @@ -855,13 +854,8 @@ protected List<String> getExecutorCommand() {
}

private List<String> getDownloadCommand(Function.FunctionDetails functionDetails, String userCodeFilePath) {
if (Arrays.stream(PackageType.values()).anyMatch(type ->
functionDetails.getPackageUrl().startsWith(type.toString()))) {
return getPackageDownloadCommand(functionDetails.getPackageUrl(), userCodeFilePath);
} else {
return getDownloadCommand(functionDetails.getTenant(), functionDetails.getNamespace(),
return getDownloadCommand(functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), userCodeFilePath);
}
}

private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath) {
Expand Down Expand Up @@ -908,39 +902,6 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())
userCodeFilePath);
}

private List<String> getPackageDownloadCommand(String packageName, String userCodeFilePath) {
// add auth plugin and parameters if necessary
if (authenticationEnabled && authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())
&& instanceConfig.getFunctionAuthenticationSpec() != null) {
return Arrays.asList(
pulsarRootDir + configAdminCLI,
"--auth-plugin",
authConfig.getClientAuthenticationPlugin(),
"--auth-params",
authConfig.getClientAuthenticationParameters(),
"--admin-url",
pulsarAdminUrl,
"packages",
"download",
packageName,
"--path",
userCodeFilePath);
}
}

return Arrays.asList(
pulsarRootDir + configAdminCLI,
"--admin-url",
pulsarAdminUrl,
"packages",
"download",
packageName,
"--path",
userCodeFilePath);
}

private static String setShardIdEnvironmentVariableCommand() {
return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,24 +806,6 @@ public void testCustomKubernetesManifestCustomizer() throws Exception {
assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(), "my-service-account");
}

@Test
public void testCustomKubernetesDownloadCommands() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
return fb.setPackageUrl("function://public/default/test@v1");
}));

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
V1StatefulSet spec = container.createStatefulSet();
String expectedDownloadCommand = "pulsar-admin --admin-url http://localhost:8080 packages download "
+ "function://public/default/test@v1 --path " + pulsarRootDir + "/" + userJarFile;
String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
assertTrue(containerCommand.contains(expectedDownloadCommand));
}

InstanceConfig createGolangInstanceConfig() {
InstanceConfig config = new InstanceConfig();

Expand Down Expand Up @@ -1133,25 +1115,6 @@ public void testJavaConstructorWithAbsolutDownloadDirectoryDefined() throws Exce
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
}

@Test
public void testCustomKubernetesDownloadCommandsWithDownloadDirectoryDefined() throws Exception {
String downloadDirectory = "download/pulsar_functions";
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
return fb.setPackageUrl("function://public/default/test@v1");
}));

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
V1StatefulSet spec = container.createStatefulSet();
String expectedDownloadCommand = "pulsar-admin --admin-url http://localhost:8080 packages download "
+ "function://public/default/test@v1 --path " + factory.getDownloadDirectory() + "/" + userJarFile;
String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
assertTrue(containerCommand.contains(expectedDownloadCommand));
}

@Test
public void shouldUseConfiguredMetricsPort() throws Exception {
assertMetricsPortConfigured(Collections.singletonMap("metricsPort", 12345), 12345);
Expand Down

0 comments on commit fe2eb6a

Please sign in to comment.