Skip to content

Commit

Permalink
Encode zk-path for function-pkg (apache#1727)
Browse files Browse the repository at this point in the history
* Encode zk-path for function-pkg

* encode only function-name in package-path

* fix test
  • Loading branch information
rdhabalia authored and merlimat committed May 5, 2018
1 parent e3f235b commit ee3e06d
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
Expand Down Expand Up @@ -136,12 +137,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
.setVersion(0);

PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder()
.setPackagePath(String.format(
"%s/%s/%s/%s",
tenant,
namespace,
functionName,
Utils.getUniquePackageName(fileDetail.getFileName())));
.setPackagePath(createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);

return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
Expand Down Expand Up @@ -189,12 +185,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
.setVersion(0);

PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder()
.setPackagePath(String.format(
"%s/%s/%s/%s",
tenant,
namespace,
functionName,
Utils.getUniquePackageName(fileDetail.getFileName())));
.setPackagePath(createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);

return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
Expand Down Expand Up @@ -617,7 +608,7 @@ public Response uploadFunction(final @FormDataParam("data") InputStream uploaded
Utils.uploadToBookeeper(
worker().getDlogNamespace(),
uploadedInputStream,
path);
Codec.encode(path));
} catch (IOException e) {
log.error("Error uploading file {}", path, e);
return Response.serverError()
Expand All @@ -637,7 +628,7 @@ public Response downloadFunction(final @QueryParam("path") String path) {
@Override
public void write(final OutputStream output) throws IOException {
Utils.downloadFromBookkeeper(worker().getDlogNamespace(),
output, path);
output, Codec.decode(path));
}
}).build();
}
Expand Down Expand Up @@ -781,4 +772,8 @@ private Response getUnavailableResponse() {
.build();
}

public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName),
Utils.getUniquePackageName(Codec.encode(fileName)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ public String process(String input, Context context) throws Exception {
public void setup() {
this.mockedManager = mock(FunctionMetaDataManager.class);
this.mockedInputStream = mock(InputStream.class);
this.mockedFormData = mock(FormDataContentDisposition.class);
this.mockedNamespace = mock(Namespace.class);
this.mockedFormData = mock(FormDataContentDisposition.class);
when(mockedFormData.getFileName()).thenReturn("test");

this.mockedWorkerService = mock(WorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
Expand Down

0 comments on commit ee3e06d

Please sign in to comment.