Skip to content

Commit

Permalink
Enable listener to receive messages even if receiver queue size is ze…
Browse files Browse the repository at this point in the history
…ro (apache#1977)

* Enable listener to receive messages even if receiver queue size is zero

* Send flow if acked message is received
  • Loading branch information
massakam authored and merlimat committed Jun 19, 2018
1 parent 4ed4233 commit c293ae3
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ logs
/data
pulsar-broker/tmp.*
pulsar-broker/src/test/resources/log4j2.yaml
pulsar-functions/worker/test-tenant/
*.log

*.versionsBackup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import static org.testng.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -122,6 +126,50 @@ public void zeroQueueSizeNormalConsumer() throws PulsarClientException {
}
}

@Test()
public void zeroQueueSizeConsumerListener() throws Exception {
String key = "zeroQueueSizeConsumerListener";

// 1. Config
final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";

// 2. Create Producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

// 3. Create Consumer
List<Message<byte[]>> messages = Lists.newArrayList();
CountDownLatch latch = new CountDownLatch(totalMessages);
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(0).messageListener((cons, msg) -> {
assertEquals(((ConsumerImpl) cons).numMessagesInQueue(), 0);
synchronized(messages) {
messages.add(msg);
}
log.info("Consumer received: " + new String(msg.getData()));
latch.countDown();
}).subscribe();

// 3. producer publish messages
for (int i = 0; i < totalMessages; i++) {
String message = messagePredicate + i;
log.info("Producer produced: " + message);
producer.send(message.getBytes());
}

// 4. Receiver receives the message
latch.await();
assertEquals(consumer.numMessagesInQueue(), 0);
assertEquals(messages.size(), totalMessages);
for (int i = 0; i < messages.size(); i++) {
assertEquals(new String(messages.get(i).getData()), messagePredicate + i);
}
}

@Test()
public void zeroQueueSizeSharedSubscription() throws PulsarClientException {
String key = "zeroQueueSizeSharedSubscription";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ public void connectionOpened(final ClientCnx cnx) {
AVAILABLE_PERMITS_UPDATER.set(this, 0);
// For zerosize queue : If the connection is reset and someone is waiting for the messages
// or queue was not empty: send a flow command
if (waitingOnReceiveForZeroQueueSize || (conf.getReceiverQueueSize() == 0 && currentSize > 0)) {
if (waitingOnReceiveForZeroQueueSize
|| (conf.getReceiverQueueSize() == 0 && (currentSize > 0 || listener != null))) {
sendFlowPermitsToBroker(cnx, 1);
}
} else {
Expand Down Expand Up @@ -678,6 +679,9 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC
log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
topic, subscription, msgId);
}
if (conf.getReceiverQueueSize() == 0) {
increaseAvailablePermits(cnx);
}
return;
}

Expand Down Expand Up @@ -722,12 +726,12 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
boolean asyncReceivedWaiting = !pendingReceives.isEmpty();
if ((conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) && !asyncReceivedWaiting) {
incomingMessages.add(message);
}
if (asyncReceivedWaiting) {
if (!pendingReceives.isEmpty()) {
notifyPendingReceivedCallback(message, null);
} else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) {
incomingMessages.add(message);
} else if (conf.getReceiverQueueSize() == 0 && listener != null) {
triggerZeroQueueSizeListener(message);
}
} finally {
lock.readLock().unlock();
Expand All @@ -754,7 +758,7 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC
msgMetadata.recycle();
}

if (listener != null) {
if (listener != null && conf.getReceiverQueueSize() != 0) {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
listenerExecutor.execute(() -> {
Expand Down Expand Up @@ -816,6 +820,27 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
}
}

private void triggerZeroQueueSizeListener(final Message<T> message) {
checkArgument(conf.getReceiverQueueSize() == 0);
checkNotNull(listener, "listener can't be null");
checkNotNull(message, "unqueued message can't be null");

listenerExecutor.execute(() -> {
stats.updateNumMsgsReceived(message);
try {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription,
message.getMessageId());
}
listener.received(ConsumerImpl.this, message);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,
message.getMessageId(), t);
}
increaseAvailablePermits(cnx());
});
}

void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx) {
int batchSize = msgMetadata.getNumMessagesInBatch();
Expand Down

0 comments on commit c293ae3

Please sign in to comment.