Skip to content

Commit

Permalink
fix submit function via url (apache#3934)
Browse files Browse the repository at this point in the history
* fix submit function via url

* cleaning up

* add test

* make method private

* add additional tests

* cleaning up

* improving tests
  • Loading branch information
jerrypeng authored and srkukarni committed Mar 29, 2019
1 parent 4150f47 commit 5ea4231
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import lombok.ToString;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -61,11 +63,16 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest {
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";

private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
private Thread fileServerThread;
private static final int fileServerPort = PortManager.nextFreePort();
private HttpServer fileServer;

@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
Expand Down Expand Up @@ -213,12 +223,71 @@ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {

System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");

Thread.sleep(100);
// setting up simple web sever to test submitting function via URL
fileServerThread = new Thread(() -> {
try {
fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
fileServer.createContext("/pulsar-io-data-generator.nar", he -> {
try {

Headers headers = he.getResponseHeaders();
headers.add("Content-Type", "application/octet-stream");

File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
byte[] bytes = new byte [(int)file.length()];

FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
bufferedInputStream.read(bytes, 0, bytes.length);

he.sendResponseHeaders(200, file.length());
OutputStream outputStream = he.getResponseBody();
outputStream.write(bytes, 0, bytes.length);
outputStream.close();

} catch (Exception e) {
log.error("Error when downloading: {}", e, e);
}
});
fileServer.createContext("/pulsar-functions-api-examples.jar", he -> {
try {

Headers headers = he.getResponseHeaders();
headers.add("Content-Type", "application/octet-stream");

File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile());
byte[] bytes = new byte [(int)file.length()];

FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
bufferedInputStream.read(bytes, 0, bytes.length);

he.sendResponseHeaders(200, file.length());
OutputStream outputStream = he.getResponseBody();
outputStream.write(bytes, 0, bytes.length);
outputStream.close();

} catch (Exception e) {
log.error("Error when downloading: {}", e, e);
}
});
fileServer.setExecutor(null); // creates a default executor
log.info("Starting file server...");
fileServer.start();
} catch (Exception e) {
log.error("Failed to start file server: ", e);
fileServer.stop(0);
}

});
fileServerThread.start();
}

@AfterMethod
void shutdown() throws Exception {
log.info("--- Shutting down ---");
fileServer.stop(0);
fileServerThread.interrupt();
pulsarClient.close();
admin.close();
functionsWorkerService.stop();
Expand Down Expand Up @@ -309,8 +378,7 @@ private static SinkConfig createSinkConfig(String tenant, String namespace, Stri
*
* @throws Exception
*/
@Test(timeOut = 20000)
public void testE2EPulsarFunction() throws Exception {
private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
Expand All @@ -328,7 +396,6 @@ public void testE2EPulsarFunction() throws Exception {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
Expand Down Expand Up @@ -386,7 +453,18 @@ public void testE2EPulsarFunction() throws Exception {
}

@Test(timeOut = 20000)
public void testPulsarSinkStats() throws Exception {
public void testE2EPulsarFunctionWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
testE2EPulsarFunction(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testE2EPulsarFunctionWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort);
testE2EPulsarFunction(jarFilePathUrl);
}

private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
Expand All @@ -401,7 +479,6 @@ public void testPulsarSinkStats() throws Exception {
// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);

Expand All @@ -413,7 +490,7 @@ public void testPulsarSinkStats() throws Exception {
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
}, 50, 150);
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

Expand Down Expand Up @@ -586,7 +663,18 @@ public void testPulsarSinkStats() throws Exception {
}

@Test(timeOut = 20000)
public void testPulsarSourceStats() throws Exception {
public void testPulsarSinkStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSinkStats(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testPulsarSinkStatsWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
testPulsarSinkStats(jarFilePathUrl);
}

private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
Expand All @@ -595,7 +683,6 @@ public void testPulsarSourceStats() throws Exception {
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);

Expand All @@ -615,7 +702,7 @@ public void testPulsarSourceStats() throws Exception {
} catch (PulsarAdminException e) {
return false;
}
}, 10, 150);
}, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);

String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
Expand Down Expand Up @@ -687,6 +774,18 @@ public void testPulsarSourceStats() throws Exception {
assertTrue(m.value > 0.0);
}

@Test(timeOut = 20000)
public void testPulsarSourceStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSourceStats(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testPulsarSourceStatsWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
testPulsarSourceStats(jarFilePathUrl);
}

@Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,11 @@ public static File extractFileFromPkg(String destPkgUrl) throws IOException, URI
URL website = new URL(destPkgUrl);
File tempFile = File.createTempFile("function", ".tmp");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile());
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
fos.getChannel().transferFrom(rbc, 0, 10);
}
if (tempFile.exists()) {
tempFile.delete();
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
}
log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile());
return tempFile;
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
Expand Down Expand Up @@ -318,39 +317,16 @@ public static NarClassLoader extractNarClassLoader(Path archivePath, String pkgU
throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
}
}

if (!isEmpty(pkgUrl)) {
if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
try {
URL url = new URL(pkgUrl);
File file = new File(url.toURI());
if (!file.exists()) {
throw new IOException(pkgUrl + " does not exists locally");
}
return NarClassLoader.getFromArchive(file, Collections.emptySet());
} catch (Exception e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
} else if (pkgUrl.startsWith("http")) {
try {
URL website = new URL(pkgUrl);
File tempFile = File.createTempFile("function", ".tmp");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
fos.getChannel().transferFrom(rbc, 0, 10);
}
if (tempFile.exists()) {
tempFile.delete();
}
return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
} catch (Exception e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ pkgUrl +", supported url protocols: [file/http/https]");
try {
return NarClassLoader.getFromArchive(extractFileFromPkg(pkgUrl), Collections.emptySet());
} catch (Exception e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
}

if (uploadedInputStreamFileName != null) {
try {
return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
Expand Down

0 comments on commit 5ea4231

Please sign in to comment.