Skip to content

Commit

Permalink
Fix zeroQueueConsumer using listener (apache#6106)
Browse files Browse the repository at this point in the history
### Motivation

Available permits of ZeroQueueConsuemer must be 1 or less, however ZeroQueueConsuemer using listener may be greater than 1.


### Modifications

If listener is processing message, ZeroQueueConsumer doesn't send permit when it reconnect to broker.


### Reproduction
1. ZeroQueueConsuemer using listener consume a topic.

2. Unload that topic( or restart a broker) when listener is processing message.

3.  ZeroQueueConsumer sends permit when it reconnect to broker.
https://github.com/apache/pulsar/blob/v2.5.0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L133

4. ZeroQueueConsumer also sends permit when finished processing message.
https://github.com/apache/pulsar/blob/v2.5.0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L163

5. Available permits become 2.
  • Loading branch information
hrsakai authored and sijie committed Jan 24, 2020
1 parent 9db9605 commit c09314c
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
private final Lock zeroQueueLock = new ReentrantLock();

private volatile boolean waitingOnReceiveForZeroQueueSize = false;
private volatile boolean waitingOnListenerForZeroQueueSize = false;

public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
Expand Down Expand Up @@ -131,7 +132,7 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
// or queue was not empty: send a flow command
if (waitingOnReceiveForZeroQueueSize
|| currentQueueSize > 0
|| listener != null) {
|| (listener != null && !waitingOnListenerForZeroQueueSize)) {
sendFlowPermitsToBroker(cnx, 1);
}
}
Expand All @@ -157,13 +158,15 @@ private void triggerZeroQueueSizeListener(final Message<T> message) {
log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription,
message.getMessageId());
}
waitingOnListenerForZeroQueueSize = true;
trackMessage(message);
listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message));
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,
message.getMessageId(), t);
}
increaseAvailablePermits(cnx());
waitingOnListenerForZeroQueueSize = false;
});
}

Expand Down

0 comments on commit c09314c

Please sign in to comment.