Skip to content

Commit

Permalink
Add close method in the class of NegativeAcksTracker (apache#12469)
Browse files Browse the repository at this point in the history
  • Loading branch information
lordcheng10 authored Oct 31, 2021
1 parent 7f60bcd commit 3694aa1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ private void closeConsumerTasks() {
if (batchReceiveTimeout != null) {
batchReceiveTimeout.cancel();
}
negativeAcksTracker.close();
stats.getStatTimeout().ifPresent(Timeout::cancel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;

import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -30,7 +31,7 @@
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;

class NegativeAcksTracker {
class NegativeAcksTracker implements Closeable {

private HashMap<MessageId, Long> nackedMessages = null;

Expand Down Expand Up @@ -93,4 +94,17 @@ public synchronized void add(MessageId messageId) {
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
}

@Override
public synchronized void close() {
if (timeout != null && !timeout.isCancelled()) {
timeout.cancel();
timeout = null;
}

if (nackedMessages != null) {
nackedMessages.clear();
nackedMessages = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,18 @@ public void testBatchReceiveAsyncCanBeCancelled() {
// then
Assert.assertFalse(consumer.hasPendingBatchReceive());
}

@Test
public void testClose() {
Exception checkException = null;
try {
if (consumer != null) {
consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
consumer.close();
}
} catch (Exception e) {
checkException = e;
}
Assert.assertNull(checkException);
}
}

0 comments on commit 3694aa1

Please sign in to comment.