From 55826742d1c589d106d7cbe97f12ec2e8bcca35f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sat, 1 Oct 2022 08:06:13 -0700 Subject: [PATCH] Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907) --- conf/broker.conf | 6 ++++ .../pulsar/broker/ServiceConfiguration.java | 6 ++++ .../InMemoryDelayedDeliveryTracker.java | 18 ++++++---- ...InMemoryDelayedDeliveryTrackerFactory.java | 5 ++- .../delayed/InMemoryDeliveryTrackerTest.java | 34 +++++++++++-------- 5 files changed, 48 insertions(+), 21 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index d117d679c8532..e6b3aef8811b5 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -576,6 +576,12 @@ delayedDeliveryMaxNumBuckets=50 # Enable share the delayed message index across subscriptions delayedDeliverySharedIndexEnabled=false +# Size of the lookahead window to use when detecting if all the messages in the topic +# have a fixed delay. +# Default is 50,000. Setting the lookahead window to 0 will disable the logic to handle +# fixed delays in messages in a different way. +delayedDeliveryFixedDelayDetectionLookahead=50000 + # Whether to enable acknowledge of batch local index. acknowledgmentAtBatchIndexLevelEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 8c883045e66c5..6683d36c36e06 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -372,6 +372,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se @FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions") private boolean delayedDeliverySharedIndexEnabled = false; + @FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use " + + "when detecting if all the messages in the topic have a fixed delay. " + + "Default is 50,000. Setting the lookahead window to 0 will disable the " + + "logic to handle fixed delays in messages in a different way.") + private long delayedDeliveryFixedDelayDetectionLookahead = 50_000; + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index") private boolean acknowledgmentAtBatchIndexLevelEnabled = false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 83b113df36b6e..11d663322be52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -59,7 +59,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T // always going to be in FIFO order, then we can avoid pulling all the messages in // tracker. Instead, we use the lookahead for detection and pause the read from // the cursor if the delays are fixed. - public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000; + private final long fixedDelayDetectionLookahead; // This is the timestamp of the message with the highest delivery time // If new added messages are lower than this, it means the delivery is requested @@ -70,17 +70,22 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T private boolean messagesHaveFixedDelay = true; InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, - boolean isDelayedDeliveryDeliverAtTimeStrict) { - this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict); + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead); } InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, - long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) { + long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { this.dispatcher = dispatcher; this.timer = timer; this.tickTimeMillis = tickTimeMillis; this.clock = clock; this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; } /** @@ -283,8 +288,9 @@ public void close() { @Override public boolean shouldPauseAllDeliveries() { // Pause deliveries if we know all delays are fixed within the lookahead window - return messagesHaveFixedDelay - && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES + return fixedDelayDetectionLookahead > 0 + && messagesHaveFixedDelay + && priorityQueue.size() >= fixedDelayDetectionLookahead && !hasMessageAvailable(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java index 5c04a6d53b257..7bf0ca87c40c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -33,18 +33,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra private boolean isDelayedDeliveryDeliverAtTimeStrict; + private long fixedDelayDetectionLookahead; + @Override public void initialize(ServiceConfiguration config) { this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"), config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS); this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis(); this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict(); + this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead(); } @Override public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) { return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, - isDelayedDeliveryDeliverAtTimeStrict); + isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index db2db6cc1dbb0..1ff47a4ca5065 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -74,7 +74,7 @@ public void test() throws Exception { @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - false); + false, 0); assertFalse(tracker.hasMessageAvailable()); @@ -146,7 +146,7 @@ public void testWithTimer() throws Exception { @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - false); + false, 0); assertTrue(tasks.isEmpty()); assertTrue(tracker.addMessage(2, 2, 20)); @@ -187,7 +187,7 @@ public void testAddWithinTickTime() { @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, - false); + false, 0); clockTime.set(0); @@ -209,7 +209,7 @@ public void testAddMessageWithStrictDelay() { @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, - true); + true, 0); clockTime.set(10); @@ -236,7 +236,7 @@ public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithSt // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario. @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, - 1000, clock, true); + 1000, clock, true, 0); // Set clock time, then run tracker to inherit clock time as the last tick time. clockTime.set(10000); @@ -274,7 +274,7 @@ public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStr // a previous tick run. @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, - 100000, clock, true); + 100000, clock, true, 0); clockTime.set(500000); @@ -299,7 +299,7 @@ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario. @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, - 500, clock, true); + 500, clock, true, 0); clockTime.set(0); @@ -323,9 +323,11 @@ public void testWithFixedDelays() throws Exception { Clock clock = mock(Clock.class); when(clock.millis()).then(x -> clockTime.get()); + final long fixedDelayLookahead = 100; + @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - true); + true, fixedDelayLookahead); assertFalse(tracker.hasMessageAvailable()); @@ -339,13 +341,13 @@ public void testWithFixedDelays() throws Exception { assertEquals(tracker.getNumberOfDelayedMessages(), 5); assertFalse(tracker.shouldPauseAllDeliveries()); - for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) { + for (int i = 6; i <= fixedDelayLookahead; i++) { assertTrue(tracker.addMessage(i, i, i * 10)); } assertTrue(tracker.shouldPauseAllDeliveries()); - clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10); + clockTime.set(fixedDelayLookahead * 10); tracker.getScheduledMessages(100); assertFalse(tracker.shouldPauseAllDeliveries()); @@ -367,9 +369,11 @@ public void testWithMixedDelays() throws Exception { Clock clock = mock(Clock.class); when(clock.millis()).then(x -> clockTime.get()); + long fixedDelayLookahead = 100; + @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - true); + true, fixedDelayLookahead); assertFalse(tracker.hasMessageAvailable()); @@ -381,7 +385,7 @@ public void testWithMixedDelays() throws Exception { assertFalse(tracker.shouldPauseAllDeliveries()); - for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) { + for (int i = 6; i <= fixedDelayLookahead; i++) { assertTrue(tracker.addMessage(i, i, i * 10)); } @@ -401,9 +405,11 @@ public void testWithNoDelays() throws Exception { Clock clock = mock(Clock.class); when(clock.millis()).then(x -> clockTime.get()); + long fixedDelayLookahead = 100; + @Cleanup InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - true); + true, fixedDelayLookahead); assertFalse(tracker.hasMessageAvailable()); @@ -415,7 +421,7 @@ public void testWithNoDelays() throws Exception { assertFalse(tracker.shouldPauseAllDeliveries()); - for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) { + for (int i = 6; i <= fixedDelayLookahead; i++) { assertTrue(tracker.addMessage(i, i, i * 10)); }