Skip to content

Commit

Permalink
Improve function package download process (apache#268)
Browse files Browse the repository at this point in the history
- downloader is writing the package to a temp location
- after a package is downloaded, a hardlink is created to link to the temp file. hardlink will ensure one createLink succeed.
- after the hardlink is created, the temp files can be deleted.

After this change, it allows concurrent downloading without interleaving with each other.
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 322f3af commit c350e1d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void run() {
}

private void loadJars() throws Exception {
log.info("Loading JAR files for function {}", instanceConfig);
log.info("Loading JAR files for function {} from jarFile {}", instanceConfig, jarFile);
// create the function class loader
fnCache.registerFunctionInstance(
instanceConfig.getFunctionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ public boolean isAlive() {

@Override
public Exception getDeathException() {
if (isAlive()) return null;
else return startupException;
if (isAlive()) {
return null;
} else if (null != startupException) {
return startupException;
} else if (null != javaInstanceRunnable){
return javaInstanceRunnable.getFailureException();
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.functions.worker;

import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -118,23 +121,55 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
File.separatorChar));
pkgDir.mkdirs();

File pkgFile = new File(pkgDir, new File(FunctionConfigUtils.getDownloadFileName(functionMetaData.getFunctionConfig())).getName());
if (pkgFile.exists()) {
pkgFile.delete();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();

File pkgFile = new File(
pkgDir,
new File(FunctionConfigUtils.getDownloadFileName(functionMetaData.getFunctionConfig())).getName());

if (!pkgFile.exists()) {
// download only when the package file doesn't exist
File tempPkgFile;
while (true) {
tempPkgFile = new File(
pkgDir,
pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString());
if (!tempPkgFile.exists() && tempPkgFile.createNewFile()) {
break;
}
}
try {
log.info("Function package file {} will be downloaded from {}",
tempPkgFile, functionMetaData.getPackageLocation());
Utils.downloadFromBookkeeper(
dlogNamespace,
new FileOutputStream(tempPkgFile),
functionMetaData.getPackageLocation().getPackagePath());

// create a hardlink, if there are two concurrent createLink operations, one will fail.
// this ensures one instance will successfully download the package.
try {
Files.createLink(
Paths.get(pkgFile.toURI()),
Paths.get(tempPkgFile.toURI()));
log.info("Function package file is linked from {} to {}",
tempPkgFile, pkgFile);
} catch (FileAlreadyExistsException faee) {
// file already exists
log.warn("Function package has been downloaded from {} and saved at {}",
functionMetaData.getPackageLocation(), pkgFile);
}
} finally {
tempPkgFile.delete();
}
}
log.info("Function package file {} will be downloaded from {}",
pkgFile, functionMetaData.getPackageLocation());
Utils.downloadFromBookkeeper(
dlogNamespace,
new FileOutputStream(pkgFile),
functionMetaData.getPackageLocation().getPackagePath());

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionConfig(functionMetaData.getFunctionConfig());
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(functionRuntimeInfo.getFunctionInstance().getInstanceId()));
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), runtimeFactory,
metricsSink, metricsCollectionInterval);
Expand Down

0 comments on commit c350e1d

Please sign in to comment.