Skip to content

Commit

Permalink
Improve and add authorization to function download and upload (apach…
Browse files Browse the repository at this point in the history
…e#4644)

* Improve and add authorization to function download and upload

* cleaning up

* fix bug
  • Loading branch information
jerrypeng authored and merlimat committed Jul 2, 2019
1 parent deb6492 commit 20cf739
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -632,24 +632,41 @@ public void startFunction(

@POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
value = "Uploads Pulsar Function file data (Admin only)",
hidden = true
)
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
functions.uploadFunction(uploadedInputStream, path);
functions.uploadFunction(uploadedInputStream, path, clientAppId());
}

@GET
@ApiOperation(
value = "Downloads Pulsar Function file data",
value = "Downloads Pulsar Function file data (Admin only)",
hidden = true
)
@Path("/download")
public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
return functions.downloadFunction(path);
return functions.downloadFunction(path, clientAppId(), clientAuthData());
}

@GET
@ApiOperation(
value = "Downloads Pulsar Function file data",
hidden = true
)
@Path("/{tenant}/{namespace}/{functionName}/download")
public StreamingOutput downloadFunction(
@ApiParam(value = "The tenant of functions")
final @PathParam("tenant") String tenant,
@ApiParam(value = "The namespace of functions")
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of functions")
final @PathParam("functionName") String functionName) {

return functions.downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,14 @@ public Response stopFunction(final @PathParam("tenant") String tenant,

@POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
value = "Uploads Pulsar Function file data (admin only)",
hidden = true
)
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
return functions.uploadFunction(uploadedInputStream, path);
return functions.uploadFunction(uploadedInputStream, path, clientAppId());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,21 @@ FunctionStats getFunctionStats(String tenant, String namespace, String function)
*/
void downloadFunction(String destinationFile, String path) throws PulsarAdminException;

/**
* Download Function Code.
*
* @param destinationFile
* file where data should be downloaded to
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @throws PulsarAdminException
*/
void downloadFunction(String destinationFile, String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Deprecated in favor of getting sources and sinks for their own APIs
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,24 @@ public void uploadFunction(String sourceFile, String path) throws PulsarAdminExc
}
}

@Override
public void downloadFunction(String destinationPath, String tenant, String namespace, String functionName) throws PulsarAdminException {
downloadFile(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download"));
}

@Override
public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
downloadFile(destinationPath, functions.path("download").queryParam("path", path));
}

private void downloadFile(String destinationPath, WebTarget target) throws PulsarAdminException {
HttpResponseStatus status;
try {
File file = new File(destinationPath);
if (!file.exists()) {
file.createNewFile();
}
FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
WebTarget target = functions.path("download").queryParam("path", path);

RequestBuilder builder = get(target.getUri().toASCIIString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ void runCmd() throws Exception {
}

@Parameters(commandDescription = "Download File Data from Pulsar", hidden = true)
class DownloadFunction extends BaseCommand {
class DownloadFunction extends FunctionCommand {
// for backward compatibility purposes
@Parameter(
names = "--destinationFile",
Expand All @@ -932,21 +932,32 @@ class DownloadFunction extends BaseCommand {
@Parameter(
names = "--path",
description = "Path where the contents are to be stored",
listConverter = StringConverter.class, required = true)
listConverter = StringConverter.class, required = false, hidden = true)
protected String path;

private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_destinationFile)) destinationFile = DEPRECATED_destinationFile;
}

@Override
void processArguments() throws Exception {
if (path == null) {
super.processArguments();
}
}

@Override
void runCmd() throws Exception {
// merge deprecated args with new args
mergeArgs();
if (StringUtils.isBlank(destinationFile)) {
throw new ParameterException("--destination-file needs to be specified");
}
admin.functions().downloadFunction(destinationFile, path);
if (path != null) {
admin.functions().downloadFunction(destinationFile, path);
} else {
admin.functions().downloadFunction(destinationFile, tenant, namespace, functionName);
}
print("Downloaded successfully");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,13 +751,16 @@ protected List<String> getExecutorCommand() {
return Arrays.asList(
"sh",
"-c",
String.join(" ", getDownloadCommand(userCodePkgUrl, originalCodeFileName))
String.join(" ", getDownloadCommand(instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
originalCodeFileName))
+ " && " + setShardIdEnvironmentVariableCommand()
+ " && " + String.join(" ", processArgs)
);
}

private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath) {

// add auth plugin and parameters if necessary
if (authenticationEnabled && authConfig != null) {
Expand All @@ -774,8 +777,12 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())
pulsarAdminUrl,
"functions",
"download",
"--path",
bkPath,
"--tenant",
tenant,
"--namespace",
namespace,
"--name",
name,
"--destination-file",
userCodeFilePath);
}
Expand All @@ -787,8 +794,12 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())
pulsarAdminUrl,
"functions",
"download",
"--path",
bkPath,
"--tenant",
tenant,
"--namespace",
namespace,
"--name",
name,
"--destination-file",
userCodeFilePath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
Expand Down Expand Up @@ -357,7 +356,6 @@ public void deregisterFunction(final String tenant,
log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}

// delete state table
if (null != worker().getStateStoreAdminClient()) {
final String tableNs = getStateNamespace(tenant, namespace);
Expand Down Expand Up @@ -1175,7 +1173,16 @@ public void putFunctionState(final String tenant,
}
}

public void uploadFunction(final InputStream uploadedInputStream, final String path) {
public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole) {

if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}

if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}

// validate parameters
try {
if (uploadedInputStream == null || path == null) {
Expand All @@ -1196,27 +1203,100 @@ public void uploadFunction(final InputStream uploadedInputStream, final String p
}
}

public StreamingOutput downloadFunction(final String path) {

final StreamingOutput streamingOutput = new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException {
if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) {
URL url = new URL(path);
IOUtils.copy(url.openStream(), output);
} else if (path.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
URL url = new URL(path);
File file;
try {
file = new File(url.toURI());
IOUtils.copy(new FileInputStream(file), output);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("invalid file url path: " + path);
public StreamingOutput downloadFunction(String tenant, String namespace, String componentName,
String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to download package for {} ", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}

FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}

String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName)
.getPackageLocation().getPackagePath();

final StreamingOutput streamingOutput = output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = new URL(pkgPath);
IOUtils.copy(url.openStream(), output);
} else if (pkgPath.startsWith(Utils.FILE)) {
URL url = new URL(pkgPath);
File file;
try {
file = new File(url.toURI());
IOUtils.copy(new FileInputStream(file), output);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("invalid file url path: " + pkgPath);
}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
}
};

return streamingOutput;
}

public StreamingOutput downloadFunction(final String path, String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {

if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}

if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// to maintain backwards compatiblity but still have authorization
String[] tokens = path.split("/");
if (tokens.length == 4) {
String tenant = tokens[0];
String namespace = tokens[1];
String componentName = tokens[2];

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to download package for {} ", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
} catch (PulsarAdminException e) {
log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
} else {
if (!isSuperUser(clientRole)) {
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
}
}

final StreamingOutput streamingOutput = output -> {
if (path.startsWith(Utils.HTTP)) {
URL url = new URL(path);
IOUtils.copy(url.openStream(), output);
} else if (path.startsWith(Utils.FILE)) {
URL url = new URL(path);
File file;
try {
file = new File(url.toURI());
IOUtils.copy(new FileInputStream(file), output);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("invalid file url path: " + path);
}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
}
};

return streamingOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu
return Response.ok().build();
}

public Response uploadFunction(InputStream uploadedInputStream, String path) {
delegate.uploadFunction(uploadedInputStream, path);
public Response uploadFunction(InputStream uploadedInputStream, String path, String clientRole) {
delegate.uploadFunction(uploadedInputStream, path, clientRole);
return Response.ok().build();
}

public Response downloadFunction(String path, String clientRole) {
return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path, clientRole, null)).build();
}

public List<ConnectorDefinition> getListOfConnectors() {
Expand Down
Loading

0 comments on commit 20cf739

Please sign in to comment.