Skip to content

Commit

Permalink
Avoid tracking the delays of all the message when we detect that they…
Browse files Browse the repository at this point in the history
… are fixed (apache#16609)

* Avoid tracking the delays of all the message when we detect that they are fixed

* Use tick time to avoid clock skews across different producers
  • Loading branch information
merlimat authored Jul 15, 2022
1 parent 807a283 commit c48a324
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);

/**
* Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has
* more messages available.
*/
boolean shouldPauseAllDeliveries();

/**
* Reset tick time use zk policies cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T

private final boolean isDelayedDeliveryDeliverAtTimeStrict;

// If we detect that all messages have fixed delay time, such that the delivery is
// always going to be in FIFO order, then we can avoid pulling all the messages in
// tracker. Instead, we use the lookahead for detection and pause the read from
// the cursor if the delays are fixed.
public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;

// This is the timestamp of the message with the highest delivery time
// If new added messages are lower than this, it means the delivery is requested
// to be out-of-order. It gets reset to 0, once the tracker is emptied.
private long highestDeliveryTimeTracked = 0;

// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
Expand Down Expand Up @@ -86,16 +100,28 @@ private long getCutoffTime() {

@Override
public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
messagesHaveFixedDelay = false;
return false;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
deliverAt - clock.millis());
}
if (deliverAt <= getCutoffTime()) {
return false;
}


priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();

// Check that new delivery time comes after the current highest, or at
// least within a single tick time interval of 1 second.
if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
messagesHaveFixedDelay = false;
}

highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt);

return true;
}

Expand Down Expand Up @@ -137,6 +163,13 @@ public Set<PositionImpl> getScheduledMessages(int maxMessages) {
if (log.isDebugEnabled()) {
log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size());
}

if (priorityQueue.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
}

updateTimer();
return positions;
}
Expand Down Expand Up @@ -241,4 +274,12 @@ public void close() {
timeout.cancel();
}
}

@Override
public boolean shouldPauseAllDeliveries() {
// Pause deliveries if we know all delays are fixed within the lookahead window
return messagesHaveFixedDelay
&& priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
&& !hasMessageAvailable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
entry.release();
individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
} else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
}

public synchronized void readMoreEntries() {
if (shouldPauseDeliveryForDelayTracker()) {
return;
}

// totalAvailablePermits may be updated by other threads
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
Expand Down Expand Up @@ -874,13 +878,20 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata

synchronized (this) {
if (!delayedDeliveryTracker.isPresent()) {
if (!msgMetadata.hasDeliverAtTime()) {
// No need to initialize the tracker here
return false;
}

// Initialize the tracker the first time we need to use it
delayedDeliveryTracker = Optional
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}

delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());

long deliverAtTime = msgMetadata.hasDeliverAtTime() ? msgMetadata.getDeliverAtTime() : -1L;
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, deliverAtTime);
}
}

Expand All @@ -895,6 +906,10 @@ protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesT
}
}

protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries();
}

@Override
public synchronized long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,117 @@ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> verify(dispatcher).readMoreEntries());
}

@Test
public void testWithFixedDelays() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);

AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);

assertFalse(tracker.hasMessageAvailable());

assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(4, 4, 40));
assertTrue(tracker.addMessage(5, 5, 50));

assertFalse(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

assertTrue(tracker.shouldPauseAllDeliveries());

clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10);

tracker.getScheduledMessages(100);
assertFalse(tracker.shouldPauseAllDeliveries());

// Empty the tracker
int removed = 0;
do {
removed = tracker.getScheduledMessages(100).size();
} while (removed > 0);

assertFalse(tracker.shouldPauseAllDeliveries());
}

@Test
public void testWithMixedDelays() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);

AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);

assertFalse(tracker.hasMessageAvailable());

assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(4, 4, 40));
assertTrue(tracker.addMessage(5, 5, 50));

assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

assertTrue(tracker.shouldPauseAllDeliveries());

// Add message with earlier delivery time
assertTrue(tracker.addMessage(5, 5, 5));

assertFalse(tracker.shouldPauseAllDeliveries());
}

@Test
public void testWithNoDelays() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);

AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);

assertFalse(tracker.hasMessageAvailable());

assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(2, 2, 20));
assertTrue(tracker.addMessage(3, 3, 30));
assertTrue(tracker.addMessage(4, 4, 40));
assertTrue(tracker.addMessage(5, 5, 50));

assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

assertTrue(tracker.shouldPauseAllDeliveries());

// Add message with no-delay
assertFalse(tracker.addMessage(5, 5, -1L));

assertFalse(tracker.shouldPauseAllDeliveries());
}

}

0 comments on commit c48a324

Please sign in to comment.