diff --git a/conf/broker.conf b/conf/broker.conf index b70da06b55fa9..e49ec50e215a3 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1281,6 +1281,9 @@ exposeBundlesMetricsInPrometheus=false # Enable Functions Worker Service in Broker functionsWorkerEnabled=false +#Enable Functions Worker to use packageManagement Service to store package. +functionsWorkerEnablePackageManagement=false + ### --- Broker Web Stats --- ### # Enable topic level metrics diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 1dcb51f1cae20..4a018ceb6c228 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -49,6 +49,8 @@ metadataStoreCacheExpirySeconds: 300 numFunctionPackageReplicas: 1 downloadDirectory: download/pulsar_functions +#Enable Functions Worker to use packageManagement Service to store package. +functionsWorkerEnablePackageManagement: false # Classname of Pluggable JVM GC metrics logger that can log GC specific metrics # jvmGCMetricsLoggerClassName: diff --git a/conf/standalone.conf b/conf/standalone.conf index 9c916b18cd7de..60fc7e5b89d84 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1068,6 +1068,9 @@ packagesReplicas=1 # The bookkeeper ledger root path packagesManagementLedgerRootPath=/ledgers +#Enable Functions Worker to use packageManagement Service to store package. +functionsWorkerEnablePackageManagement=false + ### --- Packages management service configuration variables (end) --- ### ### --- Deprecated settings --- ### diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6bfacf3c198eb..f877bfa086eb9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2419,6 +2419,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String functionsWorkerServiceNarPackage = ""; + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Flag indicates enabling or disabling function worker using unified PackageManagement service." + ) + private boolean functionsWorkerEnablePackageManagement = false; + /**** --- Broker Web Stats. --- ****/ @FieldContext( category = CATEGORY_METRICS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 372ca4e4250e1..b4e13f4fcf72f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1691,6 +1691,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu // inherit super users workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles()); + workerConfig.setFunctionsWorkerEnablePackageManagement(brokerConfig.isFunctionsWorkerEnablePackageManagement()); // inherit the nar package locations if (isBlank(workerConfig.getFunctionsWorkerServiceNarPackage())) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 97025c4193aae..ad036f6f25f7c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -1101,7 +1101,7 @@ class UploadFunction extends BaseCommand { protected String sourceFile; @Parameter( names = "--path", - description = "Path where the contents need to be stored", + description = "Path or functionPkgUrl where the contents need to be stored", listConverter = StringConverter.class, required = true) protected String path; @@ -1138,7 +1138,7 @@ class DownloadFunction extends FunctionCommand { protected String destinationFile; @Parameter( names = "--path", - description = "Path to store the content", + description = "Path or functionPkgUrl to store the content", listConverter = StringConverter.class, required = false, hidden = true) protected String path; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 47c3600fa655d..dd694b6b36d95 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -293,6 +293,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The number of replicas for storing functions" ) private int numFunctionPackageReplicas; + @FieldContext( + category = CATEGORY_FUNC_PKG, + doc = "Flag indicates enabling or disabling function worker using unified PackageManagement service." + ) + private boolean functionsWorkerEnablePackageManagement = false; @FieldContext( category = CATEGORY_FUNC_RUNTIME_MNG, doc = "The directory to download functions by runtime manager" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 1355663536280..01b240d7f97a1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -203,8 +203,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa File pkgDir = pkgFile.getParentFile(); if (pkgFile.exists()) { - log.warn("Function package exists already {} deleting it", - pkgFile); + log.warn("Function package exists already {} deleting it", pkgFile); pkgFile.delete(); } @@ -212,7 +211,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa do { tempPkgFile = new File( pkgDir, - pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString()); + pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID()); } while (tempPkgFile.exists() || !tempPkgFile.createNewFile()); String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath(); boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index d0db047a02048..801bf52091177 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -24,15 +24,18 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.utils.FunctionCommon.createPkgTempFile; import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace; import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName; import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin; import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; +import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.downloadPackageFile; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -105,6 +108,8 @@ import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.service.api.Component; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; +import org.apache.pulsar.packages.management.core.common.PackageName; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @Slf4j @@ -293,6 +298,14 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); + boolean isPackageManagementEnabled = worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement(); + PackageName packageName = PackageName.get(functionDetails.getComponentType().name(), + tenant, + namespace, + componentName, + String.valueOf(functionMetaData.getVersion())); + PackageMetadata metadata = PackageMetadata.builder() + .createTime(functionMetaData.getCreateTime()).build(); if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { // For externally managed schedulers, the pkgUrl/builtin stuff can be copied to bk // if the function worker image does not include connectors @@ -309,42 +322,69 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, sinkOrSource.getName())); packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName()); + if (isPackageManagementEnabled) { + packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); + worker().getBrokerAdmin().packages().upload(metadata, + packageName.toString(), sinkOrSource.getAbsolutePath()); + } else { + WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), + sinkOrSource, worker().getDlogNamespace()); + } log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath()); - WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, - worker().getDlogNamespace()); } else { log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType)); packageLocationMetaDataBuilder .setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); } } else if (isPkgUrlProvided) { - packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, - uploadedInputStreamAsFile.getName())); packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName()); + if (isPackageManagementEnabled) { + packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl); + if (!Utils.hasPackageTypePrefix(functionPkgUrl)) { + packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); + worker().getBrokerAdmin().packages().upload(metadata, + packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); + } + } else { + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + uploadedInputStreamAsFile.getName())); + WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), + uploadedInputStreamAsFile, worker().getDlogNamespace()); + } log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath()); - WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), - uploadedInputStreamAsFile, worker().getDlogNamespace()); } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) || functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) { String fileName = new File(new URL(functionMetaData.getPackageLocation().getPackagePath()).toURI()).getName(); - packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, - fileName)); packageLocationMetaDataBuilder.setOriginalFileName(fileName); + if (isPackageManagementEnabled) { + packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); + worker().getBrokerAdmin().packages().upload(metadata, + packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); + } else { + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + fileName)); + WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), + uploadedInputStreamAsFile, worker().getDlogNamespace()); + } log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath()); - WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), - uploadedInputStreamAsFile, worker().getDlogNamespace()); } else { - packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, - fileDetail.getFileName())); packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); + if (isPackageManagementEnabled) { + packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); + worker().getBrokerAdmin().packages().upload(metadata, + packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); + } else { + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + fileDetail.getFileName())); + WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), + uploadedInputStreamAsFile, worker().getDlogNamespace()); + } log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath()); - WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), - uploadedInputStreamAsFile, worker().getDlogNamespace()); } } else { // For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk @@ -356,13 +396,19 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat || functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) { packageLocationMetaDataBuilder.setPackagePath(functionMetaData.getPackageLocation().getPackagePath()); } else { - packageLocationMetaDataBuilder - .setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName())); packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); + if (isPackageManagementEnabled) { + packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); + worker().getBrokerAdmin().packages().upload(metadata, + packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); + } else { + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + fileDetail.getFileName())); + WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), + uploadedInputStreamAsFile, worker().getDlogNamespace()); + } log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath()); - WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), - uploadedInputStreamAsFile, worker().getDlogNamespace()); } } return packageLocationMetaDataBuilder; @@ -449,13 +495,23 @@ public void deregisterFunction(final String tenant, if (!functionPackagePath.startsWith(Utils.HTTP) && !functionPackagePath.startsWith(Utils.FILE) && !functionPackagePath.startsWith(Utils.BUILTIN)) { - try { - WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), - functionMetaData.getPackageLocation().getPackagePath()); - } catch (IOException e) { - log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName, - functionMetaData.getPackageLocation().getPackagePath(), e); + if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { + try { + worker().getBrokerAdmin().packages().delete(functionPackagePath); + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Failed to cleanup package in package managemanet with url {}", tenant, + namespace, componentName, functionMetaData.getPackageLocation().getPackagePath(), e); + } + } else { + try { + WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), + functionMetaData.getPackageLocation().getPackagePath()); + } catch (IOException e) { + log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName, + functionMetaData.getPackageLocation().getPackagePath(), e); + } } + } deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName); @@ -1336,8 +1392,17 @@ public void uploadFunction(final InputStream uploadedInputStream, final String p // Upload to bookkeeper try { log.info("Uploading function package to {}", path); - WorkerUtils.uploadToBookKeeper(worker().getDlogNamespace(), uploadedInputStream, path); - } catch (IOException e) { + if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { + File tempFile = createPkgTempFile(); + tempFile.deleteOnExit(); + FileOutputStream out = new FileOutputStream(tempFile); + IOUtils.copy(uploadedInputStream, out); + PackageMetadata metadata = PackageMetadata.builder().createTime(System.currentTimeMillis()).build(); + worker().getBrokerAdmin().packages().upload(metadata, path, tempFile.getAbsolutePath()); + } else { + WorkerUtils.uploadToBookKeeper(worker().getDlogNamespace(), uploadedInputStream, path); + } + } catch (IOException | PulsarAdminException e) { log.error("Error uploading file {}", path, e); throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } @@ -1402,6 +1467,17 @@ private StreamingOutput getStreamingOutput(String pkgPath) { IOUtils.copy(in, output, 1024); output.flush(); } + } else if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { + try { + File file = downloadPackageFile(worker(), pkgPath); + try (InputStream in = new FileInputStream(file)) { + IOUtils.copy(in, output, 1024); + output.flush(); + } + } catch (Exception e) { + log.error("Failed download package {} from packageMangment Service", pkgPath, e); + + } } else { WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 168596d1a1119..bc14f7deb9e12 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -337,7 +337,7 @@ public void updateFunction(final String tenant, try { if (isNotBlank(functionPkgUrl)) { if (Utils.hasPackageTypePrefix(functionPkgUrl)) { - componentPackageFile = downloadPackageFile(functionName); + componentPackageFile = downloadPackageFile(functionPkgUrl); } else { try { componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl); @@ -377,12 +377,16 @@ public void updateFunction(final String tenant, + " Package is not provided"); } } else { - componentPackageFile = FunctionCommon.createPkgTempFile(); componentPackageFile.deleteOnExit(); - WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), - componentPackageFile, existingComponent.getPackageLocation().getPackagePath()); - + if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { + worker().getBrokerAdmin().packages().download( + existingComponent.getPackageLocation().getPackagePath(), + componentPackageFile.getAbsolutePath()); + } else { + WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), + componentPackageFile, existingComponent.getPackageLocation().getPackagePath()); + } functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, mergedConfig, componentPackageFile); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 19a59396025d9..7b2c5a68ef2c9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -377,12 +377,16 @@ public void updateSink(final String tenant, ComponentTypeUtils.toString(componentType) + " Package is not provided"); } } else { - componentPackageFile = FunctionCommon.createPkgTempFile(); componentPackageFile.deleteOnExit(); - WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, - existingComponent.getPackageLocation().getPackagePath()); - + if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { + worker().getBrokerAdmin().packages().download( + existingComponent.getPackageLocation().getPackagePath(), + componentPackageFile.getAbsolutePath()); + } else { + WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, + existingComponent.getPackageLocation().getPackagePath()); + } functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName, mergedConfig, componentPackageFile); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index ee3e06533c1cd..82d2043e9a9c3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -377,12 +377,16 @@ public void updateSource(final String tenant, ComponentTypeUtils.toString(componentType) + " Package is not provided"); } } else { - componentPackageFile = FunctionCommon.createPkgTempFile(); componentPackageFile.deleteOnExit(); - WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, - existingComponent.getPackageLocation().getPackagePath()); - + if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { + worker().getBrokerAdmin().packages().download( + existingComponent.getPackageLocation().getPackagePath(), + componentPackageFile.getAbsolutePath()); + } else { + WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, + existingComponent.getPackageLocation().getPackagePath()); + } functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName, mergedConfig, componentPackageFile); }