Skip to content

Commit

Permalink
[fix][flaky-test]ManagedCursorMetricsTest.testManagedCursorMetrics (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Aug 3, 2022
1 parent e8fd71e commit a8231a4
Showing 1 changed file with 116 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
*/
package org.apache.pulsar.broker.stats;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -34,23 +40,24 @@
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {

@BeforeMethod(alwaysRun = true)
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
super.internalSetup();
}

@AfterMethod(alwaysRun = true)
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand All @@ -61,21 +68,28 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws
return PulsarTestClient.create(clientBuilder);
}

/***
* This method has overridden these case:
* brk_ml_cursor_persistLedgerSucceed
* brk_ml_cursor_persistLedgerErrors
* brk_ml_cursor_persistZookeeperSucceed
* brk_ml_cursor_nonContiguousDeletedMessagesRange
* But not overridden "brk_ml_cursor_nonContiguousDeletedMessagesRange".
*/
@Test
public void testManagedCursorMetrics() throws Exception {
final String subName = "my-sub";
final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
final int messageSize = 10;

/** Before create cursor. Verify metrics will not be generated. **/
// Create ManagedCursorMetrics and verify empty.
ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);

List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
/**
* Trigger the cursor ledger creation.
* After create cursor. Verify all metrics is zero.
*/
// Trigger cursor creation.
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
.topic(topicName)
Expand All @@ -84,35 +98,103 @@ public void testManagedCursorMetrics() throws Exception {
.subscriptionName(subName)
.isAckReceiptEnabled(true)
.subscribe();


@Cleanup
Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();

producer.send("trigger-cursor-ledger-creation".getBytes());
// Trigger the cursor ledger creation
consumer.acknowledge(consumer.receive().getMessageId());

for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

for (int i = 0; i < messageSize; i++) {
final PersistentSubscription persistentSubscription =
(PersistentSubscription) pulsar.getBrokerService()
.getTopic(topicName, false).get().get().getSubscription(subName);
final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats();
// Assert.
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
/**
* 1. Send many messages, and only ack half. After the cursor data is written to BK,
* verify "brk_ml_cursor_persistLedgerSucceed" and "brk_ml_cursor_nonContiguousDeletedMessagesRange".
* 2. Ack another half, verify "brk_ml_cursor_nonContiguousDeletedMessagesRange" is zero.
*/
// Send many message and ack half.
List<MessageId> keepsMessageIdList = new ArrayList<>();
for (int i = 0; i < 30; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
consumer.acknowledge(consumer.receive().getMessageId());
if (i < 10 || i > 20) {
consumer.acknowledge(consumer.receive().getMessageId());
} else {
keepsMessageIdList.add(consumer.receive().getMessageId());
}
}

Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
// Wait persistent.
Awaitility.await().atMost(2, TimeUnit.SECONDS)
.until(() -> managedCursorMXBean.getPersistLedgerSucceed() > 0);
// Assert.
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
0L);
// Ack another half.
for (MessageId messageId : keepsMessageIdList){
consumer.acknowledge(messageId);
}
// Wait persistent.
Awaitility.await().atMost(2, TimeUnit.SECONDS)
.until(() -> managedCursor.getTotalNonContiguousDeletedMessagesRange() == 0);
// Assert.
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
/**
* Make BK error, and send many message, then wait cursor persistent finish.
* After the cursor data is written to ZK, verify "brk_ml_cursor_persistLedgerErrors" and
* "brk_ml_cursor_persistZookeeperSucceed".
*/
// Send amd ack messages, at the same time makes BK error.
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
String message = UUID.randomUUID().toString();
producer.send(message.getBytes());
consumer.acknowledge(consumer.receive().getMessageId());
// Make BK error.
LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger");
ledgerHandle.close();
return managedCursorMXBean.getPersistLedgerErrors() > 0
&& managedCursorMXBean.getPersistZookeeperSucceed() > 0;
});
// Assert.
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
/**
* TODO verify "brk_ml_cursor_persistZookeeperErrors".
* This is not easy to implement, we can use {@link #mockZooKeeper} to fail ZK, but we cannot identify whether
* the request is triggered by the "create new ledger then write ZK" or the "persistent cursor then write ZK".
* The cursor path is "/managed-ledgers/my-namespace/use/my-ns/persistent/my-topic1/my-sub". Maybe we can
* mock/spy ManagedCursorImpl to overridden this case in another PR.
*/
mockZooKeeper.unsetAlwaysFail();
producer.close();
consumer.close();
managedCursor.close();
admin.topics().delete(topicName, true);
}

@Test
Expand Down Expand Up @@ -150,10 +232,6 @@ public void testCursorReadWriteMetrics() throws Exception {
.topic(topicName)
.create();

for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
Expand All @@ -172,5 +250,11 @@ public void testCursorReadWriteMetrics() throws Exception {
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

// cleanup.
consumer.close();
consumer2.close();
producer.close();
admin.topics().delete(topicName, true);
}
}

0 comments on commit a8231a4

Please sign in to comment.