Skip to content

Commit

Permalink
[fix][broker] Fix creating producer failure when set backlog quota. (a…
Browse files Browse the repository at this point in the history
…pache#15663)

### Motivation

When trying to reproduce the problem of apache#15609 using the master's code, it was found that the master also had this bug. The root cause is:
When there is only one ledger in the ManagedLedger, after the current ledger is closed, it has the timestamp and exceeds the time set by the backlog-qutoa, resulting in the failure to create the producer.

The added test could reproduce this. 

So when there is only one ledger, we should not exclude it.

### Verifying this change

If revert this patch, the added test will fail.
  • Loading branch information
Technoboy- authored May 25, 2022
1 parent b7e1510 commit 3a80458
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2551,29 +2551,46 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
}, null);
return future;
} else {
Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
PositionImpl slowestPosition = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition();
try {
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
&& ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
> backlogQuotaLimitInSecond * 1000) {
if (log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
+ "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
}
return CompletableFuture.completedFuture(true);
} else {
return CompletableFuture.completedFuture(false);
}
return slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return CompletableFuture.completedFuture(false);
}
}
}

private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
Long ledgerId = slowestPosition.getLedgerId();
if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
return CompletableFuture.completedFuture(false);
}
int result;
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
&& ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
> backlogQuotaLimitInSecond * 1000 && (result = slowestPosition.compareTo(
new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1))) <= 0) {
if (result < 0) {
if (log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
+ "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
}
return CompletableFuture.completedFuture(true);
} else {
return slowestReaderTimeBasedBacklogQuotaCheck(
((ManagedLedgerImpl) ledger).getNextValidPosition(slowestPosition));
}
} else {
return CompletableFuture.completedFuture(false);
}
}

@Override
public boolean isReplicated() {
return !replicators.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -44,13 +48,15 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand All @@ -66,6 +72,8 @@ protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(false);
conf.setAllowAutoTopicCreationType("partitioned");
conf.setDefaultNumPartitions(PARTITIONS);
conf.setManagedLedgerMaxEntriesPerLedger(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);

super.baseSetup();
}
Expand Down Expand Up @@ -171,4 +179,60 @@ public void testHealthCheckTopicNotOffload() throws Exception {
});
}

@Test
private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";

admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
BacklogQuota quota = BacklogQuota.builder()
.limitTime(2)
.limitSize(-1)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.namespaces().setBacklogQuota(ns, quota, BacklogQuota.BacklogQuotaType.message_age);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

String partition0 = TopicName.get(String.format("persistent://%s", topic)).getPartition(0).toString();
Optional<Topic> topicReference = pulsar.getBrokerService().getTopicReference(partition0);
Assert.assertTrue(topicReference.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
persistentTopic.getManagedLedger().setConfig(config);
Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
String msg1 = "msg-1";
producer.send(msg1);
Thread.sleep(3 * 1000);

Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub-1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();

Message<String> receive = consumer2.receive();
consumer2.acknowledge(receive);

Thread.sleep(3 * 1000);

try {
Producer<String> producerN = PulsarClient.builder()
.maxBackoffInterval(3, TimeUnit.SECONDS)
.operationTimeout(5, TimeUnit.SECONDS)
.serviceUrl(lookupUrl.toString()).connectionTimeout(2, TimeUnit.SECONDS).build()
.newProducer(Schema.STRING).topic(topic).sendTimeout(3, TimeUnit.SECONDS).create();
Assert.assertTrue(producerN.isConnected());
producerN.close();
} catch (Exception ex) {
Assert.fail("failed to create producer");
}
}
}

0 comments on commit 3a80458

Please sign in to comment.