Skip to content

Commit

Permalink
[Testing] Improve Functions unit tests by using unique nar extraction…
Browse files Browse the repository at this point in the history
… directory (apache#10032)

- Use unique temporary nar extraction directory for each test
  - This fixes possible issues caused by running tests with testForkCount > 1 setting (current status in master branch)
  - Delete directory consistently in AfterMethod

- Add feature LocalRunner to create a unique temp directory for extracting
  nar files. Delete directory on close

- Fix some ClassLoader resource cleanup issues that came up while working on the changes
  • Loading branch information
lhotari authored Mar 26, 2021
1 parent ccecd03 commit 3181689
Show file tree
Hide file tree
Showing 23 changed files with 591 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,40 @@
*/
package org.apache.pulsar.functions.worker;

import com.google.api.Http;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;

/**
* Simple http server for serving files in Pulsar Function test cases
*/
@Slf4j
public class FileServer implements AutoCloseable {
private static final String HEALTH_PATH = "/health";
private final HttpServer httpServer;

public FileServer() throws IOException {
httpServer = HttpServer.create(new InetSocketAddress(0), 0);
// creates a default executor
httpServer.setExecutor(null);
httpServer.createContext(HEALTH_PATH, he -> {
he.sendResponseHeaders(204, 0);
});
}

public void serveFile(String path, File file) {
assertTrue(file.exists(), file.getAbsolutePath() + " doesn't exist.");
httpServer.createContext(path, he -> {
try {
Headers headers = he.getResponseHeaders();
Expand All @@ -59,6 +69,29 @@ public void serveFile(String path, File file) {

public void start() {
httpServer.start();
waitUntilServerIsAvailable();
}

private void waitUntilServerIsAvailable() {
// wait until server is available.
// There has been a few flakiness issues where the server hasn't been available when
// the system-under-test has started to download files
// this assertion will call the "/health" endpoint and check that 204 status code is returned.
Awaitility.await()
.ignoreExceptions()
.untilAsserted(() -> {
HttpURLConnection urlConnection = (HttpURLConnection) new URL(getUrl(HEALTH_PATH))
.openConnection();
urlConnection.setUseCaches(false);
urlConnection.setConnectTimeout(5000);
urlConnection.setReadTimeout(5000);
try {
urlConnection.connect();
assertEquals(urlConnection.getResponseCode(), 204);
} finally {
urlConnection.disconnect();
}
});
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.jsonwebtoken.SignatureAlgorithm;

import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
Expand All @@ -42,9 +40,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.crypto.SecretKey;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand All @@ -63,7 +59,6 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
Expand Down Expand Up @@ -112,6 +107,7 @@ public class PulsarFunctionE2ESecurityTest {
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
private String adminToken;
private String brokerServiceUrl;
private PulsarFunctionTestTemporaryDirectory tempDirectory;

@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
Expand Down Expand Up @@ -210,12 +206,18 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {

@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
pulsarClient.close();
superUserAdmin.close();
functionsWorkerService.stop();
pulsar.close();
bkEnsemble.stop();
try {
log.info("--- Shutting down ---");
pulsarClient.close();
superUserAdmin.close();
functionsWorkerService.stop();
pulsar.close();
bkEnsemble.stop();
} finally {
if (tempDirectory != null) {
tempDirectory.delete();
}
}
}

private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
Expand All @@ -224,6 +226,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());

workerConfig = new WorkerConfig();
tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
Expand All @@ -48,7 +47,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand Down Expand Up @@ -122,6 +120,7 @@ public class PulsarFunctionLocalRunTest {
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";

private static final String SYSTEM_PROPERTY_NAME_NAR_FILE_PATH = "pulsar-io-data-generator.nar.path";
private PulsarFunctionTestTemporaryDirectory tempDirectory;

public static File getPulsarIODataGeneratorNar() {
return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NAR_FILE_PATH)
Expand Down Expand Up @@ -177,15 +176,6 @@ void closeClassLoader() throws IOException {

@BeforeMethod
void setup(Method method) throws Exception {

// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));

for (File file : foundFiles) {
file.delete();
}

log.info("--- Setting up method {} ---", method.getName());

// Start local bookkeeper ensemble
Expand Down Expand Up @@ -230,16 +220,8 @@ void setup(Method method) throws Exception {
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
File connectorsDir = new File(workerConfig.getConnectorsDirectory());

if (connectorsDir.exists()) {
FileUtils.deleteDirectory(connectorsDir);
}

if (connectorsDir.mkdir()) {
File file = getPulsarIODataGeneratorNar();
Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
} else {
throw new RuntimeException("Failed to create builtin connectors directory");
}
File file = getPulsarIODataGeneratorNar();
Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
}

Optional<WorkerService> functionWorkerService = Optional.empty();
Expand Down Expand Up @@ -292,16 +274,17 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {

@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
fileServer.stop();
pulsarClient.close();
admin.close();
pulsar.close();
bkEnsemble.stop();

File connectorsDir = new File(workerConfig.getConnectorsDirectory());
if (connectorsDir.exists()) {
FileUtils.deleteDirectory(connectorsDir);
try {
log.info("--- Shutting down ---");
fileServer.stop();
pulsarClient.close();
admin.close();
pulsar.close();
bkEnsemble.stop();
} finally {
if (tempDirectory != null) {
tempDirectory.delete();
}
}
}

Expand All @@ -311,6 +294,8 @@ private WorkerConfig createWorkerConfig(ServiceConfiguration config) {
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());

WorkerConfig workerConfig = new WorkerConfig();
tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
Expand Down Expand Up @@ -681,7 +666,9 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
.connectorsDirectory(workerConfig.getConnectorsDirectory())
.build();

localRunner.start(false);

Expand Down Expand Up @@ -793,7 +780,9 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
.connectorsDirectory(workerConfig.getConnectorsDirectory())
.build();

localRunner.start(false);

Expand Down
Loading

0 comments on commit 3181689

Please sign in to comment.