Skip to content

Commit

Permalink
Fixed race condition while triggering message redelivery after an ack…
Browse files Browse the repository at this point in the history
…-timeout event (apache#5276)
  • Loading branch information
merlimat authored Oct 7, 2019
1 parent e5ded1b commit e840375
Showing 1 changed file with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;

Expand All @@ -44,8 +43,8 @@
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
Expand All @@ -54,6 +53,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
}
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);

if (pendingAcks != null) {
AtomicInteger totalRedeliveryMessages = new AtomicInteger(0);
pendingAcks.forEach(
(ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize));
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get());
pendingAcks.clear();
List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
totalRedeliveryMessages.add((int) batchSize);
pendingPositions.add(new PositionImpl(ledgerId, entryId));
});

for (PositionImpl p : pendingPositions) {
pendingAcks.remove(p.getLedgerId(), p.getEntryId());
}

msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue());
subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
} else {
subscription.redeliverUnacknowledgedMessages(this);
}

flowConsumerBlockedPermits(this);
}

public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {

int totalRedeliveryMessages = 0;
List<PositionImpl> pendingPositions = Lists.newArrayList();
for (MessageIdData msg : messageIds) {
Expand Down

0 comments on commit e840375

Please sign in to comment.