Skip to content

Commit

Permalink
[fix][broker][monitoring] fix message ack rate (apache#16108)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjiuming authored Jun 21, 2022
1 parent face8bb commit 8869d8c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public void doUnsubscribe(final long requestId) {
}

public CompletableFuture<Void> messageAcked(CommandAck ack) {
CompletableFuture<Void> future;
CompletableFuture<Long> future;

this.lastAckedTimestamp = System.currentTimeMillis();
Map<String, Long> properties = Collections.emptyMap();
Expand Down Expand Up @@ -401,11 +401,12 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
List<PositionImpl> positionsAcked = Collections.singletonList(position);
future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked);
ack.getTxnidLeastBits(), positionsAcked)
.thenApply(unused -> 1L);
} else {
List<Position> positionsAcked = Collections.singletonList(position);
subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
future = CompletableFuture.completedFuture(null);
future = CompletableFuture.completedFuture(1L);
}
} else {
if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
Expand All @@ -416,16 +417,16 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
}

return future
.whenComplete((__, t) -> {
if (t == null) {
this.messageAckRate.recordEvent(ack.getMessageIdsCount());
}
.thenApply(v -> {
this.messageAckRate.recordEvent(v);
return null;
});
}

//this method is for individual ack not carry the transaction
private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
Expand Down Expand Up @@ -458,10 +459,12 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,
checkCanRemovePendingAcksAndHandle(position, msgId);

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
completableFuture.complete(null);
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
completableFuture.complete(totalAckCount);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
//check if the position can remove from the consumer pending acks.
Expand All @@ -479,14 +482,15 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,


//this method is for individual ack carry the transaction
private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
// Individual ack
List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
if (!isTransactionEnabled()) {
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
}

LongAdder totalAckCount = new LongAdder();
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
Expand Down Expand Up @@ -515,6 +519,8 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
checkCanRemovePendingAcksAndHandle(position, msgId);

checkAckValidationError(ack, position);

totalAckCount.add(ackedCount);
}

CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
Expand All @@ -530,7 +536,7 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
}
}));
}
return completableFuture;
return completableFuture.thenApply(__ -> totalAckCount.sum());
}

private long getBatchSize(MessageIdData msgId) {
Expand Down Expand Up @@ -753,9 +759,9 @@ public void updateRates() {
messageAckRate.calculateRate();

stats.msgRateOut = msgOut.getRate();
stats.messageAckRate = messageAckRate.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
stats.messageAckRate = messageAckRate.getValueRate();
stats.chunkedMessageRate = chunkedMessageRate.getRate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -53,6 +53,7 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand Down Expand Up @@ -249,56 +250,94 @@ public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Excep

private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
throws Exception {
final int messages = 100;
final int messages = 1000;
String subName = "test_sub";
CountDownLatch latch = new CountDownLatch(messages);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.enableBatching(true).batchingMaxMessages(10).create();

MessageListener<String> listener = (consumer, msg) -> {
try {
consumer.acknowledge(msg);
latch.countDown();
} catch (PulsarClientException e) {
//ignore
}
};
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.messageListener(listener)
.subscribe();
@Cleanup
Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.messageListener(listener)
.subscribe();

String namespace = TopicName.get(topicName).getNamespace();

for (int i = 0; i < messages; i++) {
producer.send(UUID.randomUUID().toString());
producer.sendAsync(UUID.randomUUID().toString());
}
producer.flush();

for (int i = 0; i < messages; i++) {
Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
if (message == null) {
break;
}

consumer.acknowledge(message);
}
latch.await(20, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(1);

Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
Subscription subscription = topic.getSubscription(subName);
List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
Assert.assertEquals(consumers.size(), 1);
Assert.assertEquals(consumers.size(), 2);
org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
org.apache.pulsar.broker.service.Consumer consumer2 = consumers.get(1);
consumer1.updateRates();
consumer2.updateRates();

ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
String metricStr = output.toString(StandardCharsets.UTF_8);

Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
Assert.assertTrue(metrics.size() > 0);

int num = 0;
for (PrometheusMetricsTest.Metric metric : metrics) {
if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
num++;
Assert.assertTrue(metric.value > 0);
} else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
num++;
Assert.assertTrue(metric.value > 0);
}
Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");

String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
Assert.assertTrue(ackRateMetric.size() > 0);
Assert.assertTrue(rateOutMetric.size() > 0);

if (exposeTopicLevelMetrics) {
String consumer1Name = consumer1.consumerName();
String consumer2Name = consumer2.consumerName();
double totalAckRate = ackRateMetric.stream()
.filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
|| metric.tags.get("consumer_name").equals(consumer2Name))
.mapToDouble(metric -> metric.value).sum();
double totalRateOut = rateOutMetric.stream()
.filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
|| metric.tags.get("consumer_name").equals(consumer2Name))
.mapToDouble(metric -> metric.value).sum();

Assert.assertTrue(totalAckRate > 0D);
Assert.assertTrue(totalRateOut > 0D);
Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
} else {
double totalAckRate = ackRateMetric.stream()
.filter(metric -> namespace.equals(metric.tags.get("namespace")))
.mapToDouble(metric -> metric.value).sum();
double totalRateOut = rateOutMetric.stream()
.filter(metric -> namespace.equals(metric.tags.get("namespace")))
.mapToDouble(metric -> metric.value).sum();

Assert.assertTrue(totalAckRate > 0D);
Assert.assertTrue(totalRateOut > 0D);
Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
}

Assert.assertTrue(num > 0);
}
}

0 comments on commit 8869d8c

Please sign in to comment.