diff --git a/pom.xml b/pom.xml index 39a31de5ed364..b6be8686e1846 100644 --- a/pom.xml +++ b/pom.xml @@ -387,6 +387,12 @@ flexible messaging model and an intuitive client API. commons-configuration 1.6 + + + commons-io + commons-io + 2.5 + net.jpountz.lz4 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index c22b6117b2b54..a00a2dd408594 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -74,12 +74,13 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @PathParam("functionName") String functionName, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { return functions.registerFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson); + tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); } - + @PUT @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode") @ApiResponses(value = { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 7c2d907f55d2d..a86036c194c86 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -76,7 +76,7 @@ public interface Functions { FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** - * Create a new function. + * Create a new function. * * @param functionDetails * the function configuration object @@ -85,6 +85,22 @@ public interface Functions { * Unexpected error */ void createFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException; + + /** + *
+     * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * 
+ * + * @param functionDetails + * the function configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws PulsarAdminException + */ + void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException; /** * Update the configuration for a function. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 1e2ae22cd4a14..1a1ab60a8ad20 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -121,6 +121,23 @@ public void createFunction(FunctionDetails functionDetails, String fileName) thr } } + @Override + public void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException { + try { + final FormDataMultiPart mp = new FormDataMultiPart(); + + mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + + mp.bodyPart(new FormDataBodyPart("functionDetails", + printJson(functionDetails), + MediaType.APPLICATION_JSON_TYPE)); + request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()).path(functionDetails.getName())) + .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void deleteFunction(String cluster, String namespace, String function) throws PulsarAdminException { try { diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 72088610ea84b..4860ae7826cfb 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -97,6 +97,12 @@ org.apache.distributedlog distributedlog-core
+ + + commons-io + commons-io + + diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 4edee600402ff..86de82fb45c55 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -35,8 +35,10 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; +import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.isFunctionPackageUrlSupported; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; @@ -103,18 +105,44 @@ public void join() throws InterruptedException { } private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { - Function.Instance instance = functionRuntimeInfo.getFunctionInstance(); - FunctionMetaData functionMetaData = instance.getFunctionMetaData(); + FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); log.info("Starting function {} - {} ...", - functionMetaData.getFunctionDetails().getName(), instance.getInstanceId()); + functionMetaData.getFunctionDetails().getName(), instanceId); + File pkgFile = null; + + String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); + boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); + + if(isPkgUrlProvided && pkgLocation.startsWith(Utils.FILE)) { + pkgFile = new File(pkgLocation); + } else { + downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); + } + + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails()); + // TODO: set correct function id and version when features implemented + instanceConfig.setFunctionId(UUID.randomUUID().toString()); + instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); + instanceConfig.setInstanceId(String.valueOf(instanceId)); + instanceConfig.setMaxBufferedTuples(1024); + instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort()); + RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), + runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs()); + + functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); + runtimeSpawner.start(); + } + + private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException { + File pkgDir = new File( workerConfig.getDownloadDirectory(), - getDownloadPackagePath(functionMetaData, instance.getInstanceId())); + getDownloadPackagePath(functionMetaData, instanceId)); pkgDir.mkdirs(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - - File pkgFile = new File( + pkgFile = new File( pkgDir, new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName()); @@ -133,14 +161,21 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep break; } } - try { - log.info("Function package file {} will be downloaded from {}", - tempPkgFile, functionMetaData.getPackageLocation()); + String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath(); + boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(Utils.HTTP); + log.info("Function package file {} will be downloaded from {}", tempPkgFile, + downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()); + + if(downloadFromHttp) { + Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile)); + } else { Utils.downloadFromBookkeeper( dlogNamespace, new FileOutputStream(tempPkgFile), - functionMetaData.getPackageLocation().getPackagePath()); - + pkgLocationPath); + } + + try { // create a hardlink, if there are two concurrent createLink operations, one will fail. // this ensures one instance will successfully download the package. try { @@ -157,20 +192,6 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep } finally { tempPkgFile.delete(); } - - InstanceConfig instanceConfig = new InstanceConfig(); - instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails()); - // TODO: set correct function id and version when features implemented - instanceConfig.setFunctionId(UUID.randomUUID().toString()); - instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); - instanceConfig.setInstanceId(String.valueOf(instanceId)); - instanceConfig.setMaxBufferedTuples(1024); - instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort()); - RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), - runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs()); - - functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); - runtimeSpawner.start(); } private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index b64a7fad2e308..586938c3dadfb 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -20,16 +20,20 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.util.UUID; - import lombok.extern.slf4j.Slf4j; - import org.apache.distributedlog.AppendOnlyStreamWriter; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.DistributedLogManager; @@ -39,14 +43,17 @@ import org.apache.distributedlog.metadata.DLMetadata; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; +import org.apache.pulsar.functions.proto.Function; @Slf4j public final class Utils { + public static String HTTP = "http"; + public static String FILE = "file"; + private Utils(){} public static Object getObject(byte[] byteArr) throws IOException, ClassNotFoundException { @@ -124,6 +131,34 @@ public static void uploadToBookeeper(Namespace dlogNamespace, } } + public static void validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException { + if (destPkgUrl.startsWith(FILE)) { + URL url = new URL(destPkgUrl); + File file = new File(url.toURI()); + if (!file.exists()) { + throw new IOException(destPkgUrl + " does not exists locally"); + } + } else if (destPkgUrl.startsWith("http")) { + URL website = new URL(destPkgUrl); + File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString()); + ReadableByteChannel rbc = Channels.newChannel(website.openStream()); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + fos.getChannel().transferFrom(rbc, 0, 10); + } + if (tempFile.exists()) { + tempFile.delete(); + } + } else { + throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]"); + } + } + + public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException { + URL website = new URL(destPkgUrl); + ReadableByteChannel rbc = Channels.newChannel(website.openStream()); + outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + } + public static void downloadFromBookkeeper(Namespace namespace, OutputStream outputStream, String packagePath) throws IOException { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 6d5cd1ae209eb..cb2da74cd460a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -21,9 +21,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.gson.Gson; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URISyntaxException; +import java.net.URL; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -42,6 +48,8 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -55,6 +63,7 @@ import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.worker.FunctionActioner; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; @@ -101,6 +110,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @PathParam("functionName") String functionName, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { if (!isWorkerServiceAvailable()) { @@ -108,11 +118,17 @@ public Response registerFunction(final @PathParam("tenant") String tenant, } FunctionDetails functionDetails; + boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); // validate parameters try { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - uploadedInputStream, fileDetail, functionDetailsJson); - } catch (IllegalArgumentException e) { + if(isPkgUrlProvided) { + functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, + functionPkgUrl, functionDetailsJson); + }else { + functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, + uploadedInputStream, fileDetail, functionDetailsJson); + } + } catch (Exception e) { log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); return Response.status(Status.BAD_REQUEST) @@ -136,10 +152,10 @@ public Response registerFunction(final @PathParam("tenant") String tenant, .setVersion(0); PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + .setPackagePath(isPkgUrlProvided ? functionPkgUrl : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); - return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); + return isPkgUrlProvided ? updateRequest(functionMetaDataBuilder.build()) : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } @PUT @@ -408,23 +424,22 @@ public Response listFunctions(final @PathParam("tenant") String tenant, return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build(); } - private Response updateRequest(FunctionMetaData functionMetaData, - InputStream uploadedInputStream) { + private Response updateRequest(FunctionMetaData functionMetaData, InputStream uploadedInputStream) { // Upload to bookkeeper try { log.info("Uploading function package to {}", functionMetaData.getPackageLocation()); - Utils.uploadToBookeeper( - worker().getDlogNamespace(), - uploadedInputStream, - functionMetaData.getPackageLocation().getPackagePath()); + Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, + functionMetaData.getPackageLocation().getPackagePath()); } catch (IOException e) { log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e); - return Response.serverError() - .type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(e.getMessage())) + return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) .build(); } + return updateRequest(functionMetaData); + } + + private Response updateRequest(FunctionMetaData functionMetaData) { // Submit to FMT FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); @@ -622,14 +637,26 @@ public Response uploadFunction(final @FormDataParam("data") InputStream uploaded @GET @Path("/download") public Response downloadFunction(final @QueryParam("path") String path) { - return Response.status(Status.OK).entity( - new StreamingOutput() { - @Override - public void write(final OutputStream output) throws IOException { - Utils.downloadFromBookkeeper(worker().getDlogNamespace(), - output, Codec.encode(path)); + return Response.status(Status.OK).entity(new StreamingOutput() { + @Override + public void write(final OutputStream output) throws IOException { + if (path.startsWith(Utils.HTTP)) { + URL url = new URL(path); + IOUtils.copy(url.openStream(), output); + } else if (path.startsWith(Utils.FILE)) { + URL url = new URL(path); + File file; + try { + file = new File(url.toURI()); + IOUtils.copy(new FileInputStream(file), output); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("invalid file url path: " + path); } - }).build(); + } else { + Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, Codec.encode(path)); + } + } + }).build(); } private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException { @@ -683,11 +710,36 @@ private void validateDeregisterRequestParams(String tenant, } } + private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, + String functionPkgUrl, String functionDetailsJson) + throws IllegalArgumentException, IOException, URISyntaxException { + if (!isFunctionPackageUrlSupported(functionPkgUrl)) { + throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); + } + Utils.validateFileUrl(functionPkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory()); + return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson); + } + + public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { + return StringUtils.isBlank(functionPkgUrl) + || !(functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); + } + + private FunctionDetails validateUpdateRequestParams(String tenant, + String namespace, + String functionName, + InputStream uploadedInputStream, + FormDataContentDisposition fileDetail, + String functionDetailsJson) throws IllegalArgumentException { + if (uploadedInputStream == null || fileDetail == null) { + throw new IllegalArgumentException("Function Package is not provided"); + } + return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson); + } + private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, - InputStream uploadedInputStream, - FormDataContentDisposition fileDetail, String functionDetailsJson) throws IllegalArgumentException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); @@ -698,9 +750,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, if (functionName == null) { throw new IllegalArgumentException("Function Name is not provided"); } - if (uploadedInputStream == null || fileDetail == null) { - throw new IllegalArgumentException("Function Package is not provided"); - } + if (functionDetailsJson == null) { throw new IllegalArgumentException("FunctionDetails is not provided"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 00c2a2836b2c1..778eaaba6fb8d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -48,10 +48,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @PathParam("functionName") String functionName, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("functionDetails") String functionDetailsJson) { return functions.registerFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson); + tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java new file mode 100644 index 0000000000000..b2a92f4252269 --- /dev/null +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.UUID; + +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Unit test of {@link Utils}. + */ +public class UtilsTest { + + @Test + public void testValidateLocalFileUrl() throws Exception { + String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + try { + // eg: fileLocation : /dir/fileName.jar (invalid) + Utils.validateFileUrl(fileLocation, testDir); + Assert.fail("should fail with invalid url: without protocol"); + } catch (IllegalArgumentException ie) { + // Ok.. expected exception + } + String fileLocationWithProtocol = "file://" + fileLocation; + // eg: fileLocation : file:///dir/fileName.jar (valid) + Utils.validateFileUrl(fileLocationWithProtocol, testDir); + // eg: fileLocation : file:/dir/fileName.jar (valid) + fileLocationWithProtocol = "file:" + fileLocation; + Utils.validateFileUrl(fileLocationWithProtocol, testDir); + } + + @Test + public void testValidateHttpFileUrl() throws Exception { + + String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar"; + String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + Utils.validateFileUrl(jarHttpUrl, testDir); + + jarHttpUrl = "http://_invalidurl_.com"; + try { + // eg: fileLocation : /dir/fileName.jar (invalid) + Utils.validateFileUrl(jarHttpUrl, testDir); + Assert.fail("should fail with invalid url: without protocol"); + } catch (Exception ie) { + // Ok.. expected exception + } + } + + @Test + public void testDownloadFile() throws Exception { + String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar"; + String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + Utils.downloadFromHttpUrl(jarHttpUrl, new FileOutputStream(pkgFile)); + Assert.assertTrue(pkgFile.exists()); + pkgFile.delete(); + } + +} diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 296273f977260..16161e023d973 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -29,16 +29,20 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; -import com.google.gson.Gson; -import com.google.common.collect.Lists; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; + import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; @@ -48,13 +52,13 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; -import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.SubscriptionType; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.Utils; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -70,6 +74,9 @@ import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; +import com.google.common.collect.Lists; +import com.google.gson.Gson; + /** * Unit test of {@link FunctionApiV2Resource}. */ @@ -287,6 +294,7 @@ private void testRegisterFunctionMissingArguments( function, inputStream, details, + null, org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -314,6 +322,7 @@ private Response registerDefaultFunction() throws IOException { function, mockedInputStream, mockedFormData, + null, org.apache.pulsar.functions.utils.Utils.printJson(functionDetails)); } @@ -923,4 +932,36 @@ public void testListFunctionsSuccess() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(new Gson().toJson(functions), response.getEntity()); } + + @Test + public void testDownloadFunctionHttpUrl() throws Exception { + String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar"; + String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionsImpl function = new FunctionsImpl(null); + Response response = function.downloadFunction(jarHttpUrl); + StreamingOutput streamOutput = (StreamingOutput) response.getEntity(); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + Assert.assertTrue(pkgFile.exists()); + if (pkgFile.exists()) { + pkgFile.delete(); + } + } + + @Test + public void testDownloadFunctionFile() throws Exception { + String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionsImpl function = new FunctionsImpl(null); + Response response = function.downloadFunction("file://"+fileLocation); + StreamingOutput streamOutput = (StreamingOutput) response.getEntity(); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + Assert.assertTrue(pkgFile.exists()); + if (pkgFile.exists()) { + pkgFile.delete(); + } + } }