Skip to content

Commit

Permalink
Allow to configure and disable the size of lookahead for detecting fi…
Browse files Browse the repository at this point in the history
…xed delays in messages (apache#17907)
  • Loading branch information
merlimat authored Oct 1, 2022
1 parent 8c1152c commit 5582674
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 21 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,12 @@ delayedDeliveryMaxNumBuckets=50
# Enable share the delayed message index across subscriptions
delayedDeliverySharedIndexEnabled=false

# Size of the lookahead window to use when detecting if all the messages in the topic
# have a fixed delay.
# Default is 50,000. Setting the lookahead window to 0 will disable the logic to handle
# fixed delays in messages in a different way.
delayedDeliveryFixedDelayDetectionLookahead=50000

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+ "when detecting if all the messages in the topic have a fixed delay. "
+ "Default is 50,000. Setting the lookahead window to 0 will disable the "
+ "logic to handle fixed delays in messages in a different way.")
private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
// always going to be in FIFO order, then we can avoid pulling all the messages in
// tracker. Instead, we use the lookahead for detection and pause the read from
// the cursor if the delays are fixed.
public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
private final long fixedDelayDetectionLookahead;

// This is the timestamp of the message with the highest delivery time
// If new added messages are lower than this, it means the delivery is requested
Expand All @@ -70,17 +70,22 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
private boolean messagesHaveFixedDelay = true;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead);
}

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}

/**
Expand Down Expand Up @@ -283,8 +288,9 @@ public void close() {
@Override
public boolean shouldPauseAllDeliveries() {
// Pause deliveries if we know all delays are fixed within the lookahead window
return messagesHaveFixedDelay
&& priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
return fixedDelayDetectionLookahead > 0
&& messagesHaveFixedDelay
&& priorityQueue.size() >= fixedDelayDetectionLookahead
&& !hasMessageAvailable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra

private boolean isDelayedDeliveryDeliverAtTimeStrict;

private long fixedDelayDetectionLookahead;

@Override
public void initialize(ServiceConfiguration config) {
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict);
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void test() throws Exception {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false);
false, 0);

assertFalse(tracker.hasMessageAvailable());

Expand Down Expand Up @@ -146,7 +146,7 @@ public void testWithTimer() throws Exception {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false);
false, 0);

assertTrue(tasks.isEmpty());
assertTrue(tracker.addMessage(2, 2, 20));
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testAddWithinTickTime() {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
false);
false, 0);

clockTime.set(0);

Expand All @@ -209,7 +209,7 @@ public void testAddMessageWithStrictDelay() {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
true);
true, 0);

clockTime.set(10);

Expand All @@ -236,7 +236,7 @@ public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithSt
// Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
1000, clock, true);
1000, clock, true, 0);

// Set clock time, then run tracker to inherit clock time as the last tick time.
clockTime.set(10000);
Expand Down Expand Up @@ -274,7 +274,7 @@ public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStr
// a previous tick run.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
100000, clock, true);
100000, clock, true, 0);

clockTime.set(500000);

Expand All @@ -299,7 +299,7 @@ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws
// Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
500, clock, true);
500, clock, true, 0);

clockTime.set(0);

Expand All @@ -323,9 +323,11 @@ public void testWithFixedDelays() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

final long fixedDelayLookahead = 100;

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);
true, fixedDelayLookahead);

assertFalse(tracker.hasMessageAvailable());

Expand All @@ -339,13 +341,13 @@ public void testWithFixedDelays() throws Exception {
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

assertTrue(tracker.shouldPauseAllDeliveries());

clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10);
clockTime.set(fixedDelayLookahead * 10);

tracker.getScheduledMessages(100);
assertFalse(tracker.shouldPauseAllDeliveries());
Expand All @@ -367,9 +369,11 @@ public void testWithMixedDelays() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

long fixedDelayLookahead = 100;

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);
true, fixedDelayLookahead);

assertFalse(tracker.hasMessageAvailable());

Expand All @@ -381,7 +385,7 @@ public void testWithMixedDelays() throws Exception {

assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

Expand All @@ -401,9 +405,11 @@ public void testWithNoDelays() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

long fixedDelayLookahead = 100;

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);
true, fixedDelayLookahead);

assertFalse(tracker.hasMessageAvailable());

Expand All @@ -415,7 +421,7 @@ public void testWithNoDelays() throws Exception {

assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

Expand Down

0 comments on commit 5582674

Please sign in to comment.