Skip to content

Commit

Permalink
[Issue 10010][Client] fixed memory leak (apache#10028)
Browse files Browse the repository at this point in the history
trying to close all the resources even if previous one fails during close/shutdown
code clean up as per the idea suggestions

Fixes apache#10010

### Motivation

Fixed memory leak caused by reception thrown from the PulsarClientImpl constructor in some cases. Resources were not getting closed which used to end up throwing OOM if user code tries to create a client unless succeeded.

### Modifications

Handled exception in the constructor with try-catch and updated shutdown method to close all resources correctly. Previously, it was not attempting to close further resource if the previous one failed. Now it will try to close all of them individully.

This change is a trivial rework / code cleanup without any test coverage.

### Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no): no
  - The public API: (yes / no): no
  - The schema: (yes / no / don't know): no
  - The default values of configurations: (yes / no): no
  - The wire protocol: (yes / no): no
  - The rest endpoints: (yes / no): no
  - The admin cli options: (yes / no): no
  - Anything that affects deployment: (yes / no / don't know): no

### Documentation

  - Does this pull request introduce a new feature? (yes / no): no
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented): not applicable
  - If a feature is not applicable for documentation, explain why?: nothing to concerned with the end-user
  - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation: not required
  • Loading branch information
abhilashmandaliya authored Apr 13, 2021
1 parent 187fb97 commit 197bd93
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,11 @@ public CompletableFuture<Void> closeAsync() {

List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();

if (listenChannel != null) {
if (listenChannel != null && listenChannel.isOpen()) {
asyncCloseFutures.add(closeChannel(listenChannel));
}

if (listenChannelTls != null) {
if (listenChannelTls != null && listenChannelTls.isOpen()) {
asyncCloseFutures.add(closeChannel(listenChannelTls));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {

producer.flush();

Message<byte[]> msg = null;
Message<byte[]> msg;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("Received: [{}]", new String(msg.getData()));
Expand Down Expand Up @@ -986,7 +986,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
EntryCacheImpl entryCache = spy((EntryCacheImpl) cacheField.get(ledger));
cacheField.set(ledger, entryCache);

Message<byte[]> msg = null;
Message<byte[]> msg;
// 2. Produce messages
for (int i = 0; i < 30; i++) {
String message = "my-message-" + i;
Expand Down Expand Up @@ -1082,7 +1082,7 @@ public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled) throws Ex
final long maxMessageCacheRetentionTimeMillis = conf.getManagedLedgerCacheEvictionTimeThresholdMillis();
final long maxActiveCursorBacklogEntries = conf.getManagedLedgerCursorBackloggedThreshold();

Message<byte[]> msg = null;
Message<byte[]> msg;
final int totalMsgs = (int) maxActiveCursorBacklogEntries + receiverSize + 1;
// 2. Produce messages
for (int i = 0; i < totalMsgs; i++) {
Expand Down Expand Up @@ -1216,7 +1216,7 @@ public void testSendCallBackReturnSequenceId() throws Exception {

// Trigger the send timeout
stopBroker();
List<CompletableFuture<MessageId>> futures = new ArrayList<CompletableFuture<MessageId>>();
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for(int i = 0 ; i < 3 ; i++) {
CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value(message.getBytes()).sendAsync();
futures.add(future);
Expand Down Expand Up @@ -1291,7 +1291,7 @@ public void testSharedConsumerAckDifferentConsumer(boolean ackReceiptEnabled) th
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Message<byte[]> msg;
Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
for (int i = 0; i < 5; i++) {
Expand Down Expand Up @@ -1396,7 +1396,7 @@ public void testConsumerBlockingWithUnAckedMessages(boolean ackReceiptEnabled) {
}

// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1442,10 +1442,9 @@ public void testConsumerBlockingWithUnAckedMessages(boolean ackReceiptEnabled) {
* due to reaching ack-threshold (500) 3. Consumer acks messages after stop getting messages 4. Consumer again tries
* to consume messages 5. Consumer should be able to complete consuming all 1500 messages in 3 iteration (1500/500)
*
* @throws Exception
*/
@Test(dataProvider = "ackReceiptEnabled")
public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ackReceiptEnabled) throws Exception {
public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ackReceiptEnabled) {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
Expand Down Expand Up @@ -1476,7 +1475,7 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ack
// (2) Receive Messages
for (int j = 0; j < totalReceiveIteration; j++) {

Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1552,7 +1551,7 @@ public void testMultipleSharedConsumerBlockingWithUnActedMessages(boolean ackRec

// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1702,7 +1701,7 @@ public void testUnackBlockRedeliverMessages(boolean ackReceiptEnabled) {
}

// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -1866,7 +1865,7 @@ public void testBlockUnackConsumerAckByDifferentConsumer() {

// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -2007,9 +2006,8 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()

// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals(messages1.size(), unAckedMessagesBufferSize);
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m ->
(MessageIdImpl) m.getMessageId()).collect(Collectors.toSet());

// (3) redeliver all consumed messages
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Expand Down Expand Up @@ -2081,7 +2079,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();

// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages1 = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand All @@ -2094,9 +2092,8 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
}

// client should not receive all produced messages and should be blocked due to unack-messages
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m ->
(MessageIdImpl) m.getMessageId()).collect(Collectors.toSet());

// (3) redeliver all consumed messages
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Expand Down Expand Up @@ -2207,7 +2204,7 @@ public void testPriorityConsumer() throws Exception {
*
* @throws Exception
*/
@Test(timeOut = 10000)
@Test(timeOut = 30000)
public void testSharedSamePriorityConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
final int queueSize = 5;
Expand Down Expand Up @@ -2359,7 +2356,7 @@ public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled) throws Exc
Thread.sleep(10);
}
// (1.a) consume first consumeMsgInParts msgs and trigger redeliver
Message<byte[]> msg = null;
Message<byte[]> msg;
List<Message<byte[]>> messages1 = Lists.newArrayList();
for (int i = 0; i < consumeMsgInParts; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -2515,7 +2512,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
cryptoProducer.send(message.getBytes());
}

Message<byte[]> msg = null;
Message<byte[]> msg;

msg = normalConsumer.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
// should not able to read message using normal message.
Expand Down Expand Up @@ -2600,7 +2597,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
producer2.send(message.getBytes());
}

MessageImpl<byte[]> msg = null;
MessageImpl<byte[]> msg;

msg = (MessageImpl<byte[]>) normalConsumer.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
// should not able to read message using normal message.
Expand Down Expand Up @@ -2902,7 +2899,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe

final int totalMsg = 10;

MessageImpl<byte[]> msg = null;
MessageImpl<byte[]> msg;
Set<String> messageSet = Sets.newHashSet();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
Expand Down Expand Up @@ -3080,7 +3077,7 @@ private String decryptMessage(TopicMessageImpl<byte[]> msg,

ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
// try to decrypt use default MessageCryptoBc
MessageCrypto crypto = new MessageCryptoBc("test", false);
@SuppressWarnings("rawtypes") MessageCrypto crypto = new MessageCryptoBc("test", false);

MessageMetadata messageMetadata = new MessageMetadata()
.setEncryptionParam(encrParam)
Expand Down Expand Up @@ -3219,7 +3216,7 @@ public void testMultiTopicsConsumerImplPause() throws Exception {
}
assertEquals(counter, 10);

producer.close();;
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
Expand Down Expand Up @@ -3299,6 +3296,7 @@ public void testFlushBatchDisabled() throws Exception {

// Issue 1452: https://github.com/apache/pulsar/issues/1452
// reachedEndOfTopic should be called only once if a topic has been terminated before subscription
@SuppressWarnings("rawtypes")
@Test
public void testReachedEndOfTopic() throws Exception
{
Expand All @@ -3311,7 +3309,7 @@ public void testReachedEndOfTopic() throws Exception
admin.topics().terminateTopicAsync(topicName).get();

CountDownLatch latch = new CountDownLatch(2);
Consumer consumer = pulsarClient.newConsumer()
@SuppressWarnings("unchecked") Consumer consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("my-subscriber-name")
.messageListener(new MessageListener()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,20 @@ public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress addres
}

void closeAllConnections() {
pool.values().forEach(map -> {
map.values().forEach(future -> {
if (future.isDone()) {
if (!future.isCompletedExceptionally()) {
// Connection was already created successfully, the join will not throw any exception
future.join().close();
} else {
// If the future already failed, there's nothing we have to do
}
pool.values().forEach(map -> map.values().forEach(future -> {
if (future.isDone()) {
if (!future.isCompletedExceptionally()) {
// Connection was already created successfully, the join will not throw any exception
future.join().close();
} else {
// The future is still pending: just register to make sure it gets closed if the operation will
// succeed
future.thenAccept(ClientCnx::close);
// If the future already failed, there's nothing we have to do
}
});
});
} else {
// The future is still pending: just register to make sure it gets closed if the operation will
// succeed
future.thenAccept(ClientCnx::close);
}
}));
}

/**
Expand Down Expand Up @@ -164,7 +162,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
log.debug("Connection for {} not found in cache", logicalAddress);
}

final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

// Trigger async connect to broker
createConnection(physicalAddress).thenAccept(channel -> {
Expand Down Expand Up @@ -228,7 +226,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
int port;
CompletableFuture<List<InetAddress>> resolvedAddress = null;
CompletableFuture<List<InetAddress>> resolvedAddress;
try {
if (isSniProxy) {
URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
Expand All @@ -255,15 +253,11 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(channel -> {
// Successfully connected to server
future.complete(channel);
}).exceptionally(exception -> {
// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(channel -> {
future.complete(channel);
}).exceptionally(ex -> {
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
return null;
Expand Down Expand Up @@ -321,7 +315,9 @@ public void releaseConnection(ClientCnx cnx) {
@Override
public void close() throws IOException {
try {
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
if (!eventLoopGroup.isShutdown()) {
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
}
} catch (InterruptedException e) {
log.warn("EventLoopGroup shutdown was interrupted", e);
}
Expand Down
Loading

0 comments on commit 197bd93

Please sign in to comment.