diff --git a/.github/workflows/ci-unit-broker-broker-gp1.yaml b/.github/workflows/ci-unit-broker-broker-gp1.yaml
index b72211833b890..c4fd56c2d27a4 100644
--- a/.github/workflows/ci-unit-broker-broker-gp1.yaml
+++ b/.github/workflows/ci-unit-broker-broker-gp1.yaml
@@ -34,7 +34,7 @@ jobs:
unit-tests:
name:
runs-on: ubuntu-latest
- timeout-minutes: 45
+ timeout-minutes: 60
steps:
- name: checkout
@@ -84,6 +84,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh BROKER_GROUP_1
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/.github/workflows/ci-unit-broker-broker-gp2.yaml b/.github/workflows/ci-unit-broker-broker-gp2.yaml
index fc81b76c96232..402dc1b594e4f 100644
--- a/.github/workflows/ci-unit-broker-broker-gp2.yaml
+++ b/.github/workflows/ci-unit-broker-broker-gp2.yaml
@@ -84,6 +84,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh BROKER_GROUP_2
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/.github/workflows/ci-unit-broker-client-api.yaml b/.github/workflows/ci-unit-broker-client-api.yaml
index e5ba11dbeeb43..c2c6a0b4758d4 100644
--- a/.github/workflows/ci-unit-broker-client-api.yaml
+++ b/.github/workflows/ci-unit-broker-client-api.yaml
@@ -84,6 +84,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh BROKER_CLIENT_API
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/.github/workflows/ci-unit-broker-client-impl.yaml b/.github/workflows/ci-unit-broker-client-impl.yaml
index c2e81a9be1680..a26be7ba1ca9e 100644
--- a/.github/workflows/ci-unit-broker-client-impl.yaml
+++ b/.github/workflows/ci-unit-broker-client-impl.yaml
@@ -84,6 +84,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh BROKER_CLIENT_IMPL
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/.github/workflows/ci-unit-broker-other.yaml b/.github/workflows/ci-unit-broker-other.yaml
index 1876e072599fc..023d2b330364b 100644
--- a/.github/workflows/ci-unit-broker-other.yaml
+++ b/.github/workflows/ci-unit-broker-other.yaml
@@ -84,6 +84,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh BROKER_FLAKY
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-proxy.yaml
index aac3ceaa3fcda..de2db845c33ec 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-proxy.yaml
@@ -84,6 +84,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh PROXY
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/.github/workflows/ci-unit.yaml b/.github/workflows/ci-unit.yaml
index 43fe23c6c4ea9..8c5d9c3785692 100644
--- a/.github/workflows/ci-unit.yaml
+++ b/.github/workflows/ci-unit.yaml
@@ -80,6 +80,10 @@ jobs:
if: ${{ steps.changes.outputs.all_count > steps.changes.outputs.docs_count }}
run: ./build/run_unit_group.sh OTHER
+ - name: print JVM thread dumps when cancelled
+ if: cancelled()
+ run: ./build/pulsar_ci_tool.sh print_thread_dumps
+
- name: package surefire artifacts
if: failure()
run: |
diff --git a/bouncy-castle/bcfips-include-test/src/test/java/org/apache/pulsar/client/TlsProducerConsumerBase.java b/bouncy-castle/bcfips-include-test/src/test/java/org/apache/pulsar/client/TlsProducerConsumerBase.java
index 197a5cc2f64e6..35216719b93ed 100644
--- a/bouncy-castle/bcfips-include-test/src/test/java/org/apache/pulsar/client/TlsProducerConsumerBase.java
+++ b/bouncy-castle/bcfips-include-test/src/test/java/org/apache/pulsar/client/TlsProducerConsumerBase.java
@@ -44,7 +44,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
private final String clusterName = "use";
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
// TLS configuration for Broker
diff --git a/build/pulsar_ci_tool.sh b/build/pulsar_ci_tool.sh
new file mode 100755
index 0000000000000..12dfc4bfcc47a
--- /dev/null
+++ b/build/pulsar_ci_tool.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# shell function library for Pulsar CI builds
+
+# lists all available functions in this tool
+function ci_list_functions() {
+ declare -F | awk '{print $NF}' | sort | egrep '^ci_' | sed 's/^ci_//'
+}
+
+# prints thread dumps for all running JVMs
+# used in CI when a job gets cancelled because of a job timeout
+function ci_print_thread_dumps() {
+ for java_pid in $(jps -q -J-XX:+PerfDisableSharedMem); do
+ echo "----------------------- pid $java_pid -----------------------"
+ cat /proc/$java_pid/cmdline | xargs -0 echo
+ jcmd $java_pid Thread.print -l
+ jcmd $java_pid GC.heap_info
+ done
+ return 0
+}
+
+if [ -z "$1" ]; then
+ echo "usage: $0 [ci_tool_function_name]"
+ echo "Available ci tool functions:"
+ ci_list_functions
+ exit 1
+fi
+ci_function_name="ci_$1"
+shift
+
+if [[ "$(LC_ALL=C type -t $ci_function_name)" == "function" ]]; then
+ eval "$ci_function_name" "$@"
+else
+ echo "Invalid ci tool function"
+ echo "Available ci tool functions:"
+ ci_list_functions
+ exit 1
+fi
\ No newline at end of file
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 344b789d82aa2..83b3f2361eacb 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -19,11 +19,20 @@
#
set -e
-set -x
set -o pipefail
set -o errexit
-MVN_TEST_COMMAND='build/retry.sh mvn -B -ntp test'
+# solution for printing output in "set -x" trace mode without tracing the echo calls
+shopt -s expand_aliases
+echo_and_restore_trace() {
+ builtin echo "$@"
+ [ $trace_enabled -eq 1 ] && set -x || true
+}
+alias echo='{ [[ $- =~ .*x.* ]] && trace_enabled=1 || trace_enabled=0; set +x; } 2> /dev/null; echo_and_restore_trace'
+
+MVN_COMMAND='mvn -B -ntp'
+MVN_COMMAND_WITH_RETRY="build/retry.sh ${MVN_COMMAND}"
+MVN_TEST_COMMAND="${MVN_COMMAND_WITH_RETRY} test"
echo -n "Test Group : $TEST_GROUP"
@@ -44,16 +53,64 @@ function broker_client_impl() {
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='broker-impl'
}
+# prints summaries of failed tests to console
+# by using the targer/surefire-reports files
+# works only when testForkCount > 1 since that is when surefire will create reports for individual test classes
+function print_testng_failures() {
+ (
+ { set +x; } 2>/dev/null
+ local testng_failed_file="$1"
+ local report_prefix="${2:-Test failure in}"
+ local group_title="${3:-Detailed test failures}"
+ if [ -f "$testng_failed_file" ]; then
+ local testng_report_dir=$(dirname "$testng_failed_file")
+ local failed_count=0
+ for failed_test_class in $(cat "$testng_failed_file" | grep 'class name=' | perl -p -e 's/.*\"(.*?)\".*/$1/'); do
+ ((failed_count += 1))
+ if [ $failed_count -eq 1 ]; then
+ echo "::endgroup::"
+ echo "::group::${group_title}"
+ fi
+ local test_report_file="${testng_report_dir}/${failed_test_class}.txt"
+ if [ -f "${test_report_file}" ]; then
+ local test_report="$(cat "${test_report_file}" | egrep "^Tests run: " | perl -p -se 's/^(Tests run: .*) <<< FAILURE! - in (.*)$/::warning::$report_prefix $2 - $1/' -- -report_prefix="${report_prefix}")"
+ echo "$test_report"
+ cat "${test_report_file}"
+ fi
+ done
+ fi
+ )
+}
+
function broker_flaky() {
+ echo "::endgroup::"
+ echo "::group::Running quarantined tests"
+ $MVN_COMMAND test -pl pulsar-broker -Dgroups='quarantine' -DexcludedGroups='' -DfailIfNoTests=false \
+ -DtestForkCount=2 -Dexclude='**/Replicator*Test.java' ||
+ print_testng_failures pulsar-broker/target/surefire-reports/testng-failed.xml "Quarantined test failure in" "Quarantined test failures"
+ # run quarantined Replicator tests separately
+ $MVN_COMMAND test -pl pulsar-broker -Dgroups='quarantine' -DexcludedGroups='' -DfailIfNoTests=false \
+ -DtestForkCount=1 -DtestReuseFork=false -Dinclude='**/Replicator*Test.java' || \
+ print_testng_failures pulsar-broker/target/surefire-reports/testng-failed.xml "Quarantined test failure in" "Quarantined Replicator*Test failures"
+ echo "::endgroup::"
+ echo "::group::Running flaky tests"
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='flaky' -DtestForkCount=1 -DtestReuseFork=false
+ echo "::endgroup::"
}
function proxy() {
+ echo "::endgroup::"
+ echo "::group::Running quarantined pulsar-proxy tests"
+ $MVN_COMMAND test -pl pulsar-proxy -Dgroups='quarantine' -DexcludedGroups='' -DfailIfNoTests=false ||
+ print_testng_failures pulsar-proxy/target/surefire-reports/testng-failed.xml "Quarantined test failure in" "Quarantined test failures"
+ echo "::endgroup::"
+ echo "::group::Running pulsar-proxy tests"
$MVN_TEST_COMMAND -pl pulsar-proxy
+ echo "::endgroup::"
}
function other() {
- build/retry.sh mvn -B -ntp install -PbrokerSkipTest \
+ $MVN_COMMAND_WITH_RETRY clean install -PbrokerSkipTest \
-Dexclude='org/apache/pulsar/proxy/**/*.java,
**/ManagedLedgerTest.java,
**/TestPulsarKeyValueSchemaHandler.java,
@@ -61,25 +118,34 @@ function other() {
BlobStoreManagedLedgerOffloaderTest.java'
$MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
- **/OffloadersCacheTest.java' \
- -DtestForkCount=1 \
- -DtestReuseFork=true
+ **/OffloadersCacheTest.java'
- $MVN_TEST_COMMAND -pl pulsar-sql/presto-pulsar-plugin -Dinclude='**/TestPulsarKeyValueSchemaHandler.java' \
- -DtestForkCount=1
+ $MVN_TEST_COMMAND -pl pulsar-sql/presto-pulsar-plugin -Dinclude='**/TestPulsarKeyValueSchemaHandler.java'
- $MVN_TEST_COMMAND -pl pulsar-client -Dinclude='**/PrimitiveSchemaTest.java' \
- -DtestForkCount=1
+ $MVN_TEST_COMMAND -pl pulsar-client -Dinclude='**/PrimitiveSchemaTest.java'
- $MVN_TEST_COMMAND -pl tiered-storage/jcloud -Dinclude='**/BlobStoreManagedLedgerOffloaderTest.java' \
- -DtestForkCount=1
+ $MVN_TEST_COMMAND -pl tiered-storage/jcloud -Dinclude='**/BlobStoreManagedLedgerOffloaderTest.java'
+
+ echo "::endgroup::"
+ local modules_with_quarantined_tests=$(git grep -l '@Test.*"quarantine"' | grep '/src/test/java/' | \
+ awk -F '/src/test/java/' '{ print $1 }' | egrep -v 'pulsar-broker|pulsar-proxy' | sort | uniq | \
+ perl -0777 -p -e 's/\n(\S)/,$1/g')
+ if [ -n "${modules_with_quarantined_tests}" ]; then
+ echo "::group::Running quarantined tests outside of pulsar-broker & pulsar-proxy (if any)"
+ $MVN_COMMAND -pl "${modules_with_quarantined_tests}" test -Dgroups='quarantine' -DexcludedGroups='' \
+ -DfailIfNoTests=false || \
+ echo "::warning::There were test failures in the 'quarantine' test group."
+ echo "::endgroup::"
+ fi
}
# Test Groups -- end --
TEST_GROUP=$1
-echo -n "Test Group : $TEST_GROUP"
+echo "Test Group : $TEST_GROUP"
+
+set -x
case $TEST_GROUP in
diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java b/buildtools/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java
index 6a00087b64136..c4b9fc6dff38c 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java
@@ -40,7 +40,7 @@ public abstract class TestRetrySupport {
private int failedSetupNumber = -1;
private int cleanedUpSetupNumber;
- @BeforeMethod(groups = { "setup", "flaky", "extra" })
+ @BeforeMethod(alwaysRun = true)
public final void stateCheck(Method method) throws Exception {
// run cleanup and setup if the current setup number is the one where a failure happened
// this is to cleanup state before retrying
@@ -59,7 +59,7 @@ public final void stateCheck(Method method) throws Exception {
}
}
- @AfterMethod(alwaysRun = true, groups = { "setup", "flaky", "extra" })
+ @AfterMethod(alwaysRun = true)
public final void failureCheck(ITestResult testResult, Method method) {
// track the setup number where the failure happened
if (!testResult.isSuccess()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index a8bf44b55502e..c7572bd50cdc3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -48,9 +48,8 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
ManagedLedgerImpl ml1;
ManagedLedgerImpl ml2;
- @BeforeMethod
- public void setup(Method method) throws Exception {
- super.setUp(method);
+ @Override
+ protected void setUpTestCase() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
ml1 = mock(ManagedLedgerImpl.class);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index 537bab3fcf4cd..d64ef31e49a58 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -52,9 +52,8 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
private ManagedLedgerImpl ml;
- @BeforeMethod
- public void setUp(Method method) throws Exception {
- super.setUp(method);
+ @Override
+ protected void setUpTestCase() throws Exception {
ml = mock(ManagedLedgerImpl.class);
when(ml.getName()).thenReturn("name");
when(ml.getExecutor()).thenReturn(executor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 0497811e49647..ff0120310aa74 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -93,7 +93,7 @@ public BookKeeperClusterTestCase(int numBookies) {
this.numBookies = numBookies;
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
// enable zookeeper `zookeeper.4lw.commands.whitelist`
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
@@ -211,7 +211,7 @@ protected ServerConfiguration newServerConfiguration(String ledgerRootPath) thro
tmpDirs.add(f);
f.delete();
f.mkdir();
-
+
int port = 0;
return newServerConfiguration(port, zkUtil.getZooKeeperConnectString(), f, new File[] { f }, ledgerRootPath);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index b16a021e7ae90..a2ad8459472b3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -36,7 +36,6 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
/**
* A class runs several bookie servers for testing.
@@ -66,8 +65,8 @@ public MockedBookKeeperTestCase(int numBookies) {
this.numBookies = numBookies;
}
- @BeforeMethod(groups = { "broker" })
- public void setUp(Method method) throws Exception {
+ @BeforeMethod(alwaysRun = true)
+ public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
try {
// start bookkeeper service
@@ -81,10 +80,20 @@ public void setUp(Method method) throws Exception {
factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ setUpTestCase();
}
- @AfterMethod(alwaysRun = true, groups = { "broker" })
- public void tearDown(Method method) {
+ protected void setUpTestCase() throws Exception {
+
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public final void tearDown(Method method) {
+ try {
+ cleanUpTestCase();
+ } catch (Exception e) {
+ LOG.error("tearDown Error", e);
+ }
try {
LOG.info("@@@@@@@@@ stopping " + method);
factory.shutdown();
@@ -97,14 +106,18 @@ public void tearDown(Method method) {
}
}
- @BeforeClass(groups = { "broker" })
- public void setUpClass() {
+ protected void cleanUpTestCase() throws Exception {
+
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public final void setUpClass() {
executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
cachedExecutor = Executors.newCachedThreadPool();
}
- @AfterClass(alwaysRun = true, groups = { "broker" })
- public void tearDownClass() {
+ @AfterClass(alwaysRun = true)
+ public final void tearDownClass() {
if (executor != null) {
executor.shutdown();
}
diff --git a/pom.xml b/pom.xml
index c08aee0aa8c4d..105a17c2dbdab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,10 +81,10 @@ flexible messaging model and an intuitive client API.
1.8
- *
+ **/Test*.java,**/*Test.java,**/*Tests.java,**/*TestCase.java
-
+ quarantine
UTF-8
UTF-8
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 3034fb7186366..52be0fb40ccb6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -53,12 +53,7 @@
public class SLAMonitoringTest {
LocalBookkeeperEnsemble bkEnsemble;
- ExecutorService executor =
- new ThreadPoolExecutor(5,
- 20,
- 30,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>());
+ ExecutorService executor;
private static final int BROKER_COUNT = 5;
private final int[] brokerWebServicePorts = new int[BROKER_COUNT];
@@ -68,8 +63,14 @@ public class SLAMonitoringTest {
private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
private final ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
void setup() throws Exception {
+ executor =
+ new ThreadPoolExecutor(5,
+ 20,
+ 30,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>());
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
@@ -126,6 +127,7 @@ private void createTenant(PulsarAdmin pulsarAdmin)
public void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
+ executor = null;
for (int i = 0; i < BROKER_COUNT; i++) {
pulsarAdmins[i].close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index a89f88e12bb75..ed6db6a8ec30b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -99,6 +99,10 @@ public MockedPulsarServiceBaseTest() {
resetConfig();
}
+ protected PulsarService getPulsar() {
+ return pulsar;
+ }
+
protected final void resetConfig() {
this.conf = getDefaultConf();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 070be00066e46..f14abdee8c5e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -113,9 +113,18 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
try {
- admin.close();
- pulsar.close();
- bkEnsemble.stop();
+ if (admin != null) {
+ admin.close();
+ admin = null;
+ }
+ if (pulsar != null) {
+ pulsar.close();
+ pulsar = null;
+ }
+ if (bkEnsemble != null) {
+ bkEnsemble.stop();
+ bkEnsemble = null;
+ }
} catch (Throwable t) {
LOG.error("Error cleaning up broker test setup state", t);
fail("Broker test cleanup failed");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 2e3b5cb19ebe1..c3e2fe28a6e5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -25,6 +25,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -38,7 +39,7 @@
* Test base for tests requires a bk ensemble.
*/
@Slf4j
-public abstract class BkEnsemblesTestBase {
+public abstract class BkEnsemblesTestBase extends TestRetrySupport {
protected PulsarService pulsar;
protected ServiceConfiguration config;
@@ -61,8 +62,10 @@ protected void configurePulsar(ServiceConfiguration config) {
//overridable by subclasses
}
- @BeforeMethod(groups = {"broker-impl", "broker"})
+ @Override
+ @BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
+ incrementSetupNumber();
try {
// start local bookie and zookeeper
bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
@@ -98,8 +101,10 @@ protected void setup() throws Exception {
}
}
- @AfterMethod(alwaysRun = true, groups = {"broker-impl", "broker"})
- protected void shutdown() throws Exception {
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ markCurrentSetupNumberCleaned();
admin.close();
pulsar.close();
bkEnsemble.stop();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 452220350e1a3..4830efbb6861f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -33,10 +33,6 @@
public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
protected static final int ASYNC_EVENT_COMPLETION_WAIT = 100;
- protected PulsarService getPulsar() {
- return pulsar;
- }
-
public void baseSetup() throws Exception {
super.internalSetup();
baseSetupCommon();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java
index 0060050c2c1ba..3af044fd62e4b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java
@@ -43,13 +43,13 @@ public class DistributedIdGeneratorTest {
private MetadataStoreExtended store;
private CoordinationService coordinationService;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
store = MetadataStoreExtended.create("memory://local", MetadataStoreConfig.builder().build());
coordinationService = new CoordinationServiceImpl(store);
}
- @AfterMethod(alwaysRun = true, groups = "broker")
+ @AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
coordinationService.close();
store.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index e290f729bb00a..1543d3e384469 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -49,7 +49,7 @@
public class NonPersistentTopicE2ETest extends BrokerTestBase {
- @BeforeMethod(groups = "broker")
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 4beb566a4701d..2e55e3fa58dcd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -93,7 +93,7 @@
@Test(groups = "flaky")
public class PersistentTopicE2ETest extends BrokerTestBase {
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
@@ -1429,7 +1429,7 @@ public void testBrokerTopicStats() throws Exception {
assertTrue(msgInRate > 0);
}
- @Test
+ @Test(groups = "quarantine")
public void testBrokerConnectionStats() throws Exception {
BrokerService brokerService = this.pulsar.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 560187e351109..d7cc9de39617b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -29,7 +29,6 @@
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -51,7 +50,7 @@ public RackAwareTest() {
super(0);
}
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
super.setup();
@@ -81,8 +80,8 @@ protected void setup() throws Exception {
}
@AfterClass(alwaysRun = true)
- protected void shutdown() throws Exception {
- super.shutdown();
+ protected void cleanup() throws Exception {
+ super.cleanup();
for (BookieServer bs : bookies) {
bs.shutdown();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index 4e7e3e365a92f..cb8be77208a69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -35,7 +35,7 @@
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
-@Test(groups = "broker")
+@Test(groups = "quarantine")
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
protected String methodName;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index ac0c12cfb7f2c..a7c3aa85cbdfb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -46,7 +46,7 @@
/**
* Starts 3 brokers that are in 3 different clusters
*/
-@Test(groups = "broker")
+@Test(groups = "quarantine")
public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
protected String methodName;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9ef42d9cc59cf..7a48adbe03a36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -90,12 +90,12 @@
/**
* Starts 3 brokers that are in 3 different clusters
*/
-@Test(groups = "flaky")
+@Test(groups = "quarantine")
public class ReplicatorTest extends ReplicatorTestBase {
protected String methodName;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void beforeMethod(Method m) throws Exception {
methodName = m.getName();
admin1.namespaces().removeBacklogQuota("pulsar/ns");
@@ -104,7 +104,7 @@ public void beforeMethod(Method m) throws Exception {
}
@Override
- @BeforeClass(timeOut = 300000)
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
public void setup() throws Exception {
super.setup();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index ea1fb42d273ea..ee9377ba3491e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -54,7 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ReplicatorTestBase extends TestRetrySupport {
+public abstract class ReplicatorTestBase extends TestRetrySupport {
URL url1;
URL urlTls1;
ServiceConfiguration config1 = new ServiceConfiguration();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index b21d2eef290a3..e2cf04c8ed7af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -52,7 +52,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
private NamespaceEventsSystemTopicFactory systemTopicFactory;
private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
conf.setSystemTopicEnabled(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
index 6bbde2d3ea8e0..b7a119fdddea8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -28,8 +28,8 @@
*/
@Test(groups = "broker")
public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends PersistentDispatcherFailoverConsumerTest {
-
- @BeforeMethod
+
+ @BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
super.setup();
pulsar.getConfiguration().setStreamingDispatch(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
index 2a3a30f5fed2c..5bf85f708022d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherE2ETest.java
@@ -29,10 +29,9 @@
@Test(groups = "flaky")
public class PersistentTopicStreamingDispatcherE2ETest extends PersistentTopicE2ETest {
- @BeforeMethod
@Override
- protected void setup() throws Exception {
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
conf.setStreamingDispatch(true);
- super.baseSetup();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index fc6cf045236a0..2663f94b79ad0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -50,7 +50,7 @@
@Test(groups = "broker")
public class PersistentTopicTest extends BrokerTestBase {
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
@@ -64,7 +64,7 @@ protected void cleanup() throws Exception {
/**
* Test validates that broker cleans up topic which failed to unload while bundle unloading.
- *
+ *
* @throws Exception
*/
@Test
@@ -96,7 +96,7 @@ public void testCleanFailedUnloadTopic() throws Exception {
/**
* Test validates if topic's dispatcher is stuck then broker can doscover and unblock it.
- *
+ *
* @throws Exception
*/
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index 42692356bd3a1..67b16f86614e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Optional;
+import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
@@ -64,7 +65,7 @@ private static class MyClass {
public int age;
}
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -96,16 +97,20 @@ protected void cleanup() throws Exception {
@DataProvider(name = "serviceUrl")
public Object[] serviceUrls() {
- return new String[] {
- pulsar.getBrokerServiceUrl(),
- pulsar.getWebServiceAddress()
+ return new Object[] {
+ stringSupplier(() -> getPulsar().getBrokerServiceUrl()),
+ stringSupplier(() -> getPulsar().getWebServiceAddress())
};
}
+ private static Supplier stringSupplier(Supplier supplier) {
+ return supplier;
+ }
+
@Test(dataProvider = "serviceUrl")
- public void testGetSchema(String serviceUrl) throws Exception {
+ public void testGetSchema(Supplier serviceUrl) throws Exception {
@Cleanup
- PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
assertEquals(client.getSchema("non-existing-topic").join(), Optional.empty());
assertEquals(client.getSchema(topicBytes).join(), Optional.empty());
@@ -117,7 +122,7 @@ public void testGetSchema(String serviceUrl) throws Exception {
/**
* It validates if schema ledger is deleted or non recoverable then it will clean up schema storage for the topic
* and make the topic available.
- *
+ *
* @throws Exception
*/
@Test
@@ -141,7 +146,7 @@ public void testSchemaFailure() throws Exception {
BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) pulsar.getSchemaStorage();
long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
- // (2) break schema locator by deleting schema-ledger
+ // (2) break schema locator by deleting schema-ledger
schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
admin.topics().unload(fqtnOne);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
index 597ed0c89888b..e9b33cb5408da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReaderTests.java
@@ -91,19 +91,8 @@ public class StreamingEntryReaderTests extends MockedBookKeeperTestCase {
private ManagedLedgerImpl ledger;
private ManagedCursor cursor;
- @BeforeClass
- public void setUpClass() {
- super.setUpClass();
- }
-
- @AfterClass(alwaysRun = true)
- public void tearDownClass() {
- super.tearDownClass();
- }
-
- @BeforeMethod
- public void setup(Method method) throws Exception {
- super.setUp(method);
+ @Override
+ protected void setUpTestCase() throws Exception {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
orderedExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(1)
@@ -125,9 +114,16 @@ public Void answer(InvocationOnMock invocationOnMock) {
}).when(mockDispatcher).notifyConsumersEndOfTopic();
}
- @AfterMethod(alwaysRun = true)
- public void tearDown(Method method) {
- super.tearDown(method);
+ @Override
+ protected void cleanUpTestCase() {
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ scheduledExecutorService = null;
+ }
+ if (orderedExecutor != null) {
+ orderedExecutor.shutdownNow();
+ orderedExecutor = null;
+ }
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index efef796df045e..a09ada2f911d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -33,10 +33,10 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
-@Test(groups = "broker")
+@Test(groups = "quarantine")
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -48,7 +48,6 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
- @Test
public void testManagedCursorMetrics() throws Exception {
final String subName = "my-sub";
final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 07ccf3e280c16..f1dd27fe6ebcb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -61,7 +61,7 @@ public class TransactionConsumeTest extends TransactionTestBase {
private final static String NORMAL_MSG_CONTENT = "Normal - ";
private final static String TXN_MSG_CONTENT = "Txn - ";
- @BeforeMethod(groups = "broker")
+ @BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setBrokerCount(1);
super.internalSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index cee3bc1dad0d7..8868c807ff931 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -50,6 +50,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
@@ -58,7 +59,7 @@
import org.apache.zookeeper.data.ACL;
@Slf4j
-public class TransactionTestBase {
+public abstract class TransactionTestBase extends TestRetrySupport {
public final static String CLUSTER_NAME = "test";
@@ -79,6 +80,7 @@ public class TransactionTestBase {
private NonClosableMockBookKeeper mockBookKeeper;
public void internalSetup() throws Exception {
+ incrementSetupNumber();
init();
admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build());
@@ -222,6 +224,7 @@ public void close() {
};
protected final void internalCleanup() {
+ markCurrentSetupNumberCleaned();
try {
// if init fails, some of these could be null, and if so would throw
// an NPE in shutdown, obscuring the real error
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 7d5f6a99359ee..1d1a828a53d59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -37,19 +44,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import org.testng.annotations.AfterClass;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-
@Test(groups = "broker")
public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
@@ -59,9 +55,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
int partitions = 10;
BrokerService[] brokerServices;
- @BeforeClass
- void init() throws Exception {
- super.setup();
+ @Override
+ protected void afterSetup() throws Exception {
pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
pulsarAdmins[0].tenants().createTenant("public", new TenantInfo(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
pulsarAdmins[0].namespaces().createNamespace("public/test", 10);
@@ -74,8 +69,8 @@ void init() throws Exception {
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")));
}
- @AfterClass(alwaysRun = true)
- public void shutdownClient() throws Exception {
+ @Override
+ protected void cleanup() throws Exception {
if (tbClient != null) {
tbClient.close();
}
@@ -85,11 +80,11 @@ public void shutdownClient() throws Exception {
}
brokerServices = null;
}
+ super.cleanup();
}
@Override
- public void afterPulsarStart() throws Exception {
- super.afterPulsarStart();
+ protected void afterPulsarStart() throws Exception {
brokerServices = new BrokerService[pulsarServices.length];
for (int i = 0; i < pulsarServices.length; i++) {
Subscription mockSubscription = Mockito.mock(Subscription.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 2942b76cf5fcb..d5e68e0b6c9e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -79,7 +79,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
private final static String NAMESPACE1 = TENANT + "/ns1";
private final static String TOPIC = NAMESPACE1 + "/test-topic";
- @BeforeMethod(groups = "broker")
+ @BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
setBrokerCount(1);
internalSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index 200cf6c13ce80..eab297df86af1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -20,8 +20,9 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-
import com.google.common.collect.Lists;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
@@ -32,19 +33,13 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.mockito.Mockito;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
-
@Test(groups = "broker")
public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBase {
- @BeforeClass
- public void init() throws Exception {
- super.setup();
-
+ @Override
+ protected void afterSetup() throws Exception {
for (PulsarService pulsarService : pulsarServices) {
TransactionBufferClient tbClient = Mockito.mock(TransactionBufferClientImpl.class);
Mockito.when(tbClient.commitTxnOnTopic(anyString(), anyLong(), anyLong(), anyLong()))
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 14b5529843551..cc0e185c582ba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -24,17 +24,11 @@
import org.apache.pulsar.broker.PulsarService;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionMetaStoreAssignmentTest extends TransactionMetaStoreTestBase {
- @BeforeClass(groups = "broker")
- public void init() throws Exception {
- super.setup();
- }
-
@Test(groups = "broker")
public void testTransactionMetaStoreAssignAndFailover() throws IOException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 584bbda1d5f77..c4532a6e9d779 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.transaction.coordinator;
import java.util.Optional;
-
import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -27,13 +26,15 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
-public class TransactionMetaStoreTestBase {
+public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
private static final Logger log = LoggerFactory.getLogger(TransactionMetaStoreTestBase.class);
@@ -46,8 +47,9 @@ public class TransactionMetaStoreTestBase {
protected TransactionCoordinatorClient transactionCoordinatorClient;
- protected void setup() throws Exception {
- log.info("---- Initializing SLAMonitoringTest -----");
+ @BeforeClass(alwaysRun = true)
+ protected final void setup() throws Exception {
+ log.info("---- Initializing {} -----", getClass().getSimpleName());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
@@ -92,19 +94,26 @@ protected void setup() throws Exception {
transactionCoordinatorClient.start();
Thread.sleep(3000);
+
+ afterSetup();
}
- public void afterPulsarStart() throws Exception {
- log.info("[afterPulsarStart]");
+ protected void afterSetup() throws Exception {
+ // template methods to override in subclasses
+ }
+
+
+ protected void afterPulsarStart() throws Exception {
+ // template methods to override in subclasses
}
@AfterClass(alwaysRun = true)
- public void shutdownAll() throws Exception {
- for (PulsarService service : pulsarServices) {
- if (service != null) {
- service.close();
- }
- }
+ public final void shutdownAll() throws Exception {
+ cleanup();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
for (PulsarAdmin admin : pulsarAdmins) {
if (admin != null) {
admin.close();
@@ -113,5 +122,11 @@ public void shutdownAll() throws Exception {
if (pulsarClient != null) {
pulsarClient.close();
}
+ for (PulsarService service : pulsarServices) {
+ if (service != null) {
+ service.close();
+ }
+ }
+ Mockito.reset();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
index 17b3a1303f8cc..6bf84b3a4dcb6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
@@ -35,7 +35,7 @@
@Test(groups = "broker")
public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseTest {
- @BeforeMethod(groups = "broker")
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 99d7b1d868fcf..b65f206b710a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -56,7 +56,7 @@
import org.testng.annotations.Test;
@Slf4j
-@Test(groups = "broker-api")
+@Test(groups = "quarantine")
public class ClientDeduplicationFailureTest {
LocalBookkeeperEnsemble bkEnsemble;
@@ -69,7 +69,7 @@ public class ClientDeduplicationFailureTest {
final String tenant = "external-repl-prop";
String primaryHost;
- @BeforeMethod(timeOut = 300000)
+ @BeforeMethod(timeOut = 300000, alwaysRun = true)
void setup(Method method) throws Exception {
log.info("--- Setting up method {} ---", method.getName());
@@ -180,7 +180,7 @@ public long getLastSeqId() {
}
}
- @Test(timeOut = 300000)
+ @Test(timeOut = 300000, groups = "quarantine")
public void testClientDeduplicationCorrectnessWithFailure() throws Exception {
final String namespacePortion = "dedup";
final String replNamespace = tenant + "/" + namespacePortion;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 5936632ee5ba6..07d9a1bb8821f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -49,15 +49,17 @@ public class ClientErrorsTest {
private final String ASSERTION_ERROR = "AssertionError";
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
public void setup() {
mockBrokerService = new MockBrokerService();
mockBrokerService.start();
}
- @AfterClass(alwaysRun = true, groups = "broker-api")
+ @AfterClass(alwaysRun = true)
public void teardown() {
- mockBrokerService.stop();
+ if (mockBrokerService != null) {
+ mockBrokerService.stop();
+ }
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index b1025e79c4a1f..fcc208441de41 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -32,10 +32,10 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
-@Test(groups = "flaky")
+@Test(groups = "quarantine")
public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 06f6fafa75732..18679de19e35d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -45,7 +45,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -58,7 +58,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
- @Test
+ @Test(groups = "quarantine")
public void testDeadLetterTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
@@ -336,7 +336,7 @@ public void testDeadLetterTopicWithMultiTopic() throws Exception {
checkConsumer.close();
}
- @Test
+ @Test(groups = "quarantine")
public void testDeadLetterTopicByCustomTopicName() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final int maxRedeliveryCount = 2;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d24991a36aa1d..b715b1028c71e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -70,14 +70,14 @@
public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
- @BeforeMethod(groups = { "broker" })
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}
- @AfterMethod(alwaysRun = true, groups = { "broker" })
+ @AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 812be9d086260..99e54954b24fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -93,7 +93,7 @@ public Object[][] dataProvider() {
};
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -630,7 +630,7 @@ public void testReadAheadWhenAddingConsumers() throws Exception {
assertTrue(readPosition.getEntryId() < 1000);
}
- @Test
+ @Test(groups = "quarantine")
public void testRemoveFirstConsumer() throws Exception {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
@@ -885,7 +885,7 @@ public void testContinueDispatchMessagesWhenMessageTTL() throws Exception {
Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
}
- @Test(dataProvider = "partitioned")
+ @Test(dataProvider = "partitioned", groups = "quarantine")
public void testOrderingWithConsumerListener(boolean partitioned) throws Exception {
final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
if (partitioned) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 008df504bb9b6..0dc5a1f6c4453 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -182,7 +182,7 @@ public AuthenticationState newAuthState(AuthData authData,
}
}
- @BeforeMethod(groups = "broker-api")
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
mutualAuth = new MutualAuthentication();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index 1e1ea7bed56fe..27e3131335c5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -33,7 +33,7 @@
public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest {
protected String methodName;
- @BeforeMethod(groups = { "broker", "websocket", "broker-api", "broker-discovery", "broker-impl", "extra", "flaky" })
+ @BeforeMethod(alwaysRun = true)
public void beforeMethod(Method m) throws Exception {
methodName = m.getName();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index 1d5a23b2c8a63..375b53f10ad6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -48,7 +48,7 @@ public class PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
//
private String host;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.executorService = Executors.newFixedThreadPool(1);
@@ -70,7 +70,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.listenerName("internal");
}
- @Test
+ @Test(groups = "quarantine")
public void testFindBrokerWithListenerName() throws Throwable {
admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
index c81f18014e86a..9118f5ec5e5f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
@@ -48,7 +48,7 @@ public class PulsarMultiListenersWithoutInternalListenerNameTest extends MockedP
//
private String host;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.executorService = Executors.newFixedThreadPool(1);
@@ -69,7 +69,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.listenerName("internal");
}
- @Test
+ @Test(groups = "quarantine")
public void testFindBrokerWithListenerName() throws Throwable {
admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e3dc969d0b8d1..3250eea2ecfa3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -112,7 +112,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
private static final int RECEIVE_TIMEOUT_SHORT_MILLIS = 100;
private static final int RECEIVE_TIMEOUT_MEDIUM_MILLIS = 500;
- @BeforeMethod(groups = { "broker", "flaky" })
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -140,7 +140,7 @@ public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}
- @AfterMethod(alwaysRun = true, groups = { "broker" })
+ @AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -740,7 +740,7 @@ public void testSillyUser() {
// This is to test that the flow control counter doesn't get corrupted while concurrent receives during
// reconnections
- @Test(dataProvider = "batch")
+ @Test(dataProvider = "batch", groups = "quarantine")
public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
final int recvQueueSize = 100;
final int numConsumersThreads = 10;
@@ -2332,7 +2332,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);
}
- @Test(dataProvider = "ackReceiptEnabled")
+ @Test(dataProvider = "ackReceiptEnabled", groups = "quarantine")
public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -2715,7 +2715,7 @@ public void testDefaultCryptoKeyReader() throws Exception {
consumer4.unsubscribe();
}
- @Test
+ @Test(groups = "quarantine")
public void testRedeliveryOfFailedMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index 03bb77083a46c..d2a5eef230ad9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -63,7 +63,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
// Credentials File, which contains "client_id" and "client_secret"
private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
@@ -109,7 +109,7 @@ protected final void clientSetup() throws Exception {
.authentication(authentication));
}
- @AfterMethod(alwaysRun = true, groups = "broker-api")
+ @AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java
index 7a607218900b6..1119a3e76529b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java
@@ -32,7 +32,7 @@
public abstract class V1_ProducerConsumerBase extends MockedPulsarServiceBaseTest {
protected String methodName;
- @BeforeMethod(groups = {"broker-api, websocket"})
+ @BeforeMethod(alwaysRun = true)
public void beforeMethod(Method m) throws Exception {
methodName = m.getName();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 48beb4600cac6..2dc34a1bdc7f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -92,7 +92,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -361,7 +361,7 @@ public void testInvalidSequence() throws Exception {
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
}
-
+
Consumer consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic6")
.subscriptionName("my-subscriber-name")
@@ -630,7 +630,7 @@ public void testSendBigMessageSize() throws Exception {
*
* @throws Exception
*/
- @Test
+ @Test(groups = "quarantine")
public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
log.info("-- Starting {} test --", methodName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
index 70ed1eca87360..75cd1df20fcb9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerUnsubscribeTest.java
@@ -32,15 +32,18 @@ public class ConsumerUnsubscribeTest {
MockBrokerService mockBrokerService;
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
public void setup() {
mockBrokerService = new MockBrokerService();
mockBrokerService.start();
}
- @AfterClass
+ @AfterClass(alwaysRun = true)
public void teardown() {
- mockBrokerService.stop();
+ if (mockBrokerService != null) {
+ mockBrokerService.stop();
+ mockBrokerService = null;
+ }
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 11d44e6b24d84..a85dc842dfac0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -62,7 +62,7 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
private static final String subscription = "reader-multi-topics-sub";
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -91,7 +91,7 @@ public void testReadMessageWithoutBatching() throws Exception {
testReadMessages(topic, false);
}
- @Test(timeOut = 10000)
+ @Test(timeOut = 10000, groups = "quarantine")
public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-inclusive";
int topicNum = 3;
@@ -116,7 +116,7 @@ public void testReadMessageWithBatching() throws Exception {
testReadMessages(topic, true);
}
- @Test(timeOut = 10000)
+ @Test(timeOut = 10000, groups = "quarantine")
public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
int topicNum = 3;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index eb38a5616d1b5..a6c44b3c059a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -19,13 +19,11 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
-
import java.util.Collections;
-
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.broker.ManagedLedgerClientFactory;
-import org.apache.pulsar.broker.service.BrokerBkEnsemblesTests;
+import org.apache.pulsar.broker.service.BkEnsemblesTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -34,8 +32,8 @@
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.Test;
-@Test(groups = "broker-impl")
-public class SequenceIdWithErrorTest extends BrokerBkEnsemblesTests {
+@Test(groups = "quarantine")
+public class SequenceIdWithErrorTest extends BkEnsemblesTestBase {
/**
* Test that sequence id from a producer is correct when there are send errors
@@ -78,14 +76,4 @@ public void testCheckSequenceId() throws Exception {
client.close();
}
-
- @Test(enabled = false)
- public void testCrashBrokerWithoutCursorLedgerLeak() {
- // Ignore test
- }
-
- @Test(enabled = false)
- public void testSkipCorruptDataLedger() {
- // Ignore test
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
index fac10acc04b9e..17abe7fcd4e15 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
@@ -33,6 +33,7 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.BoundType;
@@ -41,7 +42,12 @@
@Test(groups = "broker-naming")
public class NamespaceBundleTest {
- private final NamespaceBundleFactory factory = getNamespaceBundleFactory();
+ private NamespaceBundleFactory factory;
+
+ @BeforeClass(alwaysRun = true)
+ protected void initializeFactory() {
+ factory = getNamespaceBundleFactory();
+ }
@Test
public void testConstructor() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
index 32da8e746dece..ba526b7cf3683 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
@@ -41,6 +41,7 @@
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
@@ -52,7 +53,12 @@
@Test(groups = "broker-naming")
public class NamespaceBundlesTest {
- private final NamespaceBundleFactory factory = getNamespaceBundleFactory();
+ private NamespaceBundleFactory factory;
+
+ @BeforeMethod(alwaysRun = true)
+ protected void initializeFactory() {
+ factory = getNamespaceBundleFactory();
+ }
@SuppressWarnings("unchecked")
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 49752809502b7..47f0504520f59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -161,7 +161,7 @@ public Object[][] validRoleName() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
void loadPulsarApiExamples() throws MalformedURLException, ClassNotFoundException {
pulsarApiExamplesClassLoader = new URLClassLoader(new URL[]{getPulsarApiExamplesJar().toURI().toURL()},
Thread.currentThread().getContextClassLoader());
@@ -176,7 +176,7 @@ void closeClassLoader() throws IOException {
}
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
void setup(Method method) throws Exception {
log.info("--- Setting up method {} ---", method.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 87086579bcf26..74c31d85e5849 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -84,7 +84,7 @@ public class PulsarFunctionAdminTest {
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionAdminTest.class);
- @BeforeMethod(groups = "broker")
+ @BeforeMethod(alwaysRun = true)
void setup(Method method) throws Exception {
log.info("--- Setting up method {} ---", method.getName());
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 5fc084e7c5b8e..f3ccfd86e80b9 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -39,18 +39,20 @@
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ConsumerImplTest {
- private final ExecutorProvider executorProvider = new ExecutorProvider(1,"ConsumerImplTest");
+ private ExecutorProvider executorProvider;
private ConsumerImpl consumer;
private ConsumerConfigurationData consumerConf;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp() {
+ executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
consumerConf = new ConsumerConfigurationData<>();
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
ClientConfigurationData clientConf = client.getConfiguration();
@@ -66,6 +68,14 @@ public void setUp() {
consumer.setState(HandlerState.State.Ready);
}
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (executorProvider != null) {
+ executorProvider.shutdownNow();
+ executorProvider = null;
+ }
+ }
+
@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() {
consumer.notifyPendingReceivedCallback(null, null);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 5b6641c8c41df..f02d75c2f8358 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -70,7 +70,7 @@ public class PulsarClientImplTest {
private PulsarClientImpl clientImpl;
private EventLoopGroup eventLoopGroup;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setup() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
@@ -83,10 +83,16 @@ private void initializeEventLoopGroup(ClientConfigurationData conf) {
eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
}
- @AfterMethod
+ @AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
- clientImpl.close();
- eventLoopGroup.shutdownGracefully().get();
+ if (clientImpl != null) {
+ clientImpl.close();
+ clientImpl = null;
+ }
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully().get();
+ eventLoopGroup = null;
+ }
}
@Test
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
index 6e4360c57de9c..784fe8a493f0c 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
@@ -56,7 +56,7 @@ public abstract class AbstractFileTests {
protected Path directory;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void init() throws IOException {
// Create the directory we are going to read from
directory = Files.createTempDirectory("pulsar-io-file-tests", getPermissions());
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
index b04d4026bbf9b..674d04a47b69d 100644
--- a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestAbstractZooKeeperConfigurationProvider.java
@@ -52,7 +52,7 @@ public abstract class TestAbstractZooKeeperConfigurationProvider {
protected TestingServer zkServer;
protected CuratorFramework client;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
// start the instance without the admin server!
InstanceSpec serverSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, -1, Collections.singletonMap("zookeeper.admin.enableServer", "false"));
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
index 178808413404c..0a747479478ef 100644
--- a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/node/TestApplication.java
@@ -46,14 +46,17 @@ public class TestApplication {
private File baseDir;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
baseDir = Files.createTempDir();
}
- @AfterMethod
+ @AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
- FileUtils.deleteDirectory(baseDir);
+ if (baseDir != null) {
+ FileUtils.deleteDirectory(baseDir);
+ baseDir = null;
+ }
}
private T mockLifeCycle(Class klass) {
diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
index 6f630be059784..dbe158909be98 100644
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
@@ -51,7 +51,7 @@ public abstract class AbstractHdfsSinkTest {
protected HdfsAbstractSink sink;
@SuppressWarnings("unchecked")
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public final void setUp() throws Exception {
map = new HashMap ();
map.put("hdfsConfigResources", "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml,"
diff --git a/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java
index c8d343e3cde57..5fd16796a8b5e 100644
--- a/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java
+++ b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java
@@ -51,7 +51,7 @@ public abstract class AbstractHdfsSinkTest {
protected HdfsAbstractSink sink;
@SuppressWarnings("unchecked")
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public final void setUp() throws Exception {
map = new HashMap ();
map.put("hdfsConfigResources", "../pulsar/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml,"
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 3f7ea6ef2b393..49c916adaba51 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -29,7 +29,7 @@
public abstract class BaseMetadataStoreTest {
protected TestZKServer zks;
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
void setup() throws Exception {
zks = new TestZKServer();
}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index eb28056b9d32a..8a3b5cb51024b 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -92,7 +92,7 @@ public BookKeeperClusterTestCase(int numBookies) {
this.numBookies = numBookies;
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
executor = Executors.newCachedThreadPool();
InMemoryMetaStore.reset();
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
index 9a09584f9151f..8ff94dfec079c 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -68,7 +68,7 @@ public MockedBookKeeperTestCase(int numBookies) {
this.numBookies = numBookies;
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
try {
@@ -99,7 +99,7 @@ public void tearDown(Method method) {
}
}
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
public void setUpClass() {
executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
cachedExecutor = Executors.newCachedThreadPool();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index f7f3fff34c037..fa3c485335581 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -47,7 +47,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@Override
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
internalSetup();
@@ -70,10 +70,12 @@ protected void setup() throws Exception {
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
- proxyService.close();
+ if (proxyService != null) {
+ proxyService.close();
+ }
}
- @Test
+ @Test(groups = "quarantine")
public void testLookup() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index 6f830c0844437..ac9b5e7080056 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -68,7 +68,7 @@ public MockedBookKeeperTestCase(int numBookies) {
this.numBookies = numBookies;
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
try {
@@ -99,7 +99,7 @@ public void tearDown(Method method) {
}
}
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
public void setUpClass() {
executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
cachedExecutor = Executors.newCachedThreadPool();
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index fb7202b0823b1..05227665bdae2 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -74,7 +74,7 @@ public class ZookeeperCacheTest {
private OrderedScheduler executor;
private ScheduledExecutorService scheduledExecutor;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
void setup() throws Exception {
zkClient = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
}
@@ -84,7 +84,7 @@ void teardown() throws Exception {
zkClient.shutdown();
}
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
void classSetup() throws Exception {
executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("ZookeeperCacheTest").build();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 8214bcd0be19b..78a32b033cefd 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -31,14 +31,14 @@
import java.nio.file.Files;
import java.util.Properties;
-public class FileStoreTestBase {
+public abstract class FileStoreTestBase {
protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader;
protected OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
protected final String basePath = "pulsar";
private MiniDFSCluster hdfsCluster;
private String hdfsURI;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void start() throws Exception {
File baseDir = Files.createTempDirectory(basePath).toFile().getAbsoluteFile();
Configuration conf = new Configuration();
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java
index 72eb4a59a215b..b8461bd21796f 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java
@@ -28,7 +28,7 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
-public class BlobStoreTestBase {
+public abstract class BlobStoreTestBase {
private static final Logger log = LoggerFactory.getLogger(BlobStoreTestBase.class);
public final static String BUCKET = "pulsar-unittest";
@@ -36,7 +36,7 @@ public class BlobStoreTestBase {
protected BlobStoreContext context = null;
protected BlobStore blobStore = null;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void start() throws Exception {
if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
log.info("TestReal AWS S3, bucket: {}", BUCKET);