Skip to content

Commit

Permalink
[broker] Fixed delayed delivery after read operation error (apache#18098
Browse files Browse the repository at this point in the history
)
  • Loading branch information
merlimat authored Oct 19, 2022
1 parent 3da7b9f commit 7d6dc2e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
"totalAvailablePermits");
protected volatile int totalAvailablePermits = 0;
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
protected final Backoff readFailureBackoff;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
Expand Down Expand Up @@ -141,6 +140,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
this.readFailureBackoff = new Backoff(
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -811,7 +814,10 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherMultipleConsumers.this) {
if (!havePendingRead) {
// If it's a replay read we need to retry even if there's already
// another scheduled read, otherwise we'd be stuck until
// more messages are published.
if (!havePendingRead || readType == ReadType.Replay) {
log.info("[{}] Retrying read operation", name);
readMoreEntries();
} else {
Expand Down Expand Up @@ -1019,7 +1025,10 @@ protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesT
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
Set<PositionImpl> messagesAvailableNow =
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
return messagesAvailableNow;
} else {
return Collections.emptySet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import lombok.Cleanup;

import org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -59,6 +60,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
@BeforeClass
public void setup() throws Exception {
conf.setDelayedDeliveryTickTimeMillis(1024);
conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -580,4 +582,47 @@ public void testInterleavedMessagesOnKeySharedSubscription() throws Exception {
}
}

@Test
public void testDispatcherReadFailure() throws Exception {
String topic = BrokerTestUtil.newUniqueName("testDispatcherReadFailure");

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

for (int i = 0; i < 10; i++) {
producer.newMessage()
.value("msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}

producer.flush();

Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNull(msg);

// Inject failure in BK read
this.mockBookKeeper.failNow(BKException.Code.ReadException);

Set<String> receivedMsgs = new TreeSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
}

assertEquals(receivedMsgs.size(), 10);
for (int i = 0; i < 10; i++) {
assertTrue(receivedMsgs.contains("msg-" + i));
}
}

}

0 comments on commit 7d6dc2e

Please sign in to comment.