From ee3e06d660cf8409607695ee8041dd563ff0cac9 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Fri, 4 May 2018 22:38:59 -0700 Subject: [PATCH] Encode zk-path for function-pkg (#1727) * Encode zk-path for function-pkg * encode only function-name in package-path * fix test --- .../apache/pulsar/functions/worker/Utils.java | 1 + .../worker/rest/api/FunctionsImpl.java | 23 ++++++++----------- .../api/v2/FunctionApiV2ResourceTest.java | 3 ++- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 071e946ed3292..494368f348812 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -39,6 +39,7 @@ import org.apache.distributedlog.metadata.DLMetadata; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 526175016d04d..2544c7c203578 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; @@ -136,12 +137,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant, .setVersion(0); PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(String.format( - "%s/%s/%s/%s", - tenant, - namespace, - functionName, - Utils.getUniquePackageName(fileDetail.getFileName()))); + .setPackagePath(createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); @@ -189,12 +185,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, .setVersion(0); PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(String.format( - "%s/%s/%s/%s", - tenant, - namespace, - functionName, - Utils.getUniquePackageName(fileDetail.getFileName()))); + .setPackagePath(createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); @@ -617,7 +608,7 @@ public Response uploadFunction(final @FormDataParam("data") InputStream uploaded Utils.uploadToBookeeper( worker().getDlogNamespace(), uploadedInputStream, - path); + Codec.encode(path)); } catch (IOException e) { log.error("Error uploading file {}", path, e); return Response.serverError() @@ -637,7 +628,7 @@ public Response downloadFunction(final @QueryParam("path") String path) { @Override public void write(final OutputStream output) throws IOException { Utils.downloadFromBookkeeper(worker().getDlogNamespace(), - output, path); + output, Codec.decode(path)); } }).build(); } @@ -781,4 +772,8 @@ private Response getUnavailableResponse() { .build(); } + public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) { + return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName), + Utils.getUniquePackageName(Codec.encode(fileName))); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index cddc1526b574b..7ba1efcec522e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -113,8 +113,9 @@ public String process(String input, Context context) throws Exception { public void setup() { this.mockedManager = mock(FunctionMetaDataManager.class); this.mockedInputStream = mock(InputStream.class); - this.mockedFormData = mock(FormDataContentDisposition.class); this.mockedNamespace = mock(Namespace.class); + this.mockedFormData = mock(FormDataContentDisposition.class); + when(mockedFormData.getFileName()).thenReturn("test"); this.mockedWorkerService = mock(WorkerService.class); when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);