Fix dead loop in BacklogQuotaManager.dropBacklogForTimeLimit (apache#13194) (apache#13249)

Fixes apache#13194 

### Motivation
https://github.com/apache/pulsar/blob/38fb839154462fc5c6b0b4293f02762ed4021cd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L200-L219
`BacklogQuotaManager.dropBacklogForTimeLimit` may fall into a dead loop under certain conditions when `backlogQuotaDefaultLimitSecond` is enabled, e.g.:
1. The producer stops producing after writing some messages; the current ledger is A.
2. The rollover time elapses and a ledger rollover is triggered, creating a new ledger B that is empty (no entries).
3. `lastConfirmedEntry` is now `A:last-entry-id`.
4. After `backlogQuotaDefaultLimitSecond` elapses, the quota check resets the cursor to `A:last-entry-id+1`, which is the only valid position, so the loop keeps spinning until the producer resumes producing.
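
To make the dead loop concrete, here is a minimal standalone model of the pre-fix behaviour. This is not broker code: the `Position` record, the ledger IDs, and the simplified `nextValidPosition()` are hypothetical stand-ins for the scenario above, where the only valid position after ledger A never changes because ledger B is empty.

```java
public class DeadLoopModel {

    // Hypothetical stand-in for org.apache.bookkeeper.mledger.Position.
    record Position(long ledgerId, long entryId) { }

    static final long LEDGER_A = 1L;
    static final long LAST_ENTRY_OF_A = 9L;

    // Stand-in for ManagedLedgerImpl#getNextValidPosition when the following
    // ledger (B) holds no entries: the target is always A:last-entry-id+1.
    static Position nextValidPosition(Position from) {
        return new Position(LEDGER_A, LAST_ENTRY_OF_A + 1);
    }

    public static void main(String[] args) {
        boolean ledgerAOverTimeLimit = true; // ledger A is closed and older than limitTime
        Position cursor = new Position(LEDGER_A, LAST_ENTRY_OF_A + 1); // already reset once

        int resets = 0;
        // Pre-fix shape of the loop: keep resetting while ledger A looks expired.
        // The reset target always equals the current cursor position, so the state
        // never changes; bounded here only so the demo terminates.
        while (ledgerAOverTimeLimit && resets < 5) {
            cursor = nextValidPosition(new Position(LEDGER_A, LAST_ENTRY_OF_A));
            resets++;
        }
        System.out.println("cursor still at " + cursor + " after " + resets + " resets");
    }
}
```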

### Modifications

Record the previous slowestReaderPosition; if it is the same as the new slowestReaderPosition after `resetCursor`, exit the loop.
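
Below is a minimal sketch of that exit condition, again with hypothetical positions rather than real broker state: the cursor is reset only when the reset would actually move it; otherwise the loop breaks even if the oldest ledger is still over the time limit.

```java
import java.util.Objects;

public class ExitConditionSketch {

    // Hypothetical stand-in for org.apache.bookkeeper.mledger.Position.
    record Position(long ledgerId, long entryId) { }

    public static void main(String[] args) {
        boolean oldestLedgerExpired = true;           // closed and older than limitTime
        Position markDeleted = new Position(1L, 10L); // cursor already parked after ledger A
        Position target = new Position(1L, 10L);      // next valid position (ledger B is empty)

        for (;;) {
            if (oldestLedgerExpired && !Objects.equals(target, markDeleted)) {
                markDeleted = target; // corresponds to slowestConsumer.resetCursor(nextPosition)
                continue;
            }
            break; // resetting would be a no-op, so stop instead of spinning forever
        }
        System.out.println("loop exited with the cursor at " + markDeleted);
    }
}
```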
Shawyeok authored Dec 23, 2021
1 parent bfb5782 commit 021409b
Showing 2 changed files with 65 additions and 11 deletions.
@@ -25,9 +25,10 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -201,17 +202,22 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
         Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
         ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
         try {
-            Long ledgerId = mLedger.getCursors().getSlowestReaderPosition().getLedgerId();
-            MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(ledgerId).get();
-            // Timestamp only > 0 if ledger has been closed
-            while (ledgerInfo.getTimestamp() > 0
-                    && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
+            for (;;) {
                 ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
-                // skip whole ledger for the slowest cursor
-                slowestConsumer.resetCursor(mLedger.getNextValidPosition(
-                        PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)));
-                ledgerId = mLedger.getCursors().getSlowestReaderPosition().getLedgerId();
-                ledgerInfo = mLedger.getLedgerInfo(ledgerId).get();
+                Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
+                ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
+                // Timestamp only > 0 if ledger has been closed
+                if (ledgerInfo.getTimestamp() > 0
+                        && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
+                    // skip whole ledger for the slowest cursor
+                    PositionImpl nextPosition = mLedger.getNextValidPosition(
+                            PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1));
+                    if (!nextPosition.equals(oldestPosition)) {
+                        slowestConsumer.resetCursor(nextPosition);
+                        continue;
+                    }
+                }
+                break;
             }
         } catch (Exception e) {
             log.error("[{}] Error resetting cursor for slowest consumer [{}]", persistentTopic.getName(),
@@ -482,6 +482,54 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         client.close();
     }
 
+    @Test
+    public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                Maps.newHashMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topic = "persistent://prop/ns-quota/topic4";
+        final String subName = "c1";
+
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subName).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic);
+        producer.send(new byte[1024]);
+        consumer.receive();
+
+        admin.topics().unload(topic);
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic);
+        assertEquals(internalStats.ledgers.size(), 2);
+        assertEquals(internalStats.ledgers.get(1).entries, 0);
+
+        TopicStats stats = admin.topics().getStats(topic);
+        assertEquals(stats.getSubscriptions().get(subName).getMsgBacklog(), 1);
+
+        TimeUnit.SECONDS.sleep(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        Awaitility.await()
+                .pollInterval(Duration.ofSeconds(1))
+                .atMost(Duration.ofSeconds(TIME_TO_CHECK_BACKLOG_QUOTA))
+                .untilAsserted(() -> {
+                    rolloverStats();
+
+                    // Cause the last ledger is empty, it is not possible to skip first ledger,
+                    // so the number of ledgers will keep unchanged, and backlog is clear
+                    PersistentTopicInternalStats latestInternalStats = admin.topics().getInternalStats(topic);
+                    assertEquals(latestInternalStats.ledgers.size(), 2);
+                    assertEquals(latestInternalStats.ledgers.get(1).entries, 0);
+                    TopicStats latestStats = admin.topics().getStats(topic);
+                    assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0);
+                });
+
+        client.close();
+    }
+
     @Test
     public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
