Skip to content

Commit

Permalink
clear up pendingReceive queue when consumer close (apache#588)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Jul 21, 2017
1 parent 2ab766a commit 22aee89
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -2127,5 +2129,39 @@ public void testRedeliveryFailOverConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);

}

@Test(timeOut = 5000)
public void testFailReceiveAsyncOnConsumerClose() throws Exception {
log.info("-- Starting {} test --", methodName);

// (1) simple consumers
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/failAsyncReceive",
"my-subscriber-name", new ConsumerConfiguration());
consumer.close();
// receive messages
try {
consumer.receiveAsync().get(1, TimeUnit.SECONDS);
fail("it should have failed because consumer is already closed");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
}

// (2) Partitioned-consumer
int numPartitions = 4;
DestinationName dn = DestinationName.get("persistent://my-property/use/my-ns/failAsyncReceive");
admin.persistentTopics().createPartitionedTopic(dn.toString(), numPartitions);
Consumer partitionedConsumer = pulsarClient.subscribe(dn.toString(), "my-partitioned-subscriber",
new ConsumerConfiguration());
partitionedConsumer.close();
// receive messages
try {
partitionedConsumer.receiveAsync().get(1, TimeUnit.SECONDS);
fail("it should have failed because consumer is already closed");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
}

log.info("-- Exiting {} test --", methodName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ public CompletableFuture<Void> closeAsync() {
unAckedMessageTracker.close();
closeFuture.complete(null);
client.cleanupConsumer(this);
// fail all pending-receive futures to notify application
failPendingReceive();
} else {
closeFuture.completeExceptionally(exception);
}
Expand All @@ -665,7 +667,26 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}

void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
private void failPendingReceive() {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
while (!pendingReceives.isEmpty()) {
CompletableFuture<Message> receiveFuture = pendingReceives.poll();
if (receiveFuture != null) {
receiveFuture.completeExceptionally(
new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
} else {
break;
}
}
}
} finally {
lock.readLock().unlock();
}
}

void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public CompletableFuture<Void> closeAsync() {
closeFuture.complete(null);
log.info("[{}] [{}] Closed Partitioned Consumer", topic, subscription);
client.cleanupConsumer(this);
// fail all pending-receive futures to notify application
failPendingReceive();
} else {
setState(State.Failed);
closeFuture.completeExceptionally(closeFail.get());
Expand All @@ -305,6 +307,25 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}

private void failPendingReceive() {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
while (!pendingReceives.isEmpty()) {
CompletableFuture<Message> receiveFuture = pendingReceives.poll();
if (receiveFuture != null) {
receiveFuture.completeExceptionally(
new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
} else {
break;
}
}
}
} finally {
lock.readLock().unlock();
}
}

@Override
public boolean isConnected() {
for (ConsumerImpl consumer : consumers) {
Expand Down

0 comments on commit 22aee89

Please sign in to comment.