Skip to content

Commit

Permalink
Make LocalRunner work properly with .nar files that aren't on the cla…
Browse files Browse the repository at this point in the history
…sspath (apache#9673)

- also support the previous choice of providing the implementation classes
  on the Thread context classloader classpath

- Pass nar file locations as system properties to tests:
 - Pass pulsar-io cassandra and twitter nar file locations as system properties
   to tests
 - Pass pulsar-io-data-generator.nar and pulsar-functions-api-examples.jar
   file locations as system properties to tests

- Improve LocalRunner cleanup/shutdown which wasn't handled before.

- Fix invalid test in PackagesApiTest

- Make PulsarFunction tests and Sink/Source tests work with .nar files which
  aren't on the classpath.

- Extract FileServer in tests to simplify the test code

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
lhotari and merlimat authored Mar 8, 2021
1 parent 8163435 commit 602137f
Show file tree
Hide file tree
Showing 19 changed files with 1,295 additions and 1,058 deletions.
91 changes: 62 additions & 29 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<artifactId>pulsar-transaction-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
Expand Down Expand Up @@ -145,7 +145,7 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>org.xerial.snappy</groupId>
Expand All @@ -158,7 +158,7 @@
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependency>

<!-- functions related dependencies (begin) -->

Expand Down Expand Up @@ -315,20 +315,23 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-data-generator</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-data-generator</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -407,49 +410,79 @@
</plugin>

<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
<checkStaleness>true</checkStaleness>
</configuration>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<id>copy-pulsar-io-connectors</id>
<phase>generate-test-resources</phase>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-data-generator</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-data-generator.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-data-generator</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-batch-data-generator.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.jar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
<pulsar-io-batch-data-generator.nar.path>${project.build.directory}/pulsar-io-batch-data-generator.nar</pulsar-io-batch-data-generator.nar.path>
</systemPropertyVariables>
</configuration>
</plugin>

<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
<checkStaleness>true</checkStaleness>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
<configuration>
<target>
<echo>copy test examples package</echo>
<mkdir dir="${basedir}/src/test/resources"/>
<copy file="${basedir}/../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar"
tofile="${basedir}/src/test/resources/pulsar-functions-api-examples.jar"/>
<copy file="${basedir}/../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar"
tofile="${basedir}/src/test/resources/pulsar-io-data-generator.nar"/>
<copy file="${basedir}/../pulsar-io/batch-data-generator/target/pulsar-io-batch-data-generator-${project.version}.nar"
tofile="${basedir}/src/test/resources/pulsar-io-batch-data-generator.nar"/>
</target>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
<resources>
<resource>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void initNamespace() throws Exception {
@Override
@BeforeMethod
public void setup() throws Exception {
resetConfig();
conf.setClusterName(testLocalCluster);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public void testPackagesOperations() throws Exception {
admin.packages().upload(originalMetadata, packageName, file.getPath());

// testing download api
String directory = file.getParent();
String downloadPath = directory + "package-api-test-download.package";
String downloadPath = new File(file.getParentFile(), "package-api-test-download.package").getPath();
admin.packages().download(packageName, downloadPath);
File downloadFile = new File(downloadPath);
assertTrue(downloadFile.exists());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import com.google.api.Http;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import lombok.extern.slf4j.Slf4j;

/**
* Simple http server for serving files in Pulsar Function test cases
*/
@Slf4j
public class FileServer implements AutoCloseable {
private final HttpServer httpServer;

public FileServer() throws IOException {
httpServer = HttpServer.create(new InetSocketAddress(0), 0);
// creates a default executor
httpServer.setExecutor(null);
}

public void serveFile(String path, File file) {
httpServer.createContext(path, he -> {
try {
Headers headers = he.getResponseHeaders();
headers.add("Content-Type", "application/octet-stream");

he.sendResponseHeaders(200, file.length());
try (OutputStream outputStream = he.getResponseBody()) {
Files.copy(file.toPath(), outputStream);
}
} catch (Exception e) {
log.error("Error serving file {} for path {}", file, path, e);
}
});
}

public void start() {
httpServer.start();
}

public void stop() {
httpServer.stop(0);
}

public String getUrl(String path) {
return "http://127.0.0.1:" + httpServer.getAddress().getPort() + path;
}

@Override
public void close() throws Exception {
stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarApiExamplesJar;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -293,7 +294,7 @@ public void testAuthorizationWithAnonymousUser() throws Exception {
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).build())
) {

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();

FunctionConfig functionConfig = createFunctionConfig(TENANT, NAMESPACE, functionName,
sourceTopic, sinkTopic, subscriptionName);
Expand Down Expand Up @@ -563,7 +564,7 @@ public void testAuthorization() throws Exception {
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken2).build())
) {

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
String jarFilePathUrl = getPulsarApiExamplesJar().toURI().toString();

FunctionConfig functionConfig = createFunctionConfig(TENANT, NAMESPACE, functionName,
sourceTopic, sinkTopic, subscriptionName);
Expand Down
Loading

0 comments on commit 602137f

Please sign in to comment.