diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 71faeb7adba7d..cf58bfd43ac5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; - + protected boolean sendInProgress; protected static final AtomicIntegerFieldUpdater TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, @@ -240,6 +240,11 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional } public synchronized void readMoreEntries() { + if (sendInProgress) { + // we cannot read more entries while sending the previous batch + // otherwise we could re-read the same entries and send duplicates + return; + } if (shouldPauseDeliveryForDelayTracker()) { return; } @@ -496,7 +501,7 @@ public SubType getType() { } @Override - public synchronized void readEntriesComplete(List entries, Object ctx) { + public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; if (readType == ReadType.Normal) { havePendingRead = false; @@ -528,18 +533,39 @@ public synchronized void readEntriesComplete(List entries, Object ctx) { log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } + // dispatch messages to a separate thread, but still in order for this subscription + // sendMessagesToConsumers is responsible for running broker-side filters + // that may be quite expensive if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) { - // dispatch messages to a separate thread, but still in order for this subscription - // sendMessagesToConsumers is responsible for running broker-side filters - // that may be quite expensive + // setting sendInProgress here, because sendMessagesToConsumers will be executed + // in a separate thread, and we want to prevent more reads + sendInProgress = true; dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries))); } else { sendMessagesToConsumers(readType, entries); } } - protected synchronized void sendMessagesToConsumers(ReadType readType, List entries) { + protected final synchronized void sendMessagesToConsumers(ReadType readType, List entries) { + sendInProgress = true; + boolean readMoreEntries; + try { + readMoreEntries = trySendMessagesToConsumers(readType, entries); + } finally { + sendInProgress = false; + } + if (readMoreEntries) { + readMoreEntries(); + } + } + /** + * Dispatch the messages to the Consumers. + * @return true if you want to trigger a new read. + * This method is overridden by other classes, please take a look to other implementations + * if you need to change it. + */ + protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); } @@ -547,8 +573,7 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1)) @@ -578,7 +603,7 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List> initialValue() throws Exception { }; @Override - protected synchronized void sendMessagesToConsumers(ReadType readType, List entries) { + protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; @@ -160,14 +160,13 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List readMoreEntries()); } else if (currentThreadKeyNumber == 0) { + sendInProgress = false; topic.getBrokerService().executor().schedule(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { readMoreEntries(); } }, 100, TimeUnit.MILLISECONDS); } + return false; } private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index aa78790732967..8b62845572d26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -278,7 +278,6 @@ public void testDelayedDeliveryWithMultipleConcurrentReadEntries() for (int i = 0; i < N; i++) { msg = consumer.receive(10, TimeUnit.SECONDS); receivedMsgs.add(msg.getValue()); - consumer.acknowledge(msg); } assertEquals(receivedMsgs.size(), N); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 72286b01c7628..aa87b2aaa2584 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -46,6 +46,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import io.netty.channel.EventLoopGroup; @@ -69,6 +70,7 @@ import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -99,6 +101,7 @@ public void setup() throws Exception { doReturn(100).when(configMock).getDispatcherMaxReadBatchSize(); doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); + doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -115,7 +118,7 @@ public void setup() throws Exception { EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); doReturn(eventLoopGroup).when(brokerMock).executor(); doAnswer(invocation -> { - ((Runnable)invocation.getArguments()[0]).run(); + orderedExecutor.execute(((Runnable)invocation.getArguments()[0])); return null; }).when(eventLoopGroup).execute(any(Runnable.class)); @@ -180,19 +183,21 @@ public void testSendMarkerMessage() { fail("Failed to readEntriesComplete.", e); } - ArgumentCaptor totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class); - verify(consumerMock, times(1)).sendMessages( - anyList(), - any(EntryBatchSizes.class), - any(EntryBatchIndexesAcks.class), - totalMessagesCaptor.capture(), - anyLong(), - anyLong(), - any(RedeliveryTracker.class) - ); - - List allTotalMessagesCaptor = totalMessagesCaptor.getAllValues(); - Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); + Awaitility.await().untilAsserted(() -> { + ArgumentCaptor totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class); + verify(consumerMock, times(1)).sendMessages( + anyList(), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + totalMessagesCaptor.capture(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + + List allTotalMessagesCaptor = totalMessagesCaptor.getAllValues(); + Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); + }); } @Test(timeOut = 10000) @@ -283,21 +288,23 @@ public void testSkipRedeliverTemporally() { // and then stop to dispatch to slowConsumer persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries); - verify(consumerMock, times(1)).sendMessages( - argThat(arg -> { - assertEquals(arg.size(), 1); - Entry entry = arg.get(0); - assertEquals(entry.getLedgerId(), 1); - assertEquals(entry.getEntryId(), 3); - return true; - }), - any(EntryBatchSizes.class), - any(EntryBatchIndexesAcks.class), - anyInt(), - anyLong(), - anyLong(), - any(RedeliveryTracker.class) - ); + Awaitility.await().untilAsserted(() -> { + verify(consumerMock, times(1)).sendMessages( + argThat(arg -> { + assertEquals(arg.size(), 1); + Entry entry = arg.get(0); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), 3); + return true; + }), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + }); verify(slowConsumerMock, times(0)).sendMessages( anyList(), any(EntryBatchSizes.class), @@ -399,9 +406,15 @@ public void testMessageRedelivery() throws Exception { eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean()); // Mock Cursor#asyncReadEntriesOrWait + AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean(); doAnswer(invocationOnMock -> { - ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) - .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + } else { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(Collections.emptyList(), PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + } return null; }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class),