diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index 2fbd9a51d4ab4..35853d3599b0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -55,6 +55,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable { */ Set getScheduledMessages(int maxMessages); + /** + * Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has + * more messages available. + */ + boolean shouldPauseAllDeliveries(); /** * Reset tick time use zk policies cache. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 92df563dad46e..837d3d1872c3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -55,6 +55,20 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T private final boolean isDelayedDeliveryDeliverAtTimeStrict; + // If we detect that all messages have fixed delay time, such that the delivery is + // always going to be in FIFO order, then we can avoid pulling all the messages in + // tracker. Instead, we use the lookahead for detection and pause the read from + // the cursor if the delays are fixed. + public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000; + + // This is the timestamp of the message with the highest delivery time + // If new added messages are lower than this, it means the delivery is requested + // to be out-of-order. It gets reset to 0, once the tracker is emptied. + private long highestDeliveryTimeTracked = 0; + + // Track whether we have seen all messages with fixed delay so far. + private boolean messagesHaveFixedDelay = true; + InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict) { this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict); @@ -86,16 +100,28 @@ private long getCutoffTime() { @Override public boolean addMessage(long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= getCutoffTime()) { + messagesHaveFixedDelay = false; + return false; + } + if (log.isDebugEnabled()) { log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, deliverAt - clock.millis()); } - if (deliverAt <= getCutoffTime()) { - return false; - } + priorityQueue.add(deliverAt, ledgerId, entryId); updateTimer(); + + // Check that new delivery time comes after the current highest, or at + // least within a single tick time interval of 1 second. + if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) { + messagesHaveFixedDelay = false; + } + + highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt); + return true; } @@ -137,6 +163,13 @@ public Set getScheduledMessages(int maxMessages) { if (log.isDebugEnabled()) { log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size()); } + + if (priorityQueue.isEmpty()) { + // Reset to initial state + highestDeliveryTimeTracked = 0; + messagesHaveFixedDelay = true; + } + updateTimer(); return positions; } @@ -241,4 +274,12 @@ public void close() { timeout.cancel(); } } + + @Override + public boolean shouldPauseAllDeliveries() { + // Pause deliveries if we know all delays are fixed within the lookahead window + return messagesHaveFixedDelay + && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES + && !hasMessageAvailable(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 0c7a664121634..d9f36bf0643e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -177,8 +177,7 @@ public int filterEntriesForConsumer(Optional optMetadataArray entry.release(); individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap()); continue; - } else if (msgMetadata.hasDeliverAtTime() - && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) { + } else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) { // The message is marked for delayed delivery. Ignore for now. entries.set(i, null); entry.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 6af58557a83bf..a770e76fc4368 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -236,6 +236,10 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional } public synchronized void readMoreEntries() { + if (shouldPauseDeliveryForDelayTracker()) { + return; + } + // totalAvailablePermits may be updated by other threads int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); @@ -874,13 +878,20 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata synchronized (this) { if (!delayedDeliveryTracker.isPresent()) { + if (!msgMetadata.hasDeliverAtTime()) { + // No need to initialize the tracker here + return false; + } + // Initialize the tracker the first time we need to use it delayedDeliveryTracker = Optional .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); } delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); - return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); + + long deliverAtTime = msgMetadata.hasDeliverAtTime() ? msgMetadata.getDeliverAtTime() : -1L; + return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, deliverAtTime); } } @@ -895,6 +906,10 @@ protected synchronized Set getMessagesToReplayNow(int maxMessagesT } } + protected synchronized boolean shouldPauseDeliveryForDelayTracker() { + return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries(); + } + @Override public synchronized long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index f44f61a67f9ca..db2db6cc1dbb0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -314,4 +314,117 @@ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> verify(dispatcher).readMoreEntries()); } + + @Test + public void testWithFixedDelays() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true); + + assertFalse(tracker.hasMessageAvailable()); + + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(2, 2, 20)); + assertTrue(tracker.addMessage(3, 3, 30)); + assertTrue(tracker.addMessage(4, 4, 40)); + assertTrue(tracker.addMessage(5, 5, 50)); + + assertFalse(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 5); + assertFalse(tracker.shouldPauseAllDeliveries()); + + for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) { + assertTrue(tracker.addMessage(i, i, i * 10)); + } + + assertTrue(tracker.shouldPauseAllDeliveries()); + + clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10); + + tracker.getScheduledMessages(100); + assertFalse(tracker.shouldPauseAllDeliveries()); + + // Empty the tracker + int removed = 0; + do { + removed = tracker.getScheduledMessages(100).size(); + } while (removed > 0); + + assertFalse(tracker.shouldPauseAllDeliveries()); + } + + @Test + public void testWithMixedDelays() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true); + + assertFalse(tracker.hasMessageAvailable()); + + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(2, 2, 20)); + assertTrue(tracker.addMessage(3, 3, 30)); + assertTrue(tracker.addMessage(4, 4, 40)); + assertTrue(tracker.addMessage(5, 5, 50)); + + assertFalse(tracker.shouldPauseAllDeliveries()); + + for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) { + assertTrue(tracker.addMessage(i, i, i * 10)); + } + + assertTrue(tracker.shouldPauseAllDeliveries()); + + // Add message with earlier delivery time + assertTrue(tracker.addMessage(5, 5, 5)); + + assertFalse(tracker.shouldPauseAllDeliveries()); + } + + @Test + public void testWithNoDelays() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true); + + assertFalse(tracker.hasMessageAvailable()); + + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(2, 2, 20)); + assertTrue(tracker.addMessage(3, 3, 30)); + assertTrue(tracker.addMessage(4, 4, 40)); + assertTrue(tracker.addMessage(5, 5, 50)); + + assertFalse(tracker.shouldPauseAllDeliveries()); + + for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) { + assertTrue(tracker.addMessage(i, i, i * 10)); + } + + assertTrue(tracker.shouldPauseAllDeliveries()); + + // Add message with no-delay + assertFalse(tracker.addMessage(5, 5, -1L)); + + assertFalse(tracker.shouldPauseAllDeliveries()); + } + }