Skip to content

Commit

Permalink
[fix][test]: fix flaky test of ManagedCursorMetricsTest.testManagedCu…
Browse files Browse the repository at this point in the history
…rsorMetrics (apache#9919) (apache#14720)

Fixes apache#9919

### Motivation

we need make sure broker executed all ack command and updated metrics, then we can generate and check metric

### Modifications

- enable AckReceipt
- await until ack procedure complete(ACK and ACK_RESPONSE command)

Co-authored-by: xuanqi.wu <[email protected]>
  • Loading branch information
wuxuanqicn and weimob-wuxuanqi authored Apr 28, 2022
1 parent 615f05a commit be7057a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,27 @@
*/
package org.apache.pulsar.broker.stats;

import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Test(groups = "broker")
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {

Expand All @@ -49,6 +54,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
return PulsarTestClient.create(clientBuilder);
}

@Test
public void testManagedCursorMetrics() throws Exception {
final String subName = "my-sub";
Expand All @@ -63,14 +73,18 @@ public void testManagedCursorMetrics() throws Exception {
metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

Consumer<byte[]> consumer = pulsarClient.newConsumer()
PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.isAckReceiptEnabled(true)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
@Cleanup
Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topicName)
.create();

Expand All @@ -83,6 +97,8 @@ public void testManagedCursorMetrics() throws Exception {
producer.send(message.getBytes());
consumer.acknowledge(consumer.receive().getMessageId());
}

Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
private State state;

@Getter
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
.expectedItems(16)
Expand Down

0 comments on commit be7057a

Please sign in to comment.