diff --git a/build/retry_integration.sh b/build/retry_integration.sh deleted file mode 100755 index af22dfd94548c..0000000000000 --- a/build/retry_integration.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -function fail { - echo $1 >&2 - exit 1 -} - -function retry { - local n=1 - local max=3 - local delay=10 - - # cleanup running containers - docker kill $(docker ps -q) - docker rm $(docker ps -a -q) - - while true; do - "$@" && break || { - if [[ $n -lt $max ]]; then - ((n++)) - echo "Command failed. Attempt $n/$max:" - sleep $delay; - else - fail "The command has failed after $n attempts." - fi - } - done -} - -retry "$@" \ No newline at end of file diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index 030bb0eea0493..19d20cf1710da 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -37,7 +37,7 @@ mvn_run_integration_test() { RETRY="" # wrap with retry.sh script if next parameter is "--retry" if [[ "$1" == "--retry" ]]; then - RETRY="./build/retry_integration.sh" + RETRY="./build/retry.sh" shift fi # skip wrapping with retry.sh script if next parameter is "--no-retry" @@ -53,12 +53,12 @@ mvn_run_integration_test() { } test_group_shade() { - mvn_run_integration_test "$@" -DShadeTests + mvn_run_integration_test "$@" -DShadeTests -DtestForkCount=1 -DtestReuseFork=false } test_group_backwards_compat() { mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-backwards-compatibility.xml -DintegrationTests - mvn_run_integration_test "$@" -DBackwardsCompatTests + mvn_run_integration_test --retry "$@" -DBackwardsCompatTests -DtestForkCount=1 -DtestReuseFork=false } test_group_cli() { @@ -77,9 +77,9 @@ test_group_messaging() { # run integration messaging tests mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-messaging.xml -DintegrationTests # run integration proxy tests - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-proxy.xml -DintegrationTests + mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-proxy.xml -DintegrationTests # run integration proxy with WebSocket tests - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-proxy-websocket.xml -DintegrationTests + mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-proxy-websocket.xml -DintegrationTests } test_group_schema() { @@ -125,7 +125,7 @@ test_group_pulsar_connectors_process() { } test_group_sql() { - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false } echo "Test Group : $TEST_GROUP" diff --git a/pom.xml b/pom.xml index 0028702d2b802..cbb6bd1f09289 100644 --- a/pom.xml +++ b/pom.xml @@ -1178,15 +1178,11 @@ flexible messaging model and an intuitive client API. kill ${redirectTestOutputToFile} false + + ${testRealAWS} + ${testRetryCount} + - - testRealAWS - ${testRealAWS} - - - testRetryCount - ${testRetryCount} - listener org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener diff --git a/tests/bc_2_0_0/pom.xml b/tests/bc_2_0_0/pom.xml index a850dd99960fa..d039629303529 100644 --- a/tests/bc_2_0_0/pom.xml +++ b/tests/bc_2_0_0/pom.xml @@ -91,16 +91,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/bc_2_0_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java b/tests/bc_2_0_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java index bd32c3d0dc56a..b9d0798f9776b 100644 --- a/tests/bc_2_0_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java +++ b/tests/bc_2_0_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -33,12 +34,13 @@ import java.util.concurrent.TimeUnit; -public class SmokeTest { +public class SmokeTest extends TestRetrySupport { private PulsarContainer pulsarContainer; - @BeforeClass - public void setup(){ + @Override + @BeforeClass(alwaysRun = true) + public final void setup(){ pulsarContainer = new PulsarContainer(); pulsarContainer.start(); } @@ -72,8 +74,9 @@ public void checkMessages() throws PulsarClientException { } + @Override @AfterClass(alwaysRun = true) - public void cleanup(){ + public final void cleanup(){ pulsarContainer.stop(); } diff --git a/tests/bc_2_0_1/pom.xml b/tests/bc_2_0_1/pom.xml index 0f5ef5d1fd5d6..7f75f698ce0f7 100644 --- a/tests/bc_2_0_1/pom.xml +++ b/tests/bc_2_0_1/pom.xml @@ -91,16 +91,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/bc_2_0_1/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java b/tests/bc_2_0_1/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java index bd32c3d0dc56a..b9d0798f9776b 100644 --- a/tests/bc_2_0_1/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java +++ b/tests/bc_2_0_1/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -33,12 +34,13 @@ import java.util.concurrent.TimeUnit; -public class SmokeTest { +public class SmokeTest extends TestRetrySupport { private PulsarContainer pulsarContainer; - @BeforeClass - public void setup(){ + @Override + @BeforeClass(alwaysRun = true) + public final void setup(){ pulsarContainer = new PulsarContainer(); pulsarContainer.start(); } @@ -72,8 +74,9 @@ public void checkMessages() throws PulsarClientException { } + @Override @AfterClass(alwaysRun = true) - public void cleanup(){ + public final void cleanup(){ pulsarContainer.stop(); } diff --git a/tests/bc_2_6_0/pom.xml b/tests/bc_2_6_0/pom.xml index db4242afd3e8e..f670eb8c0b763 100644 --- a/tests/bc_2_6_0/pom.xml +++ b/tests/bc_2_6_0/pom.xml @@ -98,16 +98,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/bc_2_6_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java b/tests/bc_2_6_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java index bd32c3d0dc56a..b9d0798f9776b 100644 --- a/tests/bc_2_6_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java +++ b/tests/bc_2_6_0/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -33,12 +34,13 @@ import java.util.concurrent.TimeUnit; -public class SmokeTest { +public class SmokeTest extends TestRetrySupport { private PulsarContainer pulsarContainer; - @BeforeClass - public void setup(){ + @Override + @BeforeClass(alwaysRun = true) + public final void setup(){ pulsarContainer = new PulsarContainer(); pulsarContainer.start(); } @@ -72,8 +74,9 @@ public void checkMessages() throws PulsarClientException { } + @Override @AfterClass(alwaysRun = true) - public void cleanup(){ + public final void cleanup(){ pulsarContainer.stop(); } diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index c61b9e8b04379..c84be150cbd5c 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -225,16 +225,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java index 9fa73869d0017..9c5ab69976105 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/PackagesOpsWithAuthTest.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.containers.PulsarContainer; import org.apache.pulsar.tests.integration.containers.ZKContainer; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; @@ -53,7 +54,7 @@ * PackagesOpsWithAuthTest will test all package operations with and without the proper permission. */ @Slf4j -public class PackagesOpsWithAuthTest { +public class PackagesOpsWithAuthTest extends TestRetrySupport { private static final String CLUSTER_PREFIX = "package-auth"; private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key"; @@ -70,8 +71,10 @@ public class PackagesOpsWithAuthTest { private PulsarCluster pulsarCluster; private PulsarContainer cmdContainer; - @BeforeClass + @Override + @BeforeClass(alwaysRun = true) public void setup() throws Exception { + incrementSetupNumber(); // Before starting the cluster, generate the secret key and the token // Use Zk container to have 1 container available before starting the cluster final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6)); @@ -100,8 +103,10 @@ public void setup() throws Exception { pulsarCluster.start(); } + @Override @AfterClass(alwaysRun = true) - public void teardown() { + public void cleanup() { + markCurrentSetupNumberCleaned(); if (cmdContainer != null) { cmdContainer.stop(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java index 5bee2e8055bae..becf139f66f43 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java @@ -70,9 +70,10 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe protected ZKContainer cmdContainer; - @BeforeClass + @BeforeClass(alwaysRun = true) @Override - public void setupCluster() throws Exception { + public final void setupCluster() throws Exception { + incrementSetupNumber(); // Before starting the cluster, generate the secret key and the token // Use Zk container to have 1 container available before starting the cluster this.cmdContainer = new ZKContainer<>("cli-setup"); @@ -130,7 +131,7 @@ public void setupCluster() throws Exception { @AfterClass(alwaysRun = true) @Override - public void tearDownCluster() { + public final void tearDownCluster() throws Exception { super.tearDownCluster(); cmdContainer.close(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java index f794aea806b13..e4eb9cd0cf216 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_2.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.apache.pulsar.tests.integration.topologies.ClientTestBase; import org.testng.annotations.Test; @@ -27,9 +28,9 @@ public class ClientTest2_2 extends PulsarStandaloneTestSuite2_2 { private final ClientTestBase clientTestBase = new ClientTestBase(); @Test(dataProvider = "StandaloneServiceUrlAndHttpUrl") - public void testResetCursorCompatibility(String serviceUrl, String httpServiceUrl) throws Exception { + public void testResetCursorCompatibility(Supplier serviceUrl, Supplier httpServiceUrl) throws Exception { String topicName = generateTopicName("test-reset-cursor-compatibility", true); - clientTestBase.resetCursorCompatibility(serviceUrl, httpServiceUrl, topicName); + clientTestBase.resetCursorCompatibility(serviceUrl.get(), httpServiceUrl.get(), topicName); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_3.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_3.java index 1bc30fd4466c1..d4703e0b07e5c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_3.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_3.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.apache.pulsar.tests.integration.topologies.ClientTestBase; import org.testng.annotations.Test; @@ -27,9 +28,9 @@ public class ClientTest2_3 extends PulsarStandaloneTestSuite2_3 { private final ClientTestBase clientTestBase = new ClientTestBase(); @Test(dataProvider = "StandaloneServiceUrlAndHttpUrl") - public void testResetCursorCompatibility(String serviceUrl, String httpServiceUrl) throws Exception { + public void testResetCursorCompatibility(Supplier serviceUrl, Supplier httpServiceUrl) throws Exception { String topicName = generateTopicName("test-reset-cursor-compatibility", true); - clientTestBase.resetCursorCompatibility(serviceUrl, httpServiceUrl, topicName); + clientTestBase.resetCursorCompatibility(serviceUrl.get(), httpServiceUrl.get(), topicName); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_4.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_4.java index b1d3021a53489..10bb57e98885d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_4.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_4.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.apache.pulsar.tests.integration.topologies.ClientTestBase; import org.testng.annotations.Test; @@ -27,9 +28,9 @@ public class ClientTest2_4 extends PulsarStandaloneTestSuite2_4 { private final ClientTestBase clientTestBase = new ClientTestBase(); @Test(dataProvider = "StandaloneServiceUrlAndHttpUrl") - public void testResetCursorCompatibility(String serviceUrl, String httpServiceUrl) throws Exception { + public void testResetCursorCompatibility(Supplier serviceUrl, Supplier httpServiceUrl) throws Exception { String topicName = generateTopicName("test-reset-cursor-compatibility", true); - clientTestBase.resetCursorCompatibility(serviceUrl, httpServiceUrl, topicName); + clientTestBase.resetCursorCompatibility(serviceUrl.get(), httpServiceUrl.get(), topicName); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_5.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_5.java index 48a089bc339eb..590f5acf3da04 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_5.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest2_5.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.apache.pulsar.tests.integration.topologies.ClientTestBase; import org.testng.annotations.Test; @@ -27,9 +28,9 @@ public class ClientTest2_5 extends PulsarStandaloneTestSuite2_5 { private final ClientTestBase clientTestBase = new ClientTestBase(); @Test(dataProvider = "StandaloneServiceUrlAndHttpUrl") - public void testResetCursorCompatibility(String serviceUrl, String httpServiceUrl) throws Exception { + public void testResetCursorCompatibility(Supplier serviceUrl, Supplier httpServiceUrl) throws Exception { String topicName = generateTopicName("test-reset-cursor-compatibility", true); - clientTestBase.resetCursorCompatibility(serviceUrl, httpServiceUrl, topicName); + clientTestBase.resetCursorCompatibility(serviceUrl.get(), httpServiceUrl.get(), topicName); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java index 0b2426646365b..c7cedfc9f226c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_2.java @@ -19,20 +19,10 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; import org.apache.pulsar.tests.integration.containers.PulsarContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; -public class PulsarStandaloneTestSuite2_2 extends PulsarStandaloneTestBase { - - @BeforeClass - public void setUpCluster() throws Exception { - super.startCluster(PulsarContainer.PULSAR_2_2_IMAGE_NAME); - } - - @AfterClass(alwaysRun = true) - public void tearDownCluster() throws Exception { - super.stopCluster(); +public abstract class PulsarStandaloneTestSuite2_2 extends PulsarStandaloneTestSuite { + public PulsarStandaloneTestSuite2_2() { + super(PulsarContainer.PULSAR_2_2_IMAGE_NAME); } - } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_3.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_3.java index ab75c795daa54..83dbb1b1dd2b8 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_3.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_3.java @@ -19,20 +19,10 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; import org.apache.pulsar.tests.integration.containers.PulsarContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; -public class PulsarStandaloneTestSuite2_3 extends PulsarStandaloneTestBase { - - @BeforeClass - public void setUpCluster() throws Exception { - super.startCluster(PulsarContainer.PULSAR_2_3_IMAGE_NAME); - } - - @AfterClass(alwaysRun = true) - public void tearDownCluster() throws Exception { - super.stopCluster(); +public abstract class PulsarStandaloneTestSuite2_3 extends PulsarStandaloneTestSuite { + public PulsarStandaloneTestSuite2_3() { + super(PulsarContainer.PULSAR_2_3_IMAGE_NAME); } - -} +} \ No newline at end of file diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_4.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_4.java index 64aaf30755c6e..21fa83df2c89e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_4.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_4.java @@ -19,20 +19,10 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; import org.apache.pulsar.tests.integration.containers.PulsarContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; -public class PulsarStandaloneTestSuite2_4 extends PulsarStandaloneTestBase { - - @BeforeClass - public void setUpCluster() throws Exception { - super.startCluster(PulsarContainer.PULSAR_2_4_IMAGE_NAME); - } - - @AfterClass(alwaysRun = true) - public void tearDownCluster() throws Exception { - super.stopCluster(); +public abstract class PulsarStandaloneTestSuite2_4 extends PulsarStandaloneTestSuite { + public PulsarStandaloneTestSuite2_4() { + super(PulsarContainer.PULSAR_2_4_IMAGE_NAME); } - } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java index 8312cfd6d9dc6..1a9450fea7470 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java @@ -19,20 +19,10 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; import org.apache.pulsar.tests.integration.containers.PulsarContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; -public class PulsarStandaloneTestSuite2_5 extends PulsarStandaloneTestBase { - - @BeforeClass - public void setUpCluster() throws Exception { - super.startCluster(PulsarContainer.PULSAR_2_5_IMAGE_NAME); - } - - @AfterClass(alwaysRun = true) - public void tearDownCluster() throws Exception { - super.stopCluster(); +public abstract class PulsarStandaloneTestSuite2_5 extends PulsarStandaloneTestSuite { + public PulsarStandaloneTestSuite2_5() { + super(PulsarContainer.PULSAR_2_5_IMAGE_NAME); } - } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java index fd139551be727..9312fa18f9258 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java @@ -19,17 +19,18 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.testng.annotations.Test; public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 { @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); + public void testBatchMessagePublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testBatchMessagePublishAndConsume(serviceUrl.get(), isPersistent); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java index 248e24a500b3b..e85798429da20 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java @@ -19,17 +19,18 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.testng.annotations.Test; public class SmokeTest2_3 extends PulsarStandaloneTestSuite2_3 { @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); + public void testBatchMessagePublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testBatchMessagePublishAndConsume(serviceUrl.get(), isPersistent); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java index eb77eaaa6003d..7cd6a1ab52a69 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java @@ -19,22 +19,23 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.testng.annotations.Test; public class SmokeTest2_4 extends PulsarStandaloneTestSuite2_4 { @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); + public void testBatchMessagePublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testBatchMessagePublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchIndexAckDisabled(serviceUrl); + public void testBatchIndexAckDisabled(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testBatchIndexAckDisabled(serviceUrl.get()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java index 2bcf584275b0a..24914ef209002 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java @@ -19,22 +19,23 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.function.Supplier; import org.testng.annotations.Test; public class SmokeTest2_5 extends PulsarStandaloneTestSuite2_5 { @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); + public void testBatchMessagePublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testBatchMessagePublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchIndexAckDisabled(serviceUrl); + public void testBatchIndexAckDisabled(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testBatchIndexAckDisabled(serviceUrl.get()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java index 0053fb55d760e..0b6431d66e9d2 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; @@ -35,20 +36,22 @@ /** * Test for admin service url is multi host. */ -public class AdminMultiHostTest { +public class AdminMultiHostTest extends TestRetrySupport { private final String clusterName = "MultiHostTest-" + UUID.randomUUID(); private final PulsarClusterSpec spec = PulsarClusterSpec.builder().clusterName(clusterName).numBrokers(3).build(); private PulsarCluster pulsarCluster = null; - @BeforeMethod - public void setupCluster() throws Exception { + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + incrementSetupNumber(); pulsarCluster = PulsarCluster.forSpec(spec); pulsarCluster.start(); } @AfterMethod(alwaysRun = true) - public void tearDownCluster() { + public void cleanup() { + markCurrentSetupNumberCleaned(); if (pulsarCluster != null) { pulsarCluster.stop(); pulsarCluster = null; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java index 6212a0729b341..badc4482484dd 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.containers.ChaosContainer; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; @@ -58,7 +59,7 @@ import org.testng.annotations.Test; @Slf4j -public class ClusterMetadataTearDownTest { +public class ClusterMetadataTearDownTest extends TestRetrySupport { private final PulsarClusterSpec spec = PulsarClusterSpec.builder() .clusterName("ClusterMetadataTearDownTest-" + UUID.randomUUID().toString().substring(0, 8)) @@ -67,7 +68,7 @@ public class ClusterMetadataTearDownTest { .enablePrestoWorker(false) .build(); - private final PulsarCluster pulsarCluster = PulsarCluster.forSpec(spec); + private PulsarCluster pulsarCluster; private ZooKeeper localZk; private ZooKeeper configStoreZk; @@ -79,8 +80,11 @@ public class ClusterMetadataTearDownTest { private PulsarClient client; private PulsarAdmin admin; - @BeforeClass - public void setupCluster() throws Exception { + @Override + @BeforeClass(alwaysRun = true) + public final void setup() throws Exception { + incrementSetupNumber(); + pulsarCluster = PulsarCluster.forSpec(spec); pulsarCluster.start(); metadataServiceUri = "zk+null://" + pulsarCluster.getZKConnString() + "/ledgers"; @@ -96,8 +100,10 @@ public void setupCluster() throws Exception { admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); } + @Override @AfterClass(alwaysRun = true) - public void tearDownCluster() { + public final void cleanup() { + markCurrentSetupNumberCleaned(); try { ledgerManager.close(); } catch (IOException e) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthCheckTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthCheckTest.java index e8677112b68a6..4ee93b8e9e76e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthCheckTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthCheckTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.UUID; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.containers.BKContainer; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.docker.ContainerExecException; @@ -44,7 +45,7 @@ /** * Test the healthcheck command. */ -public class HealthCheckTest { +public class HealthCheckTest extends TestRetrySupport { private final static Logger log = LoggerFactory.getLogger(HealthCheckTest.class); @@ -56,14 +57,16 @@ public class HealthCheckTest { private PulsarCluster pulsarCluster = null; - @BeforeMethod - public void setupCluster() throws Exception { + @BeforeMethod(alwaysRun = true) + public final void setup() throws Exception { + incrementSetupNumber(); pulsarCluster = PulsarCluster.forSpec(spec); pulsarCluster.start(); } @AfterMethod(alwaysRun = true) - public void tearDownCluster() { + public final void cleanup() { + markCurrentSetupNumberCleaned(); if (pulsarCluster != null) { pulsarCluster.stop(); pulsarCluster = null; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java index 4805efad7c946..3d8c6f13f6171 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.cli; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; @@ -35,13 +36,14 @@ import java.util.HashMap; import java.util.Map; -public class PackagesCliTest { +public class PackagesCliTest extends TestRetrySupport { private final static String clusterNamePrefix = "packages-service"; private PulsarCluster pulsarCluster; - @BeforeClass - public void setup() throws Exception { + @BeforeClass(alwaysRun = true) + public final void setup() throws Exception { + incrementSetupNumber(); PulsarClusterSpec spec = PulsarClusterSpec.builder() .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6))) .brokerEnvs(getPackagesManagementServiceEnvs()) @@ -51,7 +53,8 @@ public void setup() throws Exception { } @AfterClass(alwaysRun = true) - public void teardown() { + public final void cleanup() { + markCurrentSetupNumberCleaned(); if (pulsarCluster != null) { pulsarCluster.stop(); pulsarCluster = null; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PulsarVersionTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PulsarVersionTest.java index 756d5d466ea6a..0ea5071dfa68a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PulsarVersionTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PulsarVersionTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.cli; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; @@ -32,13 +33,15 @@ /** * Pulsar version test class. */ -public class PulsarVersionTest { +public class PulsarVersionTest extends TestRetrySupport { private final static String clusterNamePrefix = "pulsar-version"; private PulsarCluster pulsarCluster; - @BeforeClass - public void setup() throws Exception { + @Override + @BeforeClass(alwaysRun = true) + public final void setup() throws Exception { + incrementSetupNumber(); PulsarClusterSpec spec = PulsarClusterSpec.builder() .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6))) .build(); @@ -46,8 +49,10 @@ public void setup() throws Exception { pulsarCluster.start(); } + @Override @AfterClass(alwaysRun = true) - public void teardown() { + public final void cleanup() { + markCurrentSetupNumberCleaned(); if (pulsarCluster != null) { pulsarCluster.stop(); pulsarCluster = null; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java index 52846da18ca17..8f889f3d8432c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.extern.slf4j.Slf4j; @@ -46,7 +47,7 @@ public class TestCompaction extends PulsarTestSuite { @Test(dataProvider = "ServiceUrls", timeOut=300_000) - public void testPublishCompactAndConsumeCLI(String serviceUrl) throws Exception { + public void testPublishCompactAndConsumeCLI(Supplier serviceUrl) throws Exception { final String tenant = "compaction-test-cli-" + randomName(4); final String namespace = tenant + "/ns1"; @@ -56,7 +57,7 @@ public void testPublishCompactAndConsumeCLI(String serviceUrl) throws Exception this.createNamespace(namespace); - try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) { + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); try(Producer producer = client.newProducer(Schema.STRING) @@ -100,7 +101,7 @@ public void testPublishCompactAndConsumeCLI(String serviceUrl) throws Exception } @Test(dataProvider = "ServiceUrls", timeOut=300_000) - public void testPublishCompactAndConsumeRest(String serviceUrl) throws Exception { + public void testPublishCompactAndConsumeRest(Supplier serviceUrl) throws Exception { final String tenant = "compaction-test-rest-" + randomName(4); final String namespace = tenant + "/ns1"; @@ -113,7 +114,7 @@ public void testPublishCompactAndConsumeRest(String serviceUrl) throws Exception pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace); - try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) { + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); try(Producer producer = client.newProducer(Schema.STRING).topic(topic).create()) { @@ -153,7 +154,7 @@ public void testPublishCompactAndConsumeRest(String serviceUrl) throws Exception } @Test(dataProvider = "ServiceUrls", timeOut=300_000) - public void testPublishCompactAndConsumePartitionedTopics(String serviceUrl) throws Exception { + public void testPublishCompactAndConsumePartitionedTopics(Supplier serviceUrl) throws Exception { final String tenant = "compaction-test-partitioned-topic-" + randomName(4); final String namespace = tenant + "/ns1"; @@ -171,7 +172,7 @@ public void testPublishCompactAndConsumePartitionedTopics(String serviceUrl) thr this.createPartitionedTopic(topic, 2); - try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) { + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { // force creating individual partitions client.newConsumer().topic(topic + "-partition-0").subscriptionName(subscriptionName).subscribe().close(); client.newConsumer().topic(topic + "-partition-1").subscriptionName(subscriptionName).subscribe().close(); @@ -302,7 +303,7 @@ private static void waitAndVerifyCompacted(PulsarClient client, String topic, } @Test(dataProvider = "ServiceUrls", timeOut=300_000) - public void testPublishWithAutoCompaction(String serviceUrl) throws Exception { + public void testPublishWithAutoCompaction(Supplier serviceUrl) throws Exception { final String tenant = "compaction-test-auto-" + randomName(4); final String namespace = tenant + "/ns1"; @@ -315,7 +316,7 @@ public void testPublishWithAutoCompaction(String serviceUrl) throws Exception { pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-compaction-threshold", "--threshold", "1", namespace); - try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) { + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { client.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub1").subscribe().close(); try(Producer producer = client.newProducer(Schema.STRING).topic(topic).create()) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 84fede2c7bb59..c273ebc7573cf 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -175,10 +175,6 @@ public void testDebeziumMongoDbSource() throws Exception{ } private void testSink(SinkTester tester, boolean builtin) throws Exception { - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } tester.startServiceContainer(pulsarCluster); try { runSinkTester(tester, builtin); @@ -192,10 +188,6 @@ private void testSink(SinkTester sourceTester) throws Exception { - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster); try { runSinkTester(sinkTester, builtinSink); @@ -881,10 +873,6 @@ public void testFunctionLocalRun(Runtime runtime) throws Exception { return; } - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } String inputTopicName = "persistent://public/default/test-function-local-run-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-function-local-run-" + runtime + "-output-" + randomName(8); @@ -1011,10 +999,6 @@ public void testWindowFunction(String type, String[] expectedResults) throws Exc String inputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-input-" + randomName(8); String outputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-output-" + randomName(8); - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { admin.topics().createNonPartitionedTopic(inputTopicName); admin.topics().createNonPartitionedTopic(outputTopicName); @@ -1173,10 +1157,6 @@ private void testFunctionNegAck(Runtime runtime) throws Exception { return; } - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } Schema schema; if (Runtime.JAVA == runtime) { @@ -1370,10 +1350,6 @@ private void testPublishFunction(Runtime runtime) throws Exception { schema = Schema.BYTES; } - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8); @@ -1497,10 +1473,6 @@ private void testCustomSerdeFunction() throws Exception { return; } - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } String inputTopicName = "persistent://public/default/test-serde-java-input-" + randomName(8); String outputTopicName = "test-publish-serde-output-" + randomName(8); @@ -1573,10 +1545,6 @@ private void testExclamationFunction(Runtime runtime, return; } - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } Schema schema; if (Runtime.JAVA == runtime) { @@ -2165,10 +2133,6 @@ public void testAutoSchemaFunction() throws Exception { String functionName = "test-autoschema-fn-" + randomName(8); final int numMessages = 10; - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } // submit the exclamation function submitFunction( @@ -2236,12 +2200,6 @@ public void testAvroSchemaFunction() throws Exception { final String functionName = "test-avroschema-fn-" + randomName(8); final int numMessages = 10; - if (pulsarCluster == null) { - log.info("pulsarClient is null"); - this.setupCluster(); - this.setupFunctionWorkers(); - } - @Cleanup PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build(); log.info("pulsar client init - input: {}, output: {}", inputTopic, outputTopic); @@ -2360,10 +2318,6 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit // This is the binlog count that contained in mysql container. final int numMessages = 47; - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } @Cleanup PulsarClient client = PulsarClient.builder() @@ -2457,10 +2411,6 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js // This is the binlog count that contained in postgresql container. final int numMessages = 26; - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } @Cleanup PulsarClient client = PulsarClient.builder() @@ -2545,10 +2495,6 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean json // This is the binlog count that contained in mongodb container. final int numMessages = 17; - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } @Cleanup PulsarClient client = PulsarClient.builder() @@ -2659,10 +2605,6 @@ private void testLoggingFunction(Runtime runtime) throws Exception { return; } - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } Schema schema; if (Runtime.JAVA == runtime) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index fa324e7ca94e8..01e8fce8ca102 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -50,8 +50,7 @@ protected PulsarFunctionsTestBase(FunctionRuntimeType functionRuntimeType) { this.functionRuntimeType = functionRuntimeType; } - @BeforeClass - public void setupFunctionWorkers() { + private void setupFunctionWorkers() { final int numFunctionWorkers = 2; log.info("Setting up {} function workers : function runtime type = {}", numFunctionWorkers, functionRuntimeType); @@ -59,13 +58,24 @@ public void setupFunctionWorkers() { log.info("{} function workers has started", numFunctionWorkers); } - @AfterClass(alwaysRun = true) - public void teardownFunctionWorkers() { + private void teardownFunctionWorkers() { log.info("Tearing down function workers ..."); pulsarCluster.stopWorkers(); log.info("All functions workers are stopped."); } + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + setupFunctionWorkers(); + } + + @Override + public void tearDownCluster() throws Exception { + teardownFunctionWorkers(); + super.tearDownCluster(); + } + // // Common Variables used by functions test // diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java index 35980dc43ff90..c5fb6ecffebff 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java @@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java index 9eb29f21302e2..b8b962dad9fdb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java @@ -97,10 +97,6 @@ public AvroKafkaSourceTest() { @Test(groups = "source") public void test() throws Exception { - if (pulsarCluster == null) { - super.setupCluster(); - super.setupFunctionWorkers(); - } startKafkaContainers(pulsarCluster); try { testSource(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java index 70452d4211495..41c15b6887251 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java @@ -22,26 +22,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.functions.FunctionState; -import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; -import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; -import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.awaitility.Awaitility; import org.testng.annotations.Test; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; import static org.testng.Assert.assertEquals; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java index 8b06d1de8da23..7a204279eedf4 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.messaging; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; @@ -39,7 +40,7 @@ public class DelayMessagingTest extends PulsarTestSuite { @Test(dataProvider = "ServiceUrls") - public void delayMsgBlockTest(String serviceUrl) throws Exception { + public void delayMsgBlockTest(Supplier serviceUrl) throws Exception { String nsName = generateNamespaceName(); pulsarCluster.createNamespace(nsName); @@ -50,7 +51,7 @@ public void delayMsgBlockTest(String serviceUrl) throws Exception { String deadLetterTopic = topic + "-DLT"; @Cleanup - PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl.get()).build(); @Cleanup Producer producer = pulsarClient.newProducer() diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java index 7ddd082891040..3ff23c506d060 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java @@ -44,7 +44,7 @@ public abstract class MessagingBase extends PulsarTestSuite { protected String methodName; - @BeforeMethod + @BeforeMethod(alwaysRun = true) public void beforeMethod(Method m) throws Exception { methodName = m.getName(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java index 84f6bf97efcd8..4199febc518a5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; +import java.util.function.Supplier; import java.util.stream.IntStream; import lombok.Cleanup; import org.apache.pulsar.client.api.Consumer; @@ -33,10 +34,10 @@ public class NonDurableConsumerMessagingTest extends MessagingBase { @Test(dataProvider = "ServiceUrls") - public void testNonDurableConsumer(String serviceUrls) throws Exception { + public void testNonDurableConsumer(Supplier serviceUrl) throws Exception { final String topicName = getNonPartitionedTopic("test-non-durable-consumer", false); @Cleanup - final PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrls).build(); + final PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build(); int numMessages = 20; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java index 416896f5160f8..51fc85ae1c799 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.messaging; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.testng.annotations.Test; @@ -25,42 +26,42 @@ public class NonPersistentTopicMessagingTest extends TopicMessagingBase { @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithExclusive(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl, false); + public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithExclusive(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithExclusive(serviceUrl, false); + public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithFailover(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl, false); + public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithFailover(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithFailover(serviceUrl, false); + public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithShared(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithShared(serviceUrl, false); + public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithShared(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithShared(serviceUrl, false); + public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, false); + public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), false); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, false); + public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), false); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java index 4bfcde1c18de8..ba7a649326cd3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.messaging; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.testng.annotations.Test; @@ -25,43 +26,43 @@ public class PersistentTopicMessagingTest extends TopicMessagingBase { @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithExclusive(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl, true); + public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithExclusive(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithExclusive(serviceUrl, true); + public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithFailover(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl, true); + public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithFailover(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithFailover(serviceUrl, true); + public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithShared(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithShared(serviceUrl, true); + public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithShared(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithShared(serviceUrl, true); + public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception { - nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true); + public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception { + nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), true); } @Test(dataProvider = "ServiceUrls") - public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception { - partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true); + public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception { + partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), true); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java index ca461566a7349..ad7a8fdf1d048 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.tests.integration.offload; +import java.util.List; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -32,9 +34,6 @@ import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite; import org.testng.Assert; -import java.util.List; -import java.util.concurrent.TimeUnit; - @Slf4j public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { private static final int ENTRY_SIZE = 1024; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java index c0ddfbc8be29b..808aae62e7419 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.offload; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.testng.annotations.Test; @@ -28,18 +29,18 @@ public class TestFileSystemOffload extends TestBaseOffload { @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeViaCLI(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaCLI(serviceUrl.get(), adminUrl.get()); } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeViaThreshold(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaThreshold(serviceUrl.get(), adminUrl.get()); } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get()); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java index df33e4c3263ef..f34ec743b7aa0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.tests.integration.containers.S3Container; import org.testng.annotations.AfterClass; @@ -54,18 +55,18 @@ public void teardownS3() { } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeViaCLI(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaCLI(serviceUrl.get(), adminUrl.get()); } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeViaThreshold(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaThreshold(serviceUrl.get(), adminUrl.get()); } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get()); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java index bfb87865941c2..9c53d801ea1eb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.offload; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.tests.integration.containers.S3Container; import org.testng.annotations.AfterClass; @@ -53,18 +54,18 @@ public void teardownS3() { } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeViaCLI(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaCLI(serviceUrl.get(), adminUrl.get()); } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeViaThreshold(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaThreshold(serviceUrl.get(), adminUrl.get()); } @Test(dataProvider = "ServiceAndAdminUrls") - public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception { - super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl); + public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get()); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 6418cfa9eaac2..3d28a7d4b2c61 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -52,18 +52,28 @@ public class TestBasicPresto extends TestPulsarSQLBase { private static final int NUM_OF_STOCKS = 10; - @BeforeClass - public void setupPresto() throws Exception { + private void setupPresto() throws Exception { log.info("[TestBasicPresto] setupPresto..."); pulsarCluster.startPrestoWorker(); } - @AfterClass(alwaysRun = true) - public void teardownPresto() { + private void teardownPresto() { log.info("[TestBasicPresto] tearing down..."); pulsarCluster.stopPrestoWorker(); } + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + setupPresto(); + } + + @Override + public void tearDownCluster() throws Exception { + teardownPresto(); + super.tearDownCluster(); + } + @DataProvider(name = "schemaProvider") public Object[][] schemaProvider() { return new Object[][] { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java index e7c3cd4efb443..b33e5412ed064 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -55,8 +55,19 @@ public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase { private S3Container s3Container; - @BeforeClass - public void setupExtraContainers() throws Exception { + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + setupExtraContainers(); + } + + @Override + public void tearDownCluster() throws Exception { + teardownPresto(); + super.tearDownCluster(); + } + + private void setupExtraContainers() throws Exception { log.info("[TestPrestoQueryTieredStorage] setupExtraContainers..."); pulsarCluster.runAdminCommandOnAnyBroker( "tenants", "create", "--allowed-clusters", pulsarCluster.getClusterName(), @@ -94,7 +105,6 @@ private String getOffloadProperties(String bucket, String region, String endpoin return sb.toString(); } - @AfterClass(alwaysRun = true) public void teardownPresto() { log.info("[TestPrestoQueryTieredStorage] tearing down..."); if (null != s3Container) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/python/PythonSchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/python/PythonSchemaTest.java index 24b1f821fa16a..53f000ea804e8 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/python/PythonSchemaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/python/PythonSchemaTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; +import java.util.function.Supplier; import lombok.Cleanup; import lombok.Data; @@ -53,7 +54,7 @@ static class Example2 { * Publish from Java and consume from Python */ @Test(dataProvider = "ServiceUrls") - public void testJavaPublishPythonConsume(String serviceUrl) throws Exception { + public void testJavaPublishPythonConsume(Supplier serviceUrl) throws Exception { String nsName = generateNamespaceName(); pulsarCluster.createNamespace(nsName); @@ -61,7 +62,7 @@ public void testJavaPublishPythonConsume(String serviceUrl) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(serviceUrl) + .serviceUrl(serviceUrl.get()) .build(); // Create subscription to retain data @@ -92,7 +93,7 @@ public void testJavaPublishPythonConsume(String serviceUrl) throws Exception { * Publish from Java and consume from Python */ @Test(dataProvider = "ServiceUrls") - public void testPythonPublishJavaConsume(String serviceUrl) throws Exception { + public void testPythonPublishJavaConsume(Supplier serviceUrl) throws Exception { String nsName = generateNamespaceName(); pulsarCluster.createNamespace(nsName); @@ -100,7 +101,7 @@ public void testPythonPublishJavaConsume(String serviceUrl) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(serviceUrl) + .serviceUrl(serviceUrl.get()) .build(); @Cleanup diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java index 49a3f8aaa06dc..e02cb5829d96e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java @@ -49,8 +49,8 @@ public class JodaTimeTest extends PulsarTestSuite { private PulsarClient client; private PulsarAdmin admin; - @BeforeMethod - public void setup() throws Exception { + public void setupCluster() throws Exception { + super.setupCluster(); this.client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build(); @@ -59,6 +59,17 @@ public void setup() throws Exception { .build(); } + @Override + public void tearDownCluster() throws Exception { + if (client != null) { + client.close(); + } + if (admin != null) { + admin.close(); + } + super.tearDownCluster(); + } + @Data private static class JodaSchema { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java index 6194f9527dd86..df8dbc460153f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java @@ -54,8 +54,8 @@ public class SchemaTest extends PulsarTestSuite { private PulsarClient client; private PulsarAdmin admin; - @BeforeMethod - public void setup() throws Exception { + public void setupCluster() throws Exception { + super.setupCluster(); this.client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index 37f39837200d4..d921618c3dc00 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -56,12 +57,12 @@ public class SemanticsTest extends PulsarTestSuite { // @Test(dataProvider = "ServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); } @Test(dataProvider = "ServiceUrls") - public void testEffectivelyOnceDisabled(String serviceUrl) throws Exception { + public void testEffectivelyOnceDisabled(Supplier serviceUrl) throws Exception { String nsName = generateNamespaceName(); pulsarCluster.createNamespace(nsName); @@ -69,7 +70,7 @@ public void testEffectivelyOnceDisabled(String serviceUrl) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(serviceUrl) + .serviceUrl(serviceUrl.get()) .build(); @Cleanup @@ -131,7 +132,7 @@ private static void receiveAndAssertMessage(Consumer consumer, } @Test(dataProvider = "ServiceUrls") - public void testEffectivelyOnceEnabled(String serviceUrl) throws Exception { + public void testEffectivelyOnceEnabled(Supplier serviceUrl) throws Exception { String nsName = generateNamespaceName(); pulsarCluster.createNamespace(nsName); pulsarCluster.enableDeduplication(nsName, true); @@ -140,7 +141,7 @@ public void testEffectivelyOnceEnabled(String serviceUrl) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(serviceUrl) + .serviceUrl(serviceUrl.get()) .build(); @Cleanup @@ -234,7 +235,7 @@ private void testSubscriptionInitialPosition(int numTopics) throws Exception { } @Test(dataProvider = "ServiceUrls") - public void testBatchProducing(String serviceUrl) throws Exception { + public void testBatchProducing(Supplier serviceUrl) throws Exception { String topicName = generateTopicName("testbatchproducing", true); int numMessages = 10; @@ -242,7 +243,7 @@ public void testBatchProducing(String serviceUrl) throws Exception { List producedMsgIds; try (PulsarClient client = PulsarClient.builder() - .serviceUrl(serviceUrl) + .serviceUrl(serviceUrl.get()) .build()) { try (Consumer consumer = client.newConsumer(Schema.STRING) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java index 97c07ff9cb55e..5d142297a915f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/standalone/SmokeTest.java @@ -18,14 +18,15 @@ */ package org.apache.pulsar.tests.integration.standalone; +import java.util.function.Supplier; import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; import org.testng.annotations.Test; public class SmokeTest extends PulsarStandaloneTestSuite { @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java index 0cfcdb10464bc..8b0c67d4292bf 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java @@ -23,16 +23,36 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase { +public abstract class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase { + private final String imageName; + + protected PulsarStandaloneTestSuite() { + this(PulsarContainer.DEFAULT_IMAGE_NAME); + } + + protected PulsarStandaloneTestSuite(String imageName) { + this.imageName = imageName; + } - @BeforeClass public void setUpCluster() throws Exception { - super.startCluster(PulsarContainer.DEFAULT_IMAGE_NAME); + incrementSetupNumber(); + super.startCluster(imageName); } - @AfterClass(alwaysRun = true) public void tearDownCluster() throws Exception { + markCurrentSetupNumberCleaned(); super.stopCluster(); } + @BeforeClass(alwaysRun = true) + @Override + protected final void setup() throws Exception { + setUpCluster(); + } + + @AfterClass(alwaysRun = true) + @Override + protected final void cleanup() throws Exception { + tearDownCluster(); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java index 5402d45eadbb3..4b13888c15bff 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java @@ -23,18 +23,16 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -public class PulsarTestSuite extends PulsarClusterTestBase { +public abstract class PulsarTestSuite extends PulsarClusterTestBase { - @BeforeClass - @Override - public void setupCluster() throws Exception { - super.setupCluster(); + @BeforeClass(alwaysRun = true) + public final void setupBeforeClass() throws Exception { + setup(); } @AfterClass(alwaysRun = true) - @Override - public void tearDownCluster() { - super.tearDownCluster(); + public final void tearDownAfterClass() throws Exception { + cleanup(); } public static void retryStrategically(Predicate predicate, int retryCount, long intSleepTimeInMillis) throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java index 1c61164426270..7811b38e0fd92 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java @@ -33,9 +33,9 @@ public abstract class PulsarTieredStorageTestSuite extends PulsarClusterTestBase protected static final int ENTRIES_PER_LEDGER = 1024; - @BeforeClass + @BeforeClass(alwaysRun = true) @Override - public void setupCluster() throws Exception { + public final void setupCluster() throws Exception { final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) .filter(s -> s != null && !s.isEmpty()) .collect(joining("-")); @@ -51,7 +51,7 @@ public void setupCluster() throws Exception { @AfterClass(alwaysRun = true) @Override - public void tearDownCluster() { + public final void tearDownCluster() throws Exception { super.tearDownCluster(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java index 3511c48a9308a..c38d74279e7ef 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java @@ -32,43 +32,51 @@ import static org.testng.Assert.assertNotNull; public class ClientTestBase { + private static final int RECEIVE_TIMEOUT_SECONDS = 3; public void resetCursorCompatibility(String serviceUrl, String serviceHttpUrl, String topicName) throws Exception { final String subName = "my-sub"; - @Cleanup - final PulsarClient pulsarClient = PulsarClient.builder() + @Cleanup final PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(serviceUrl) .build(); - @Cleanup - final PulsarAdmin admin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin admin = PulsarAdmin.builder() .serviceHttpUrl(serviceHttpUrl) .build(); - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING) - .enableBatching(false).topic(topicName).create(); - @Cleanup - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic(topicName).subscriptionName(subName).subscribe(); - for (int i = 0; i < 50; i++) { - producer.send("msg" + i); - } + Message lastMsg = null; - for (int i = 0; i < 10; i++) { - lastMsg = consumer.receive(); - assertNotNull(lastMsg); - consumer.acknowledge(lastMsg); + { + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topicName).create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subName).subscribe(); + for (int i = 0; i < 50; i++) { + producer.send("msg" + i); + } + for (int i = 0; i < 10; i++) { + lastMsg = consumer.receive(); + assertNotNull(lastMsg); + consumer.acknowledge(lastMsg); + } } admin.topics().resetCursor(topicName, subName, lastMsg.getMessageId()); - @Cleanup - Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe(); - Message message = consumer2.receive(1, TimeUnit.SECONDS); - assertEquals(message.getMessageId(), lastMsg.getMessageId()); + { + @Cleanup + Consumer consumer2 = + pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe(); + Message message = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertEquals(message.getMessageId(), lastMsg.getMessageId()); + } admin.topics().resetCursorAsync(topicName, subName, lastMsg.getMessageId()).get(3, TimeUnit.SECONDS); - @Cleanup - Consumer consumer3 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe(); - message = consumer3.receive(1, TimeUnit.SECONDS); - assertEquals(message.getMessageId(), lastMsg.getMessageId()); + { + @Cleanup + Consumer consumer3 = + pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe(); + Message message = consumer3.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertEquals(message.getMessageId(), lastMsg.getMessageId()); + } } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java index e322c706f4948..70bd7d6f844dc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.topologies; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.testng.annotations.DataProvider; @@ -27,18 +28,27 @@ @Slf4j public abstract class PulsarClusterTestBase extends PulsarTestBase { + @Override + protected final void setup() throws Exception { + setupCluster(); + } + + @Override + protected final void cleanup() throws Exception { + tearDownCluster(); + } @DataProvider(name = "ServiceUrlAndTopics") public Object[][] serviceUrlAndTopics() { return new Object[][] { // plain text, persistent topic { - pulsarCluster.getPlainTextServiceUrl(), + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), true, }, // plain text, non-persistent topic { - pulsarCluster.getPlainTextServiceUrl(), + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), false } }; @@ -49,7 +59,7 @@ public Object[][] serviceUrls() { return new Object[][] { // plain text { - pulsarCluster.getPlainTextServiceUrl() + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()) } }; } @@ -59,14 +69,22 @@ public Object[][] serviceAndAdminUrls() { return new Object[][] { // plain text { - pulsarCluster.getPlainTextServiceUrl(), - pulsarCluster.getHttpServiceUrl() + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), + stringSupplier(() -> getPulsarCluster().getHttpServiceUrl()) } }; } protected PulsarCluster pulsarCluster; + public PulsarCluster getPulsarCluster() { + return pulsarCluster; + } + + private static Supplier stringSupplier(Supplier supplier) { + return supplier; + } + public void setupCluster() throws Exception { this.setupCluster(""); } @@ -93,6 +111,7 @@ protected void beforeStartCluster() throws Exception { } protected void setupCluster(PulsarClusterSpec spec) throws Exception { + incrementSetupNumber(); log.info("Setting up cluster {} with {} bookies, {} brokers", spec.clusterName(), spec.numBookies(), spec.numBrokers()); @@ -105,10 +124,10 @@ protected void setupCluster(PulsarClusterSpec spec) throws Exception { log.info("Cluster {} is setup", spec.clusterName()); } - public void tearDownCluster() { + public void tearDownCluster() throws Exception { + markCurrentSetupNumberCleaned(); if (null != pulsarCluster) { pulsarCluster.stop(); } } - } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java index c919001144c2d..f765afcae777f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java @@ -19,7 +19,7 @@ package org.apache.pulsar.tests.integration.topologies; import static org.testng.Assert.assertEquals; - +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.tests.integration.containers.StandaloneContainer; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; @@ -42,12 +42,12 @@ public Object[][] serviceUrlAndTopics() { return new Object[][] { // plain text, persistent topic { - container.getPlainTextServiceUrl(), + stringSupplier(() -> getContainer().getPlainTextServiceUrl()), true, }, // plain text, non-persistent topic { - container.getPlainTextServiceUrl(), + stringSupplier(() -> getContainer().getPlainTextServiceUrl()), false } }; @@ -57,15 +57,24 @@ public Object[][] serviceUrlAndTopics() { public Object[][] serviceUrlAndHttpUrl() { return new Object[][] { { - container.getPlainTextServiceUrl(), - container.getHttpServiceUrl(), + stringSupplier(() -> getContainer().getPlainTextServiceUrl()), + stringSupplier(() -> getContainer().getHttpServiceUrl()), } }; } protected Network network; + protected StandaloneContainer container; + public StandaloneContainer getContainer() { + return container; + } + + private static Supplier stringSupplier(Supplier supplier) { + return supplier; + } + protected void startCluster(final String pulsarImageName) throws Exception { network = Network.newNetwork(); String clusterName = PulsarClusterTestBase.randomName(8); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java index 9730f0d32d5d6..9989b15faa95e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java @@ -19,13 +19,11 @@ package org.apache.pulsar.tests.integration.topologies; import static org.testng.Assert.assertEquals; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -34,9 +32,10 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; -public class PulsarTestBase { +public abstract class PulsarTestBase extends TestRetrySupport { public static String randomName(int numChars) { StringBuilder sb = new StringBuilder(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java index 41524097b41c9..558d4580d3563 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.transaction; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; @@ -72,9 +73,9 @@ public class TransactionTest extends TransactionTestBase { * 2. The balance update messages amount sum should be 0. */ @Test(dataProvider = "ServiceUrls") - public void transferNormalTest(String serviceUrl) throws Exception { + public void transferNormalTest(Supplier serviceUrl) throws Exception { log.info("transfer normal test start."); - PulsarClient pulsarClient = PulsarClient.builder().enableTransaction(true).serviceUrl(serviceUrl).build(); + PulsarClient pulsarClient = PulsarClient.builder().enableTransaction(true).serviceUrl(serviceUrl.get()).build(); final int transferCount = 20; final String transferTopic = "transfer-" + randomName(6); @@ -114,7 +115,7 @@ public void transferNormalTest(String serviceUrl) throws Exception { .subscribe(); log.info("balance update consumer create finished"); - while(true) { + while (true) { Message message = transferConsumer.receive(10, TimeUnit.SECONDS); if (message == null) { break; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTestBase.java index 5245d777d9a30..756cc50d8f6dc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TransactionTestBase.java @@ -33,7 +33,7 @@ * Transaction test base. */ @Slf4j -public class TransactionTestBase extends PulsarTestSuite { +public abstract class TransactionTestBase extends PulsarTestSuite { @Override protected void beforeStartCluster() throws Exception { @@ -48,8 +48,7 @@ protected void beforeStartCluster() throws Exception { } } - @BeforeClass - public void transactionCoordinatorMetadataInitialize() throws Exception { + private void transactionCoordinatorMetadataInitialize() throws Exception { BrokerContainer brokerContainer = pulsarCluster.getBrokers().iterator().next(); ContainerExecResult result = brokerContainer.execCmd( "/pulsar/bin/pulsar", "initialize-transaction-coordinator-metadata", @@ -57,6 +56,12 @@ public void transactionCoordinatorMetadataInitialize() throws Exception { "-c", pulsarCluster.getClusterName()); } + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + transactionCoordinatorMetadataInitialize(); + } + public void prepareTransferData(Producer transferProducer, int messageCnt) { for (int i = 0; i < messageCnt; i++) { TransferOperation transferOperation = new TransferOperation(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java index 7526f3b85b6d6..ecdf8358ac375 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java @@ -21,6 +21,7 @@ import static java.util.stream.Collectors.joining; import static org.testng.Assert.assertEquals; import com.google.common.collect.ImmutableMap; +import java.util.function.Supplier; import java.util.stream.Stream; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -44,9 +45,11 @@ public class PulsarZKDowngradeTest extends PulsarClusterTestBase { protected static final int ENTRIES_PER_LEDGER = 1024; - @BeforeClass + @BeforeClass(alwaysRun = true) @Override - public void setupCluster() throws Exception { + public final void setupCluster() throws Exception { + incrementSetupNumber(); + final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) .filter(s -> !s.isEmpty()) .collect(joining("-")); @@ -72,19 +75,19 @@ ImmutableMap. builder() @AfterClass(alwaysRun = true) @Override - public void tearDownCluster() { + public final void tearDownCluster() throws Exception { super.tearDownCluster(); } @Test(dataProvider = "ServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { + public void testPublishAndConsume(Supplier serviceUrl, boolean isPersistent) throws Exception { String topicName = generateTopicName("testpubconsume", isPersistent); int numMessages = 10; @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(serviceUrl) + .serviceUrl(serviceUrl.get()) .build(); @Cleanup diff --git a/tests/pulsar-client-admin-shade-test/pom.xml b/tests/pulsar-client-admin-shade-test/pom.xml index 7aba34a00cfd8..7d00ce2a8fd69 100644 --- a/tests/pulsar-client-admin-shade-test/pom.xml +++ b/tests/pulsar-client-admin-shade-test/pom.xml @@ -106,16 +106,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java index f5b24c95678b8..2cd355e7c4cdc 100644 --- a/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java +++ b/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.tests.TestRetrySupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -58,15 +59,16 @@ import static org.testng.Assert.*; -public class SimpleProducerConsumerTest { +public class SimpleProducerConsumerTest extends TestRetrySupport { private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); private PulsarContainer pulsarContainer; private URI lookupUrl; private PulsarClient pulsarClient; - @BeforeClass - public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + @Override + @BeforeClass(alwaysRun = true) + public void setup() throws Exception { Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); pulsarContainer = new PulsarContainer(); @@ -84,8 +86,9 @@ public void setup() throws PulsarClientException, URISyntaxException, PulsarAdmi admin.close(); } + @Override @AfterClass(alwaysRun = true) - public void cleanup() throws PulsarClientException { + public void cleanup() throws Exception { pulsarClient.close(); pulsarContainer.stop(); pulsarContainer.close(); diff --git a/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java b/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java index b4a63b64ea3bd..d7c43f376286a 100644 --- a/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java +++ b/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -37,12 +38,13 @@ import java.util.List; import java.util.concurrent.TimeUnit; -public class SmokeTest { +public class SmokeTest extends TestRetrySupport { private PulsarContainer pulsarContainer; - @BeforeClass - public void setup(){ + @Override + @BeforeClass(alwaysRun = true) + public final void setup() { pulsarContainer = new PulsarContainer(); pulsarContainer.start(); } @@ -85,8 +87,9 @@ public void checkAdmin() throws PulsarClientException, PulsarAdminException { Assert.assertEquals(admin.namespaces().getNamespaces("public"), expectedNamespacesList); } + @Override @AfterClass(alwaysRun = true) - public void cleanup(){ + public final void cleanup(){ pulsarContainer.stop(); } diff --git a/tests/pulsar-client-all-shade-test/pom.xml b/tests/pulsar-client-all-shade-test/pom.xml index 1419a6672826c..7808616cea8cd 100644 --- a/tests/pulsar-client-all-shade-test/pom.xml +++ b/tests/pulsar-client-all-shade-test/pom.xml @@ -105,16 +105,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java index 75bcea2bf9829..543e4750864fe 100644 --- a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java +++ b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.tests.TestRetrySupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -54,15 +55,16 @@ import static org.testng.Assert.*; import static org.testng.Assert.assertEquals; -public class SimpleProducerConsumerTest { +public class SimpleProducerConsumerTest extends TestRetrySupport { private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); private PulsarContainer pulsarContainer; private URI lookupUrl; private PulsarClient pulsarClient; - @BeforeClass - public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + @Override + @BeforeClass(alwaysRun = true) + public void setup() throws Exception { Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); pulsarContainer = new PulsarContainer(); pulsarContainer.start(); @@ -79,8 +81,9 @@ public void setup() throws PulsarClientException, URISyntaxException, PulsarAdmi admin.close(); } + @Override @AfterClass(alwaysRun = true) - public void cleanup() throws PulsarClientException { + public void cleanup() throws Exception { pulsarClient.close(); pulsarContainer.stop(); pulsarContainer.close(); diff --git a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java index b4a63b64ea3bd..478b60b3ba7fa 100644 --- a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java +++ b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -37,12 +38,13 @@ import java.util.List; import java.util.concurrent.TimeUnit; -public class SmokeTest { +public class SmokeTest extends TestRetrySupport { private PulsarContainer pulsarContainer; - @BeforeClass - public void setup(){ + @Override + @BeforeClass(alwaysRun = true) + public final void setup(){ pulsarContainer = new PulsarContainer(); pulsarContainer.start(); } @@ -85,8 +87,9 @@ public void checkAdmin() throws PulsarClientException, PulsarAdminException { Assert.assertEquals(admin.namespaces().getNamespaces("public"), expectedNamespacesList); } + @Override @AfterClass(alwaysRun = true) - public void cleanup(){ + public final void cleanup(){ pulsarContainer.stop(); } diff --git a/tests/pulsar-client-shade-test/pom.xml b/tests/pulsar-client-shade-test/pom.xml index 77b1b3ae96d55..40177ee7236bb 100644 --- a/tests/pulsar-client-shade-test/pom.xml +++ b/tests/pulsar-client-shade-test/pom.xml @@ -100,16 +100,6 @@ org.apache.maven.plugins maven-surefire-plugin - - - testRetryCount - 0 - - - listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener - - -Xmx2G -XX:MaxDirectMemorySize=8G -Dio.netty.leakDetectionLevel=advanced diff --git a/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java index 36cf421eb9812..a96db7adfc9ac 100644 --- a/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java +++ b/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.tests.TestRetrySupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -58,15 +59,16 @@ import static org.testng.Assert.*; -public class SimpleProducerConsumerTest { +public class SimpleProducerConsumerTest extends TestRetrySupport { private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); private PulsarContainer pulsarContainer; private URI lookupUrl; private PulsarClient pulsarClient; - @BeforeClass - public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + @Override + @BeforeClass(alwaysRun = true) + public void setup() throws Exception { Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); pulsarContainer = new PulsarContainer(); pulsarContainer.start(); @@ -83,8 +85,9 @@ public void setup() throws PulsarClientException, URISyntaxException, PulsarAdmi admin.close(); } + @Override @AfterClass(alwaysRun = true) - public void cleanup() throws PulsarClientException { + public void cleanup() throws Exception { pulsarClient.close(); pulsarContainer.stop(); pulsarContainer.close(); diff --git a/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java b/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java index d974ca7c3fb18..c8eb9cdd09a2c 100644 --- a/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java +++ b/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SmokeTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -33,12 +34,13 @@ import java.util.concurrent.TimeUnit; -public class SmokeTest { +public class SmokeTest extends TestRetrySupport { private PulsarContainer pulsarContainer; - @BeforeClass - public void setup(){ + @Override + @BeforeClass(alwaysRun = true) + public final void setup(){ pulsarContainer = new PulsarContainer(); pulsarContainer.start(); } @@ -72,8 +74,9 @@ public void checkClient() throws PulsarClientException { } + @Override @AfterClass(alwaysRun = true) - public void cleanup(){ + public final void cleanup(){ pulsarContainer.stop(); }