Skip to content

Commit

Permalink
[fix apache#9315] Add downloadDirectory support to function k8s runti…
Browse files Browse the repository at this point in the history
…me (apache#9377)

Fixes apache#9315

### Motivation

k8s runtime not using `downloadDirectory` defined from `functions_worker.yml`, so user cannot self define the download directory for k8s runtime. 

### Modifications

- add `downloadDirectory` from `WorkerConfig`
- add backward compatibles
- add tests

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
freeznet authored Feb 9, 2021
1 parent 28b2094 commit 3af98bc
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,9 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar
try {
File file = new File(destinationPath);
if (!file.exists()) {
if (file.getParentFile() != null && !file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
file.createNewFile();
}
FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public void completed(Response response) {
InputStream inputStream = response.readEntity(InputStream.class);
Path destinyPath = Paths.get(path);
try {
if (destinyPath.getParent() != null) {
Files.createDirectories(destinyPath.getParent());
}
Files.copy(inputStream, destinyPath);
future.complete(null);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand All @@ -71,6 +72,7 @@
import org.apache.pulsar.packages.management.core.common.PackageType;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -152,6 +154,7 @@ public class KubernetesRuntime implements Runtime {
private String narExtractionDirectory;
private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
private String functionInstanceClassPath;
private String downloadDirectory;

KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
Expand Down Expand Up @@ -187,7 +190,8 @@ public class KubernetesRuntime implements Runtime {
Integer metricsPort,
String narExtractionDirectory,
Optional<KubernetesManifestCustomizer> manifestCustomizer,
String functinoInstanceClassPath) throws Exception {
String functinoInstanceClassPath,
String downloadDirectory) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
Expand All @@ -200,7 +204,8 @@ public class KubernetesRuntime implements Runtime {
this.pulsarRootDir = pulsarRootDir;
this.configAdminCLI = configAdminCLI;
this.userCodePkgUrl = userCodePkgUrl;
this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
this.downloadDirectory = StringUtils.isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
this.originalCodeFileName = this.downloadDirectory + "/" + originalCodeFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.percentMemoryPadding = percentMemoryPadding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.TimerTask;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;

/**
Expand Down Expand Up @@ -98,6 +99,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private Integer metricsPort;
private String narExtractionDirectory;
private String functionInstanceClassPath;
private String downloadDirectory;

@ToString.Exclude
@EqualsAndHashCode.Exclude
Expand Down Expand Up @@ -169,6 +171,11 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
} else {
this.configAdminCLI = "/bin/pulsar-admin";
}
this.downloadDirectory = isNotEmpty(workerConfig.getDownloadDirectory()) ?
workerConfig.getDownloadDirectory() : this.pulsarRootDir; // for backward comp
if (!Paths.get(this.downloadDirectory).isAbsolute()) {
this.downloadDirectory = this.pulsarRootDir + "/" + this.downloadDirectory;
}

this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
this.installUserCodeDependencies = factoryConfig.getInstallUserCodeDependencies();
Expand Down Expand Up @@ -311,7 +318,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
metricsPort,
narExtractionDirectory,
manifestCustomizer,
functionInstanceClassPath);
functionInstanceClassPath,
downloadDirectory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ public void tearDown() {

KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio,
Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
Optional<RuntimeCustomizer> manifestCustomizer,
String downloadDirectory) throws Exception {

KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
doNothing().when(factory).setupClient();

Expand Down Expand Up @@ -218,13 +220,20 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int
workerConfig.setFunctionInstanceMinResources(null);
workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
workerConfig.setAuthenticationEnabled(false);
workerConfig.setDownloadDirectory(downloadDirectory);

manifestCustomizer.ifPresent(runtimeCustomizer -> runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));

factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Optional.empty(), manifestCustomizer);
return factory;
}

KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio,
Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer, null);
}

KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, Optional.empty());
Expand Down Expand Up @@ -364,6 +373,75 @@ public void testResources(double userCpuRequest, long userMemoryRequest, double
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), roundDecimal(resources.getCpu(), 3));
}

private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();

String classpath = javaInstanceJarFile;
String extraDepsEnv;
String jarLocation;
int portArg;
int metricsPortArg;
int totalArgs;
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
totalArgs = 37;
portArg = 26;
metricsPortArg = 28;
} else {
extraDepsEnv = "";
portArg = 25;
metricsPortArg = 27;
totalArgs = 36;
}
if (secretsAttached) {
totalArgs += 4;
}
if (StringUtils.isNotEmpty(downloadDirectory)){
jarLocation = downloadDirectory + "/" + userJarFile;
} else {
jarLocation = pulsarRootDir + "/" + userJarFile;
}

assertEquals(args.size(), totalArgs,
"Actual args : " + StringUtils.join(args, " "));

String expectedArgs = "exec java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Xmx" + String.valueOf(RESOURCES.getRam())
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + jarLocation + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(portArg) + " --metrics_port " + args.get(metricsPortArg)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval -1";
if (secretsAttached) {
expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'";
}
expectedArgs += " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;

assertEquals(String.join(" ", args), expectedArgs);

// check padding and xmx
long heap = Long.parseLong(args.stream().filter(s -> s.startsWith("-Xmx")).collect(Collectors.toList()).get(0).replace("-Xmx", ""));
V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
assertEquals(heap, RESOURCES.getRam());
assertEquals(containerSpec.getResources().getLimits().get("memory").getNumber().longValue(), Math.round(heap + (heap * 0.1)));

// check cpu
assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
}

private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached) throws Exception {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
Expand Down Expand Up @@ -1043,4 +1121,52 @@ public void testBasicKubernetesManifestCustomizerWithRuntimeCustomizerConfigOver
});

}

@Test
public void testJavaConstructorWithoutDownloadDirectoryDefined() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), null);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
}

@Test
public void testJavaConstructorWithDownloadDirectoryDefined() throws Exception {
String downloadDirectory = "download/pulsar_functions";
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
}

@Test
public void testJavaConstructorWithAbsolutDownloadDirectoryDefined() throws Exception {
String downloadDirectory = "/functions/download/pulsar_functions";
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
}

@Test
public void testCustomKubernetesDownloadCommandsWithDownloadDirectoryDefined() throws Exception {
String downloadDirectory = "download/pulsar_functions";
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
return fb.setPackageUrl("function://public/default/test@v1");
}));

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
V1StatefulSet spec = container.createStatefulSet();
String expectedDownloadCommand = "pulsar-admin --admin-url http://localhost:8080 packages download "
+ "function://public/default/test@v1 --path " + factory.getDownloadDirectory() + "/" + userJarFile;
String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
assertTrue(containerCommand.contains(expectedDownloadCommand));
}
}

0 comments on commit 3af98bc

Please sign in to comment.