Skip to content

Commit

Permalink
Expose the receiver queue size to client consumer stats (apache#8663)
Browse files Browse the repository at this point in the history
Fixes apache#8650
### Motivation
Currently, we log the receiver queue size. But we don't expose the receiver queue size to the client consumer stats. We should expose it.

### Modifications
add API for `ConsumerStats.java`
  • Loading branch information
315157973 authored Nov 22, 2020
1 parent 334dffd commit 642461c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.google.common.reflect.Reflection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

Expand Down Expand Up @@ -72,19 +71,16 @@

import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
Expand All @@ -100,6 +96,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -3647,4 +3644,55 @@ public void testGetLastDisconnectedTimestampForPartitionedTopic() throws Excepti
Assert.assertTrue(producer.getLastDisconnectedTimestamp() > 0);
Assert.assertTrue(consumer.getLastDisconnectedTimestamp() > 0);
}

@Test
public void testGetStats() throws Exception {
final String topicName = "persistent://my-property/my-ns/testGetStats" + UUID.randomUUID();
final String subName = "my-sub";
final int receiveQueueSize = 100;
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).receiverQueueSize(receiveQueueSize).subscriptionName(subName).subscribe();
Assert.assertNull(consumer.getStats().getMsgNumInSubReceiverQueue());
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 0);

for (int i = 0; i < receiveQueueSize; i++) {
producer.sendAsync("msg" + i);
}
//Give some time to consume
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), receiveQueueSize));
consumer.close();
producer.close();
}

@Test
public void testGetStatsForPartitionedTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/testGetStatsForPartitionedTopic";
final String subName = "my-sub";
final int receiveQueueSize = 100;

admin.topics().createPartitionedTopic(topicName, 3);
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();
MultiTopicsConsumerImpl<String> consumer = (MultiTopicsConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).receiverQueueSize(receiveQueueSize).subscriptionName(subName).subscribe();
Assert.assertEquals(consumer.getStats().getMsgNumInSubReceiverQueue().size(), 3);
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 0);

consumer.getStats().getMsgNumInSubReceiverQueue()
.forEach((key, value) -> Assert.assertEquals((int) value, 0));

for (int i = 0; i < receiveQueueSize; i++) {
producer.sendAsync("msg" + i);
}
//Give some time to consume
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), receiveQueueSize));
consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.Map;

import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand Down Expand Up @@ -101,4 +103,16 @@ public interface ConsumerStats extends Serializable {
* @return Total number of message acknowledgments failures on this consumer
*/
long getTotalAcksFailed();

/**
* Get the size of receiver queue.
* @return
*/
Integer getMsgNumInReceiverQueue();

/**
* Get the receiver queue size of sub-consumers.
* @return
*/
Map<Long, Integer> getMsgNumInSubReceiverQueue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.util.Map;
import java.util.Optional;

import org.apache.pulsar.client.api.ConsumerStats;
Expand Down Expand Up @@ -115,6 +116,16 @@ public long getTotalAcksFailed() {
return 0;
}

@Override
public Integer getMsgNumInReceiverQueue() {
return null;
}

@Override
public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
return null;
}

@Override
public double getRateMsgsReceived() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand All @@ -42,7 +46,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
private ConsumerImpl<?> consumer;
private Consumer<?> consumer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
Expand All @@ -65,6 +69,11 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");

public ConsumerStatsRecorderImpl() {
this(null);
}

public ConsumerStatsRecorderImpl(Consumer<?> consumer) {
this.consumer = consumer;
numMsgsReceived = new LongAdder();
numBytesReceived = new LongAdder();
numReceiveFailed = new LongAdder();
Expand All @@ -80,7 +89,7 @@ public ConsumerStatsRecorderImpl() {
}

public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf,
ConsumerImpl<?> consumer) {
Consumer<?> consumer) {
this.pulsarClient = pulsarClient;
this.consumer = consumer;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
Expand Down Expand Up @@ -112,9 +121,10 @@ private void init(ConsumerConfigurationData<?> conf) {
}

stat = (timeout) -> {
if (timeout.isCancelled()) {
if (timeout.isCancelled() || !(consumer instanceof ConsumerImpl)) {
return;
}
ConsumerImpl<?> consumerImpl = (ConsumerImpl<?>) consumer;
try {
long now = System.nanoTime();
double elapsed = (now - oldTime) / 1e9;
Expand All @@ -135,23 +145,22 @@ private void init(ConsumerConfigurationData<?> conf) {

receivedMsgsRate = currentNumMsgsReceived / elapsed;
receivedBytesRate = currentNumBytesReceived / elapsed;

if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent
| currentNumAcksFailed) != 0) {
log.info(
"[{}] [{}] [{}] Prefetched messages: {} --- "
+ "Consume throughput received: {} msgs/s --- {} Mbit/s --- "
+ "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- batch messages: {} ---"
+ "Failed acks: {}",
consumer.getTopic(), consumer.getSubscription(), consumer.consumerName,
consumer.incomingMessages.size(), THROUGHPUT_FORMAT.format(receivedMsgsRate),
consumerImpl.getTopic(), consumerImpl.getSubscription(), consumerImpl.consumerName,
consumerImpl.incomingMessages.size(), THROUGHPUT_FORMAT.format(receivedMsgsRate),
THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024),
THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
currentNumBatchReceiveFailed, currentNumAcksFailed);
}
} catch (Exception e) {
log.error("[{}] [{}] [{}]: {}", consumer.getTopic(), consumer.subscription, consumer.consumerName,
e.getMessage());
log.error("[{}] [{}] [{}]: {}", consumerImpl.getTopic(), consumerImpl.subscription
, consumerImpl.consumerName, e.getMessage());
} finally {
// schedule the next stat info
statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
Expand Down Expand Up @@ -230,22 +239,47 @@ public void updateCumulativeStats(ConsumerStats stats) {
totalAcksFailed.add(stats.getTotalAcksFailed());
}

@Override
public Integer getMsgNumInReceiverQueue() {
if (consumer instanceof ConsumerBase) {
return ((ConsumerBase<?>) consumer).incomingMessages.size();
}
return null;
}

@Override
public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
if (consumer instanceof MultiTopicsConsumerImpl) {
List<ConsumerImpl<?>> consumerList = ((MultiTopicsConsumerImpl) consumer).getConsumers();
return consumerList.stream().collect(
Collectors.toMap((consumerImpl) -> consumerImpl.consumerId
, (consumerImpl) -> consumerImpl.incomingMessages.size())
);
}
return null;
}

@Override
public long getNumMsgsReceived() {
return numMsgsReceived.longValue();
}

@Override
public long getNumBytesReceived() {
return numBytesReceived.longValue();
}

@Override
public long getNumAcksSent() {
return numAcksSent.longValue();
}

@Override
public long getNumAcksFailed() {
return numAcksFailed.longValue();
}

@Override
public long getNumReceiveFailed() {
return numReceiveFailed.longValue();
}
Expand All @@ -255,14 +289,17 @@ public long getNumBatchReceiveFailed() {
return numBatchReceiveFailed.longValue();
}

@Override
public long getTotalMsgsReceived() {
return totalMsgsReceived.longValue();
}

@Override
public long getTotalBytesReceived() {
return totalBytesReceived.longValue();
}

@Override
public long getTotalReceivedFailed() {
return totalReceiveFailed.longValue();
}
Expand All @@ -272,10 +309,12 @@ public long getTotaBatchReceivedFailed() {
return totalBatchReceiveFailed.longValue();
}

@Override
public long getTotalAcksSent() {
return totalAcksSent.longValue();
}

@Override
public long getTotalAcksFailed() {
return totalAcksFailed.longValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}

this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl(this) : null;

// start track and auto subscribe partition increasement
if (conf.isAutoUpdatePartitions()) {
Expand Down

0 comments on commit 642461c

Please sign in to comment.