Skip to content

Commit

Permalink
Issue 8677: Cannot get lastMessageId for an empty topic due to messag…
Browse files Browse the repository at this point in the history
…e retention (apache#8725)

When we are trimming the ledgers we are saving the `currentLedger` but as soon as your restart the broker the currentLedger is not containing the lastMessageId (because it is a fresh new ledger).

Changes:
- add test case on pulsar-broker that reproduces the issue reported but the user
- log a message when we are trimming the ledger at lastAddConfirmedEntry
- add test case that prevent changes in the future on ManagedLedgerImpl
- fix a minor issue in PersistentTopic#getLastMessageId, a return keyword was missing and we continued with a call ti ManagedLedger, the CompletableFuture was already 'completed' so the final result is not changed (but we are saving resources)


Fixes apache#8677
  • Loading branch information
eolivelli authored Dec 7, 2020
1 parent 54f3e9c commit 5054642
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pulsar-functions/worker/src/test/resources/
*.iml
*.iws

# NetBeans
nb-configuration.xml

# Mac
**/.DS_Store

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2072,7 +2072,6 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
}

// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
Expand Down Expand Up @@ -2123,8 +2122,14 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {

advanceNonDurableCursors(ledgersToDelete);

PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
// this info is relevant because the lastMessageId won't be available anymore
log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
ls.getLedgerId(), currentLastConfirmedEntry);
}
ledgerCache.remove(ls.getLedgerId());

ledgers.remove(ls.getLedgerId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,33 @@ public void testRetention0WithEmptyLedger() throws Exception {
ml.close();
}

/**
* Set retention time = 0 and create a empty ledger,
* first position can't higher than last after trim ledgers.
* Even if we do not have subscriptions the ledger
* that contains the lastConfirmedEntry will be deleted anyway.
*/
@Test
public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionTime(0, TimeUnit.MINUTES);
config.setMaxEntriesPerLedger(1);

ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
ml.addEntry("message1".getBytes());
ml.close();

// reopen ml
ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));

assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId);
assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId),
"the ledger at lastConfirmedEntry has not been trimmed!");
ml.close();
}

@Test
public void testInfiniteRetention() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2192,12 +2192,23 @@ public Position getLastPosition() {
public CompletableFuture<MessageId> getLastMessageId() {
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
int partitionIndex = TopicName.getPartitionIndex(getName());
String name = getName();
int partitionIndex = TopicName.getPartitionIndex(name);
if (log.isDebugEnabled()) {
log.debug("getLastMessageId {}, partitionIndex{}, position {}", name, partitionIndex, position);
}
if (position.getEntryId() == -1) {
completableFuture
.complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
return completableFuture;
}
ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
if (!ledgerImpl.ledgerExists(position.getLedgerId())) {
completableFuture
.complete(MessageId.earliest);
return completableFuture;
}
((ManagedLedgerImpl) ledger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pulsar.broker.service;


import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand All @@ -30,8 +33,14 @@
import org.testng.Assert;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumedLedgersTrimTest extends BrokerTestBase {

private static final Logger LOG = LoggerFactory.getLogger(ConsumedLedgersTrimTest.class);

@Override
protected void setup() throws Exception {
//No-op
Expand Down Expand Up @@ -90,4 +99,75 @@ public void TestConsumedLedgersTrim() throws Exception {
Thread.sleep(1500);
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
}


@Test
public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
conf.setRetentionCheckIntervalInSeconds(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
super.baseSetup();
final String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions";

// write some messages
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.producerName("producer-name")
.create();

// set retention parameters, the ledgers are to be deleted as soon as possible
// but the topic is not to be automatically deleted
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(-1);
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(1000);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
MessageId initialMessageId = persistentTopic.getLastMessageId().get();
LOG.info("lastmessageid " + initialMessageId);

int msgNum = 7;
for (int i = 0; i < msgNum; i++) {
producer.send(new byte[1024 * 1024]);
}

ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
MessageId messageIdBeforeRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName);
LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart);
assertNotEquals(messageIdBeforeRestart, initialMessageId);

// restart the broker we have to start a new ledger
// the lastMessageId is still on the previous ledger
restartBroker();
// force load topic
pulsar.getAdminClient().topics().getStats(topicName);
MessageId messageIdAfterRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName);
LOG.info("lastmessageid " + messageIdAfterRestart);
assertEquals(messageIdAfterRestart, messageIdBeforeRestart);

persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(-1);
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(1);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// now we have two ledgers, the first is expired but is contains the lastMessageId
// the second is empty and should be kept as it is the current tail
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);

// force trimConsumedLedgers
Thread.sleep(3000);
CompletableFuture f = new CompletableFuture();
managedLedger.trimConsumedLedgersInBackground(f);
f.join();

// lastMessageId should be available even in this case, but is must
// refer to -1
MessageId messageIdAfterTrim = pulsar.getAdminClient().topics().getLastMessageId(topicName);
LOG.info("lastmessageid " + messageIdAfterTrim);
assertEquals(messageIdAfterTrim, MessageId.earliest);

}
}

0 comments on commit 5054642

Please sign in to comment.