Skip to content

Commit

Permalink
Fix some ExecutorService leaks and use @cleanup("shutdownNow") for cl…
Browse files Browse the repository at this point in the history
…eanup (apache#10198)

- use ExecutorService.shutdownNow() instead of ExecutorService.shutdown() in tests
  • Loading branch information
lhotari authored Apr 14, 2021
1 parent b0f3ae0 commit 41814a7
Show file tree
Hide file tree
Showing 62 changed files with 362 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -708,6 +709,7 @@ void testConcurrentResetCursor() throws Exception {
final int Consumers = 5;

List<Future<AtomicBoolean>> futures = Lists.newArrayList();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);

Expand Down Expand Up @@ -1727,6 +1729,7 @@ void testReadEntriesOrWaitBlocking() throws Exception {
final int Consumers = 10;

List<Future<Void>> futures = Lists.newArrayList();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);

Expand Down Expand Up @@ -3465,7 +3468,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {

} finally {
factory2.shutdown();
}
}
});

factory1.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -1271,6 +1272,7 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
public void testConcurrentAsyncSetProperties() throws Exception {
final CountDownLatch latch = new CountDownLatch(1000);
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
final int finalI = i;
Expand Down Expand Up @@ -1302,7 +1304,6 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
fail(e.getMessage());
}
assertTrue(latch.await(300, TimeUnit.SECONDS));
executor.shutdown();
factory.shutdown();
}

Expand Down Expand Up @@ -1447,7 +1448,8 @@ public void testOpenRaceCondition() throws Exception {

final int N = 1000;
final Position position = ledger.addEntry("entry-0".getBytes());
Executor executor = Executors.newCachedThreadPool();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CountDownLatch counter = new CountDownLatch(2);
executor.execute(() -> {
try {
Expand Down Expand Up @@ -2290,6 +2292,7 @@ public void testLazyRecoverCursor() throws Exception {

// Simulating time consuming cursor recovery.
CompletableFuture<Void> future = bkc.promiseAfter(2);
@Cleanup("shutdownNow")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lazyCursorRecovery"));
scheduledExecutorService.schedule(() -> {
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void tearDown() throws Exception {
stopBKCluster();
// stop zookeeper service
stopZKCluster();
executor.shutdown();
executor.shutdownNow();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ public final void setUpClass() {
@AfterClass(alwaysRun = true)
public final void tearDownClass() {
if (executor != null) {
executor.shutdown();
executor.shutdownNow();
}
if (cachedExecutor != null) {
cachedExecutor.shutdown();
cachedExecutor.shutdownNow();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.ILoggerFactory;
Expand All @@ -52,6 +53,7 @@ public void run() {
+ service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl());
}

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (transactionExecutor != null) {
transactionExecutor.shutdown();
transactionExecutor = null;
}

if (coordinationService != null) {
coordinationService.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public void close() {
mockedBk.close();
} catch (BKException | InterruptedException ignored) {
}
executor.shutdown();
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void createTenant(PulsarAdmin pulsarAdmin)
@AfterClass(alwaysRun = true)
public void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();
executor = null;

for (int i = 0; i < BROKER_COUNT; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}

@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newCachedThreadPool();

try {
Expand Down Expand Up @@ -1403,7 +1404,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
}

producer.close();
executorService.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}

@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newCachedThreadPool();


Expand Down Expand Up @@ -1083,7 +1084,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
}

producer.close();
executorService.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setup() throws Exception {

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception{
executor.shutdown();
executor.shutdownNow();
zkCache.stop();
zkc.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void setup() throws Exception {

@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
executor.shutdown();
executor.shutdownNow();

admin1.close();
admin2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();

for (int i = 0; i < BROKER_COUNT; i++) {
pulsarAdmins[i].close();
Expand Down Expand Up @@ -609,7 +609,7 @@ private BundlesData getBundles(int numBundles) {
private void createNamespace(PulsarService pulsar, String namespace, int numBundles) throws Exception {
Policies policies = new Policies();
policies.bundles = getBundles(numBundles);
String zpath = AdminResource.path(POLICIES, namespace);
String zpath = AdminResource.path(POLICIES, namespace);
pulsar.getPulsarResources().getNamespaceResources().create(zpath, policies);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();

admin1.close();
admin2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();

admin1.close();
admin2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void setup() throws Exception {

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
executor.shutdown();
executor.shutdownNow();
zkCache.stop();
zkc.close();
otherZkc.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Excepti
retryStrategically((test) -> dispatcher.getConsumers().get(0).getUnackedMessages() == 0, 50, 150);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);

executor.shutdown();
executor.shutdownNow();
myConsumer.close();
producer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ public void testTopicFailureShouldNotHaveDeadLock() {
fail(e.getMessage());
}

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
BrokerService service = spy(pulsar.getBrokerService());
// create topic will fail to get managedLedgerConfig
Expand All @@ -873,8 +874,6 @@ public void testTopicFailureShouldNotHaveDeadLock() {
fail("there is a dead-lock and it should have been prevented");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NullPointerException);
} finally {
executor.shutdownNow();
}
}

Expand All @@ -892,6 +891,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
fail(e.getMessage());
}

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
BrokerService service = spy(pulsar.getBrokerService());
// create topic will fail to get managedLedgerConfig
Expand Down Expand Up @@ -926,7 +926,6 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
} catch (ExecutionException e) {
assertEquals(e.getCause().getClass(), PersistenceException.class);
} finally {
executor.shutdownNow();
ledgers.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -146,6 +147,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception {
}

List<Consumer<byte[]>> successfulConsumers = Collections.synchronizedList(Lists.newArrayList());
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 20;
CountDownLatch latch = new CountDownLatch(totalConsumers);
Expand All @@ -170,7 +172,6 @@ public void testLookupThrottlingForClientByBroker() throws Exception {
}
}
pulsarClient.close();
executor.shutdown();
assertNotEquals(successfulConsumers.size(), totalConsumers);
}

Expand Down Expand Up @@ -198,6 +199,7 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio
.ioThreads(20).connectionsPerBroker(20).build();
upsertLookupPermits(100);
List<Consumer<byte[]>> consumers = Collections.synchronizedList(Lists.newArrayList());
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 8;
CountDownLatch latch = new CountDownLatch(totalConsumers);
Expand Down Expand Up @@ -231,8 +233,6 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio

}
assertEquals(totalConnectedConsumers, totalConsumers);

executor.shutdown();
pulsarClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void concurrent() throws Exception {

CyclicBarrier barrier = new CyclicBarrier(Threads);
CountDownLatch counter = new CountDownLatch(Threads);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

List<String> results = Collections.synchronizedList(Lists.newArrayList());
Expand Down Expand Up @@ -112,8 +114,6 @@ public void concurrent() throws Exception {
// Check the list contains no duplicates
Set<String> set = Sets.newHashSet(results);
assertEquals(set.size(), results.size());

executor.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public void testConcurrentConsumerThreads() throws Exception {
final int recvQueueSize = 100;
final int numConsumersThreads = 10;

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
Expand Down Expand Up @@ -376,7 +377,6 @@ public Void call() throws Exception {
// 2. flow control works the same as single consumer single thread
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), recvQueueSize);
executor.shutdown();
}

@Test(enabled = false)
Expand All @@ -395,6 +395,7 @@ public void testGracefulClose() throws Exception {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(1);
executor.submit(() -> {
Expand Down Expand Up @@ -442,8 +443,6 @@ public void testGracefulClose() throws Exception {
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertTrue(subRef.getDispatcher().isConsumerConnected());

executor.shutdown();
}

@Test
Expand Down Expand Up @@ -1398,7 +1397,7 @@ public void testBrokerTopicStats() throws Exception {
field.setAccessible(true);
ScheduledExecutorService statsUpdater = (ScheduledExecutorService) field.get(brokerService);
// disable statsUpdate to calculate rates explicitly
statsUpdater.shutdown();
statsUpdater.shutdownNow();

final String namespace = "prop/ns-abc";
Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down
Loading

0 comments on commit 41814a7

Please sign in to comment.