Issue 16802: fix Repeated messages of shared dispatcher (apache#16812)
eolivelli authored Jul 28, 2022
1 parent 89b6a53 commit 825b68d
Showing 4 changed files with 87 additions and 49 deletions.
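Before the diff itself, a condensed sketch of the approach the change takes may help (simplified, hypothetical names; the real logic is in PersistentDispatcherMultipleConsumers and its key-shared subclass below): a sendInProgress flag makes readMoreEntries() a no-op while a previously read batch is still being dispatched, and the dispatch path clears the flag and then decides whether to trigger the next read.

import java.util.List;

// Condensed sketch of the guard added by this commit (simplified names, not the real class).
class DispatcherGuardSketch {
    private boolean sendInProgress; // guarded by "this"

    synchronized void readMoreEntries() {
        if (sendInProgress) {
            // A batch is still being dispatched (possibly on another thread);
            // reading again now could re-read the same entries and send duplicates.
            return;
        }
        // ... issue the next asynchronous read against the cursor ...
    }

    synchronized void sendMessagesToConsumers(List<Object> entries) {
        sendInProgress = true;
        boolean readMore;
        try {
            readMore = trySendMessagesToConsumers(entries);
        } finally {
            // Always clear the flag, even if dispatch throws, so reads can never be blocked forever.
            sendInProgress = false;
        }
        if (readMore) {
            readMoreEntries();
        }
    }

    // Subclasses override this; returning true asks the caller to trigger another read.
    boolean trySendMessagesToConsumers(List<Object> entries) {
        // ... run broker-side filters and hand the entries to consumers ...
        return true;
    }
}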
@@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;

protected boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -240,6 +240,11 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
}

public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
return;
}
if (shouldPauseDeliveryForDelayTracker()) {
return;
}
@@ -496,7 +501,7 @@ public SubType getType() {
}

@Override
public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
ReadType readType = (ReadType) ctx;
if (readType == ReadType.Normal) {
havePendingRead = false;
@@ -528,27 +533,47 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
sendInProgress = true;
dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
} else {
sendMessagesToConsumers(readType, entries);
}
}

protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
boolean readMoreEntries;
try {
readMoreEntries = trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
if (readMoreEntries) {
readMoreEntries();
}
}

/**
 * Dispatch the messages to the Consumers.
 * This method is overridden by other classes; please take a look at the other implementations
 * if you need to change it.
 * @return true if a new read should be triggered.
 */
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}

int entriesToDispatch = entries.size();
// Trigger read more messages
if (entriesToDispatch == 0) {
readMoreEntries();
return;
return true;
}
final MessageMetadata[] metadataArray = entries.stream()
.map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
@@ -578,7 +603,7 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entr
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
return;
return false;
}

// round-robin dispatch batch size for this consumer
@@ -623,7 +648,7 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entr
entriesToDispatch -= messagesForC;
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
if (log.isDebugEnabled()){
if (log.isDebugEnabled()) {
log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ "PersistentDispatcherMultipleConsumers",
name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
@@ -658,7 +683,7 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entr
entry.release();
});
}
readMoreEntries();
return true;
}

@Override
@@ -152,22 +152,21 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
};

@Override
protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int entriesCount = entries.size();

// Trigger read more messages
if (entriesCount == 0) {
readMoreEntries();
return;
return true;
}

if (consumerSet.isEmpty()) {
entries.forEach(Entry::release);
cursor.rewind();
return;
return false;
}

// A corner case where we have to retry readMoreEntries in order to preserve ordered delivery.
@@ -201,8 +200,7 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entr
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
readMoreEntries();
return;
return true;
}
}
}
@@ -331,14 +329,17 @@ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entr
}
// readMoreEntries should run regardless of whether or not the stuck state is caused by
// stuckConsumers, to avoid stopping dispatch.
sendInProgress = false;
topic.getBrokerService().executor().execute(() -> readMoreEntries());
} else if (currentThreadKeyNumber == 0) {
sendInProgress = false;
topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
readMoreEntries();
}
}, 100, TimeUnit.MILLISECONDS);
}
return false;
}

private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
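A subtlety in the key-shared dispatcher above: when trySendMessagesToConsumers decides to hand the follow-up read to another thread (the branches around the "readMoreEntries should run regardless" comment), it clears sendInProgress, schedules readMoreEntries on the broker executor, and returns false so the base class does not also trigger a read of its own. A minimal sketch of that branch, using hypothetical names and a stand-in executor:

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: hypothetical class, field, and executor names, not the real dispatcher.
class HandoffSketch {
    private boolean sendInProgress;
    private final ScheduledExecutorService brokerExecutor = Executors.newSingleThreadScheduledExecutor();

    synchronized void readMoreEntries() {
        if (sendInProgress) {
            return; // another batch is still in flight
        }
        // ... issue the next read ...
    }

    synchronized boolean trySendMessagesToConsumers(List<Object> entries) {
        // ... dispatch whatever can be dispatched right now ...
        boolean retryFromAnotherThread = true; // e.g. consumers look stuck, or nothing was dispatchable
        if (retryFromAnotherThread) {
            // Hand the next read to the broker executor instead of triggering it here.
            // The flag is cleared eagerly (the caller's finally block clears it as well),
            // and returning false tells the caller not to trigger a read of its own.
            sendInProgress = false;
            brokerExecutor.schedule(this::readMoreEntries, 100, TimeUnit.MILLISECONDS);
            return false;
        }
        return true;
    }
}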
@@ -278,7 +278,6 @@ public void testDelayedDeliveryWithMultipleConcurrentReadEntries()
for (int i = 0; i < N; i++) {
msg = consumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
consumer.acknowledge(msg);
}

assertEquals(receivedMsgs.size(), N);
@@ -46,6 +46,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.channel.EventLoopGroup;
@@ -69,6 +70,7 @@
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -99,6 +101,7 @@ public void setup() throws Exception {
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();

pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -115,7 +118,7 @@ public void setup() throws Exception {
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
doReturn(eventLoopGroup).when(brokerMock).executor();
doAnswer(invocation -> {
((Runnable)invocation.getArguments()[0]).run();
orderedExecutor.execute(((Runnable)invocation.getArguments()[0]));
return null;
}).when(eventLoopGroup).execute(any(Runnable.class));

@@ -180,19 +183,21 @@ public void testSendMarkerMessage() {
fail("Failed to readEntriesComplete.", e);
}

ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
verify(consumerMock, times(1)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
totalMessagesCaptor.capture(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);

List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
Awaitility.await().untilAsserted(() -> {
ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
verify(consumerMock, times(1)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
totalMessagesCaptor.capture(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);

List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
});
}

@Test(timeOut = 10000)
@@ -283,21 +288,23 @@ public void testSkipRedeliverTemporally() {
// and then stop to dispatch to slowConsumer
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);

verify(consumerMock, times(1)).sendMessages(
argThat(arg -> {
assertEquals(arg.size(), 1);
Entry entry = arg.get(0);
assertEquals(entry.getLedgerId(), 1);
assertEquals(entry.getEntryId(), 3);
return true;
}),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
Awaitility.await().untilAsserted(() -> {
verify(consumerMock, times(1)).sendMessages(
argThat(arg -> {
assertEquals(arg.size(), 1);
Entry entry = arg.get(0);
assertEquals(entry.getLedgerId(), 1);
assertEquals(entry.getEntryId(), 3);
return true;
}),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
});
verify(slowConsumerMock, times(0)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
@@ -399,9 +406,15 @@ public void testMessageRedelivery() throws Exception {
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean());

// Mock Cursor#asyncReadEntriesOrWait
AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean();
doAnswer(invocationOnMock -> {
((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
.readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) {
((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
.readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
} else {
((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
.readEntriesComplete(Collections.emptyList(), PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
}
return null;
}).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
any(PersistentStickyKeyDispatcherMultipleConsumers.class),
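The test changes above follow from the dispatch path becoming asynchronous: the mocked EventLoopGroup is routed onto an ordered executor, the cursor stub returns entries only on the first read, and Mockito verifications are wrapped in Awaitility so they wait for dispatch to complete on the other thread. A generic, self-contained sketch of that verify-with-Awaitility idiom (assuming Mockito and Awaitility on the test classpath; the Listener interface is made up for illustration):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.awaitility.Awaitility;

public class AsyncVerifySketch {
    interface Listener {
        void onMessage(String msg);
    }

    public static void main(String[] args) {
        Listener listener = mock(Listener.class);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // The interaction happens on another thread, like the dispatcher's subscription thread.
        executor.execute(() -> listener.onMessage("hello"));

        // Poll until the Mockito verification passes instead of verifying immediately.
        Awaitility.await().untilAsserted(() -> verify(listener, times(1)).onMessage("hello"));
        executor.shutdown();
    }
}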
