Skip to content

Commit

Permalink
Efficiency improvements for delay delivery tracker (apache#5498)
Browse files Browse the repository at this point in the history
Efficiency improvements for delay delivery tracker
  • Loading branch information
merlimat authored and wolfstudy committed Oct 30, 2019
1 parent 418362e commit 467ffab
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
deliveryAt - now);
}
if (deliveryAt < now) {
// It's already about time to deliver this message
if (deliveryAt < (now + tickTimeMillis)) {
// It's already about time to deliver this message. We add the buffer of
// `tickTimeMillis` because messages can be extracted from the tracker
// slightly before the expiration time. We don't want the messages to
// go back into the delay tracker (for a brief amount of time) when we're
// trying to dispatch to the consumer.
return false;
}

Expand Down Expand Up @@ -117,7 +121,7 @@ public Set<PositionImpl> getScheduledMessages(int maxMessages) {
}

if (log.isDebugEnabled()) {
log.debug("[{}] Get scheduled messags - found {}", dispatcher.getName(), positions.size());
log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size());
}
updateTimer();
return positions;
Expand Down Expand Up @@ -170,7 +174,7 @@ private void updateTimer() {
@Override
public void run(Timeout timeout) throws Exception {
if (log.isDebugEnabled()) {
log.info("[{}] Timer triggered", dispatcher.getName());
log.debug("[{}] Timer triggered", dispatcher.getName());
}
if (timeout.isCancelled()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.google.common.collect.Range;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -715,37 +716,36 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
}

@Override
public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!isDelayedDeliveryEnabled) {
// If broker has the feature disabled, always deliver messages immediately
return false;
}

if (!delayedDeliveryTracker.isPresent()) {
// Initialize the tracker the first time we need to use it
delayedDeliveryTracker = Optional.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}
synchronized (this) {
if (!delayedDeliveryTracker.isPresent()) {
// Initialize the tracker the first time we need to use it
delayedDeliveryTracker = Optional
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}

return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
}
}

private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!messagesToRedeliver.isEmpty()) {
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
} else if (delayedDeliveryTracker.isPresent()) {
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
} else {
return Collections.emptySet();
}
}

public synchronized long getNumberOfDelayedMessages() {
if (delayedDeliveryTracker.isPresent()) {
return delayedDeliveryTracker.get().getNumberOfDelayedMessages();
} else {
return 0;
}
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,32 @@ public void testWithTimer() throws Exception {
task.run(mock(Timeout.class));
verify(dispatcher).readMoreEntries();
}

/**
* Adding a message that is about to expire within the tick time should lead
* to a rejection from the tracker.
*/
@Test
public void testAddWithinTickTime() throws Exception {
PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);

Timer timer = mock(Timer.class);

AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock);

clockTime.set(0);

assertFalse(tracker.addMessage(1, 1, 10));
assertFalse(tracker.addMessage(2, 2, 99));
assertTrue(tracker.addMessage(3, 3, 100));
assertTrue(tracker.addMessage(4, 4, 200));

assertEquals(tracker.getNumberOfDelayedMessages(), 2);
}

}

0 comments on commit 467ffab

Please sign in to comment.