Skip to content

Commit

Permalink
[improve][broker] Reschedule reads with increasing backoff when no me…
Browse files Browse the repository at this point in the history
…ssages are dispatched (apache#23226)
  • Loading branch information
lhotari authored Aug 29, 2024
1 parent 59424a8 commit d98e51f
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 23 deletions.
10 changes: 10 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
dispatcherRetryBackoffInitialTimeInMs=100

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
dispatcherRetryBackoffMaxTimeInMs=1000

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false

Expand Down
10 changes: 10 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
dispatcherRetryBackoffInitialTimeInMs=100

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
dispatcherRetryBackoffMaxTimeInMs=1000

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,20 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the initial backoff delay in milliseconds.")
private int dispatcherRetryBackoffInitialTimeInMs = 100;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the maximum backoff delay in milliseconds.")
private int dispatcherRetryBackoffMaxTimeInMs = 1000;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
Expand Down Expand Up @@ -85,7 +86,6 @@
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {

protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range<Position> lastIndividualDeletedRangeFromCursorRecovery;
Expand Down Expand Up @@ -134,7 +134,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;

protected int lastNumberOfEntriesDispatched;
private final Backoff retryBackoff;
protected enum ReadType {
Normal, Replay
}
Expand All @@ -159,10 +160,15 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
retryBackoff = new Backoff(
serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS,
serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS,
0, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -437,16 +443,20 @@ private boolean shouldPauseOnAckStatePersist(ReadType readType) {

@Override
protected void reScheduleRead() {
reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
}

protected void reScheduleReadInMs(long readAfterMs) {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
readAfterMs, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -659,8 +669,8 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}

long size = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(size);
long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(totalBytesSize);

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
Expand All @@ -670,19 +680,28 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
if (sendMessagesToConsumers(readType, entries, false)) {
updatePendingBytesToDispatch(-size);
readMoreEntries();
} else {
updatePendingBytesToDispatch(-size);
}
handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
});
} else {
if (sendMessagesToConsumers(readType, entries, true)) {
updatePendingBytesToDispatch(-size);
readMoreEntriesAsync();
} else {
updatePendingBytesToDispatch(-size);
handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize);
}
}

private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List<Entry> entries,
boolean needAcquireSendInProgress,
long totalBytesSize) {
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
updatePendingBytesToDispatch(-totalBytesSize);
if (triggerReadingMore) {
if (entriesDispatched > 0) {
// Reset the backoff when we successfully dispatched messages
retryBackoff.reset();
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
} else if (entriesDispatched == 0) {
// If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
reScheduleReadInMs(retryBackoff.next());
}
}
}
Expand Down Expand Up @@ -721,6 +740,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
lastNumberOfEntriesDispatched = 0;

int entriesToDispatch = entries.size();
// Trigger read more messages
Expand Down Expand Up @@ -828,6 +848,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});

lastNumberOfEntriesDispatched = entriesToDispatch;
}
return true;
}
Expand Down Expand Up @@ -890,6 +912,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ protected Map<Consumer, List<Position>> initialValue() throws Exception {

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
lastNumberOfEntriesDispatched = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
Expand Down Expand Up @@ -420,6 +421,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

lastNumberOfEntriesDispatched = (int) totalEntries;

// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ protected void doInitConf() throws Exception {
this.conf.setWebServicePort(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
// Disable the dispatcher retry backoff in tests by default
this.conf.setDispatcherRetryBackoffInitialTimeInMs(0);
this.conf.setDispatcherRetryBackoffMaxTimeInMs(0);
}

protected final void init() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
Expand All @@ -53,17 +54,18 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -107,6 +109,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {

final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
private AtomicInteger consumerMockAvailablePermits;

@BeforeMethod
public void setup() throws Exception {
Expand All @@ -117,7 +120,8 @@ public void setup() throws Exception {
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();

doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

Expand Down Expand Up @@ -188,7 +192,8 @@ public void setup() throws Exception {
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
doReturn("consumer1").when(consumerMock).consumerName();
doReturn(1000).when(consumerMock).getAvailablePermits();
consumerMockAvailablePermits = new AtomicInteger(1000);
doAnswer(invocation -> consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
anyList(),
Expand Down Expand Up @@ -511,8 +516,6 @@ public void testMessageRedelivery() throws Exception {
allEntries.forEach(entry -> entry.release());
}



@DataProvider(name = "initializeLastSentPosition")
private Object[][] initialLastSentPositionProvider() {
return new Object[][] { { false }, { true } };
Expand Down Expand Up @@ -822,6 +825,77 @@ public void testLastSentPositionAndIndividuallySentPositions(final boolean initi
assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString());
}

@DataProvider(name = "dispatchMessagesInSubscriptionThread")
private Object[][] dispatchMessagesInSubscriptionThread() {
return new Object[][] { { false }, { true } };
}

@Test(dataProvider = "dispatchMessagesInSubscriptionThread")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread)
throws Exception {
persistentDispatcher.close();

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
persistentDispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
assertEquals(delay, 20.0, 2.0, "Second retry delay should be 20ms (jitter <-10%)");
}
);
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
double delay = retryDelays.get(101);
assertEquals(delay, 50.0, 5.0, "Max delay should be 50ms (jitter <-10%)");
}
);
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms");
}
);
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ protected void startBroker() throws Exception {
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
// Disable the dispatcher retry backoff in tests by default
conf.setDispatcherRetryBackoffInitialTimeInMs(0);
conf.setDispatcherRetryBackoffMaxTimeInMs(0);
serviceConfigurationList.add(conf);

PulsarTestContext.Builder testContextBuilder =
Expand Down

0 comments on commit d98e51f

Please sign in to comment.