Skip to content

Commit

Permalink
[Function] Integrate package management service to pulsar functionWor…
Browse files Browse the repository at this point in the history
…ker (apache#14450)

* Function Worrkers support packageManagement service

* Fixed merge issue

Co-authored-by: gavingaozhangmin <[email protected]>
Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
3 people authored May 6, 2022
1 parent ac6bd3c commit cfbc9ea
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 44 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,9 @@ exposeBundlesMetricsInPrometheus=false
# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false

#Enable Functions Worker to use packageManagement Service to store package.
functionsWorkerEnablePackageManagement=false

### --- Broker Web Stats --- ###

# Enable topic level metrics
Expand Down
2 changes: 2 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ metadataStoreCacheExpirySeconds: 300

numFunctionPackageReplicas: 1
downloadDirectory: download/pulsar_functions
#Enable Functions Worker to use packageManagement Service to store package.
functionsWorkerEnablePackageManagement: false
# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
# jvmGCMetricsLoggerClassName:

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,9 @@ packagesReplicas=1
# The bookkeeper ledger root path
packagesManagementLedgerRootPath=/ledgers

#Enable Functions Worker to use packageManagement Service to store package.
functionsWorkerEnablePackageManagement=false

### --- Packages management service configuration variables (end) --- ###

### --- Deprecated settings --- ###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String functionsWorkerServiceNarPackage = "";

@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Flag indicates enabling or disabling function worker using unified PackageManagement service."
)
private boolean functionsWorkerEnablePackageManagement = false;

/**** --- Broker Web Stats. --- ****/
@FieldContext(
category = CATEGORY_METRICS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu

// inherit super users
workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
workerConfig.setFunctionsWorkerEnablePackageManagement(brokerConfig.isFunctionsWorkerEnablePackageManagement());

// inherit the nar package locations
if (isBlank(workerConfig.getFunctionsWorkerServiceNarPackage())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ class UploadFunction extends BaseCommand {
protected String sourceFile;
@Parameter(
names = "--path",
description = "Path where the contents need to be stored",
description = "Path or functionPkgUrl where the contents need to be stored",
listConverter = StringConverter.class, required = true)
protected String path;

Expand Down Expand Up @@ -1138,7 +1138,7 @@ class DownloadFunction extends FunctionCommand {
protected String destinationFile;
@Parameter(
names = "--path",
description = "Path to store the content",
description = "Path or functionPkgUrl to store the content",
listConverter = StringConverter.class, required = false, hidden = true)
protected String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The number of replicas for storing functions"
)
private int numFunctionPackageReplicas;
@FieldContext(
category = CATEGORY_FUNC_PKG,
doc = "Flag indicates enabling or disabling function worker using unified PackageManagement service."
)
private boolean functionsWorkerEnablePackageManagement = false;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The directory to download functions by runtime manager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,15 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa
File pkgDir = pkgFile.getParentFile();

if (pkgFile.exists()) {
log.warn("Function package exists already {} deleting it",
pkgFile);
log.warn("Function package exists already {} deleting it", pkgFile);
pkgFile.delete();
}

File tempPkgFile;
do {
tempPkgFile = new File(
pkgDir,
pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString());
pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID());
} while (tempPkgFile.exists() || !tempPkgFile.createNewFile());
String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.FunctionCommon.createPkgTempFile;
import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.downloadPackageFile;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand Down Expand Up @@ -105,6 +108,8 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.service.api.Component;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;

@Slf4j
Expand Down Expand Up @@ -293,6 +298,14 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat
PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder();
boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
boolean isPackageManagementEnabled = worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement();
PackageName packageName = PackageName.get(functionDetails.getComponentType().name(),
tenant,
namespace,
componentName,
String.valueOf(functionMetaData.getVersion()));
PackageMetadata metadata = PackageMetadata.builder()
.createTime(functionMetaData.getCreateTime()).build();
if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
// For externally managed schedulers, the pkgUrl/builtin stuff can be copied to bk
// if the function worker image does not include connectors
Expand All @@ -309,42 +322,69 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
sinkOrSource.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), sinkOrSource.getAbsolutePath());
} else {
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
sinkOrSource, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource,
worker().getDlogNamespace());
} else {
log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType));
packageLocationMetaDataBuilder
.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
}
} else if (isPkgUrlProvided) {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
uploadedInputStreamAsFile.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
if (!Utils.hasPackageTypePrefix(functionPkgUrl)) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
}
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
uploadedInputStreamAsFile.getName()));
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
} else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)
|| functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
String fileName =
new File(new URL(functionMetaData.getPackageLocation().getPackagePath()).toURI()).getName();
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileName));
packageLocationMetaDataBuilder.setOriginalFileName(fileName);
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileName));
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileDetail.getFileName()));
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileDetail.getFileName()));
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
}
} else {
// For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk
Expand All @@ -356,13 +396,19 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat
|| functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
packageLocationMetaDataBuilder.setPackagePath(functionMetaData.getPackageLocation().getPackagePath());
} else {
packageLocationMetaDataBuilder
.setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileDetail.getFileName()));
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
uploadedInputStreamAsFile, worker().getDlogNamespace());
}
}
return packageLocationMetaDataBuilder;
Expand Down Expand Up @@ -449,13 +495,23 @@ public void deregisterFunction(final String tenant,
if (!functionPackagePath.startsWith(Utils.HTTP)
&& !functionPackagePath.startsWith(Utils.FILE)
&& !functionPackagePath.startsWith(Utils.BUILTIN)) {
try {
WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(),
functionMetaData.getPackageLocation().getPackagePath());
} catch (IOException e) {
log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName,
functionMetaData.getPackageLocation().getPackagePath(), e);
if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
try {
worker().getBrokerAdmin().packages().delete(functionPackagePath);
} catch (PulsarAdminException e) {
log.error("{}/{}/{} Failed to cleanup package in package managemanet with url {}", tenant,
namespace, componentName, functionMetaData.getPackageLocation().getPackagePath(), e);
}
} else {
try {
WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(),
functionMetaData.getPackageLocation().getPackagePath());
} catch (IOException e) {
log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName,
functionMetaData.getPackageLocation().getPackagePath(), e);
}
}

}

deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
Expand Down Expand Up @@ -1336,8 +1392,17 @@ public void uploadFunction(final InputStream uploadedInputStream, final String p
// Upload to bookkeeper
try {
log.info("Uploading function package to {}", path);
WorkerUtils.uploadToBookKeeper(worker().getDlogNamespace(), uploadedInputStream, path);
} catch (IOException e) {
if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
File tempFile = createPkgTempFile();
tempFile.deleteOnExit();
FileOutputStream out = new FileOutputStream(tempFile);
IOUtils.copy(uploadedInputStream, out);
PackageMetadata metadata = PackageMetadata.builder().createTime(System.currentTimeMillis()).build();
worker().getBrokerAdmin().packages().upload(metadata, path, tempFile.getAbsolutePath());
} else {
WorkerUtils.uploadToBookKeeper(worker().getDlogNamespace(), uploadedInputStream, path);
}
} catch (IOException | PulsarAdminException e) {
log.error("Error uploading file {}", path, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
Expand Down Expand Up @@ -1402,6 +1467,17 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
IOUtils.copy(in, output, 1024);
output.flush();
}
} else if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
try {
File file = downloadPackageFile(worker(), pkgPath);
try (InputStream in = new FileInputStream(file)) {
IOUtils.copy(in, output, 1024);
output.flush();
}
} catch (Exception e) {
log.error("Failed download package {} from packageMangment Service", pkgPath, e);

}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
}
Expand Down
Loading

0 comments on commit cfbc9ea

Please sign in to comment.