Skip to content

Commit

Permalink
Correctly process the ack for a message that was outstanding on a dif…
Browse files Browse the repository at this point in the history
…ferent consumer (apache#791)

Fixes apache#759
  • Loading branch information
merlimat authored Sep 26, 2017
1 parent bace0b4 commit fea1b33
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && UNACKED_MESSAGES_UPDATER.get(this) >= maxUnackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
Expand Down Expand Up @@ -424,13 +424,13 @@ public void updateRates() {

public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = UNACKED_MESSAGES_UPDATER.get(this);
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
return stats;
}

public int getUnackedMessages() {
return UNACKED_MESSAGES_UPDATER.get(this);
return unackedMessages;
}

@Override
Expand Down Expand Up @@ -499,7 +499,10 @@ private void removePendingAcks(PositionImpl position) {
// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
if (ackOwnedConsumer != null) {
int totalAckedMsgs = (int) ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()).first;
ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId());
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
Expand Down Expand Up @@ -563,7 +566,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId,
totalRedeliveryMessages, pendingPositions.size());
}

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);

Expand All @@ -584,7 +587,7 @@ public Subscription getSubscription() {

private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
subscription.addUnAckedMessages(ackedMessages);
return UNACKED_MESSAGES_UPDATER.addAndGet(this, ackedMessages);
return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
}

private void clearUnAckedMsgs(Consumer consumer) {
Expand All @@ -596,7 +599,7 @@ public static class SendMessageInfo {
ChannelPromise channelPromse;
int totalSentMessages;
long totalSentMessageBytes;

public ChannelPromise getChannelPromse() {
return channelPromse;
}
Expand All @@ -615,8 +618,8 @@ public long getTotalSentMessageBytes() {
public void setTotalSentMessageBytes(long totalSentMessageBytes) {
this.totalSentMessageBytes = totalSentMessageBytes;
}

}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void readMoreEntries() {
}
}
}

if (!messagesToReplay.isEmpty()) {
if (havePendingReplayRead) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
Expand Down Expand Up @@ -230,7 +230,7 @@ public void readMoreEntries() {
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
TOTAL_UNACKED_MESSAGES_UPDATER.get(this), maxUnackedMessages);
totalUnackedMessages, maxUnackedMessages);
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
Expand Down Expand Up @@ -438,7 +438,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

}


/**
* returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
*
Expand Down Expand Up @@ -495,17 +495,16 @@ public void addUnAckedMessages(int numberOfMessages) {
log.info("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}", name,
TOTAL_UNACKED_MESSAGES_UPDATER.get(this), maxUnackedMessages);
} else if (topic.getBrokerService().isBrokerDispatchingBlocked()
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
&& blockedDispatcherOnUnackedMsgs == TRUE) {
// unblock dispatcher: if dispatcher is blocked due to broker-unackMsg limit and if it ack back enough
// messages
if (TOTAL_UNACKED_MESSAGES_UPDATER.get(this) < (topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2)) {
if (totalUnackedMessages < (topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2)) {
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
// it removes dispatcher from blocked list and unblocks dispatcher by scheduling read
topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this));
}
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE
&& unAckedMessages < maxUnackedMessages / 2) {
} else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages < maxUnackedMessages / 2) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
Expand All @@ -517,24 +516,24 @@ public void addUnAckedMessages(int numberOfMessages) {
}

public boolean isBlockedDispatcherOnUnackedMsgs() {
return BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE;
return blockedDispatcherOnUnackedMsgs == TRUE;
}

public void blockDispatcherOnUnackedMsgs() {
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.set(this, TRUE);
blockedDispatcherOnUnackedMsgs = TRUE;
}

public void unBlockDispatcherOnUnackedMsgs() {
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.set(this, FALSE);
blockedDispatcherOnUnackedMsgs = FALSE;
}

public int getTotalUnackedMessages() {
return TOTAL_UNACKED_MESSAGES_UPDATER.get(this);
return totalUnackedMessages;
}

public String getName() {
return name;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,8 +43,13 @@
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -515,4 +522,58 @@ public void testCancelReadRequestOnLastDisconnect() throws Exception {
assertEquals(receivedConsumer1, totalMessages);
}

@Test
public void testUnackedCountWithRedeliveries() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testUnackedCountWithRedeliveries";
final String subName = "sub3";
final int numMsgs = 10;

Producer producer = pulsarClient.createProducer(topicName);

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
conf.setReceiverQueueSize(0);
ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(topicName, subName, conf);

for (int i = 0; i < numMsgs; i++) {
producer.send(("hello-" + i).getBytes());
}

Set<MessageIdImpl> c1_receivedMessages = new HashSet<>();

// C-1 gets all messages but doesn't ack
for (int i = 0; i < numMsgs; i++) {
c1_receivedMessages.add((MessageIdImpl) consumer1.receive().getMessageId());
}

// C-2 will not get any message initially, since everything went to C-1 already
Consumer consumer2 = pulsarClient.subscribe(topicName, subName, conf);

// Trigger C-1 to redeliver everything, half will go C-1 again and the other half to C-2
consumer1.redeliverUnacknowledgedMessages(c1_receivedMessages);

// Consumer 2 will also receive all message but not ack
for (int i = 0; i < numMsgs; i++) {
consumer2.receive();
}

for (MessageId msgId : c1_receivedMessages) {
consumer1.acknowledge(msgId);
}

PersistentTopicStats stats = admin.persistentTopics().getStats(topicName);

// Unacked messages count should be 0 for both consumers at this point
SubscriptionStats subStats = stats.subscriptions.get(subName);
assertEquals(subStats.msgBacklog, 0);

for (ConsumerStats cs : subStats.consumers) {
assertEquals(cs.unackedMessages, 0);
}

producer.close();
consumer1.close();
consumer2.close();
admin.persistentTopics().delete(topicName);
}
}

0 comments on commit fea1b33

Please sign in to comment.