Skip to content

Commit

Permalink
Support function registration with package-url (apache#1902)
Browse files Browse the repository at this point in the history
* Support function registration with package-url

* keep one admin api for createFunction
  • Loading branch information
rdhabalia authored Jun 5, 2018
1 parent 6b84809 commit 8444046
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 64 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>

<dependency>
<groupId>net.jpountz.lz4</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
final @PathParam("functionName") String functionName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("functionDetails") String functionDetailsJson) {

return functions.registerFunction(
tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson);
tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson);
}

@PUT
@ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
@ApiResponses(value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface Functions {
FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Create a new function.
* Create a new function.
*
* @param functionDetails
* the function configuration object
Expand All @@ -85,6 +85,22 @@ public interface Functions {
* Unexpected error
*/
void createFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException;

/**
* <pre>
* Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
* </pre>
*
* @param functionDetails
* the function configuration object
* @param pkgUrl
* url from which pkg can be downloaded
* @throws PulsarAdminException
*/
void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException;

/**
* Update the configuration for a function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ public void createFunction(FunctionDetails functionDetails, String fileName) thr
}
}

@Override
public void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();

mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));

mp.bodyPart(new FormDataBodyPart("functionDetails",
printJson(functionDetails),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()).path(functionDetails.getName()))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void deleteFunction(String cluster, String namespace, String function) throws PulsarAdminException {
try {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-core</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.isFunctionPackageUrlSupported;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -103,18 +105,44 @@ public void join() throws InterruptedException {
}

private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
log.info("Starting function {} - {} ...",
functionMetaData.getFunctionDetails().getName(), instance.getInstanceId());
functionMetaData.getFunctionDetails().getName(), instanceId);
File pkgFile = null;

String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);

if(isPkgUrlProvided && pkgLocation.startsWith(Utils.FILE)) {
pkgFile = new File(pkgLocation);
} else {
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
}

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails());
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());

functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
}

private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException {

File pkgDir = new File(
workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
getDownloadPackagePath(functionMetaData, instanceId));
pkgDir.mkdirs();

int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();

File pkgFile = new File(
pkgFile = new File(
pkgDir,
new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName());

Expand All @@ -133,14 +161,21 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
break;
}
}
try {
log.info("Function package file {} will be downloaded from {}",
tempPkgFile, functionMetaData.getPackageLocation());
String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(Utils.HTTP);
log.info("Function package file {} will be downloaded from {}", tempPkgFile,
downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());

if(downloadFromHttp) {
Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile));
} else {
Utils.downloadFromBookkeeper(
dlogNamespace,
new FileOutputStream(tempPkgFile),
functionMetaData.getPackageLocation().getPackagePath());

pkgLocationPath);
}

try {
// create a hardlink, if there are two concurrent createLink operations, one will fail.
// this ensures one instance will successfully download the package.
try {
Expand All @@ -157,20 +192,6 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
} finally {
tempPkgFile.delete();
}

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails());
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());

functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
}

private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.UUID;

import lombok.extern.slf4j.Slf4j;

import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
Expand All @@ -39,14 +43,17 @@
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.pulsar.functions.proto.Function;

@Slf4j
public final class Utils {

public static String HTTP = "http";
public static String FILE = "file";

private Utils(){}

public static Object getObject(byte[] byteArr) throws IOException, ClassNotFoundException {
Expand Down Expand Up @@ -124,6 +131,34 @@ public static void uploadToBookeeper(Namespace dlogNamespace,
}
}

public static void validateFileUrl(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
if (destPkgUrl.startsWith(FILE)) {
URL url = new URL(destPkgUrl);
File file = new File(url.toURI());
if (!file.exists()) {
throw new IOException(destPkgUrl + " does not exists locally");
}
} else if (destPkgUrl.startsWith("http")) {
URL website = new URL(destPkgUrl);
File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
fos.getChannel().transferFrom(rbc, 0, 10);
}
if (tempFile.exists()) {
tempFile.delete();
}
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
}
}

public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
URL website = new URL(destPkgUrl);
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
}

public static void downloadFromBookkeeper(Namespace namespace,
OutputStream outputStream,
String packagePath) throws IOException {
Expand Down
Loading

0 comments on commit 8444046

Please sign in to comment.