Skip to content

Commit

Permalink
[Transaction] Add a check for uninitialized PendingAck (apache#13088)
Browse files Browse the repository at this point in the history
### Motivation

We shoud not generate the statistics of a uninitialized PendingAck,and we should check if it is initialized when we get it by `getStoreManageLedger()`.
### Modifications
 Shoud not generate the statistics of a uninitialized PendingAck
 Add check if it is initialized when we get it by `getStoreManageLedger()`.
  • Loading branch information
liangyepianzhou authored Dec 6, 2021
1 parent e01ff0f commit 591b4e8
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1231,5 +1231,9 @@ public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
}
}

public boolean checkIfPendingAckStoreInit() {
return this.pendingAckHandle.checkIfPendingAckStoreInit();
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream,
topic.getSubscriptions().values().forEach(subscription -> {
try {
localManageLedgerStats.get().reset();
if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
&& subscription instanceof PersistentSubscription
&& ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
ManagedLedger managedLedger =
((PersistentSubscription) subscription)
.getPendingAckManageLedger().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,9 @@ CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties,
*/
CompletableFuture<Void> close();

/**
* Check if the PendingAckStore is init.
* @return if the PendingAckStore is init.
*/
boolean checkIfPendingAckStoreInit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ public TransactionPendingAckStats getStats() {
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
}

@Override
public boolean checkIfPendingAckStoreInit() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ public CompletableFuture<Void> close() {
}

public CompletableFuture<ManagedLedger> getStoreManageLedger() {
if (this.pendingAckStoreFuture.isDone()) {
if (this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone()) {
return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
if (pendingAckStore instanceof MLPendingAckStore) {
return ((MLPendingAckStore) pendingAckStore).getManagedLedger();
Expand All @@ -937,6 +937,11 @@ public CompletableFuture<ManagedLedger> getStoreManageLedger() {
}
}

@Override
public boolean checkIfPendingAckStoreInit() {
return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
}

protected void handleCacheRequest() {
while (true) {
Runnable runnable = acceptQueue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -271,6 +272,73 @@ public void testManagedLedgerMetrics() throws Exception{
assertEquals(metric.size(), 2);
}

@Test
public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception{
String ns1 = "prop/ns-abc1";
admin.namespaces().createNamespace(ns1);
String topic = "persistent://" + ns1 + "/testManagedLedgerMetricsWhenPendingAckNotInit";
String subName = "test_managed_ledger_metrics";
String subName2 = "test_pending_ack_no_init";
admin.topics().createNonPartitionedTopic(topic);
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
admin.topics().createSubscription(topic, subName, MessageId.earliest);
admin.topics().createSubscription(topic, subName2, MessageId.earliest);

Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size() == 1);

pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.receiverQueueSize(10)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
producer.send("hello pulsar".getBytes());
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();

Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);

Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_storage_size");
checkManagedLedgerMetrics(subName, 32, metric);
//No statistics of the pendingAck are generated when the pendingAck is not initialized.
for (PrometheusMetricsTest.Metric metric1 : metric) {
if (metric1.tags.containsValue(subName2)) {
Assert.fail();
}
}

consumer = pulsarClient.newConsumer()
.topic(topic)
.receiverQueueSize(10)
.subscriptionName(subName2)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
transaction =
pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();

statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
metricsStr = statsOut.toString();
metrics = parseMetrics(metricsStr);
metric = metrics.get("pulsar_storage_size");
checkManagedLedgerMetrics(subName2, 32, metric);
}

private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
boolean exist = false;
for (PrometheusMetricsTest.Metric metric1 : metrics) {
Expand Down

0 comments on commit 591b4e8

Please sign in to comment.