Skip to content

Commit

Permalink
[improve] [client] Add api to get producer/consumer stats for partiti…
Browse files Browse the repository at this point in the history
…on topic (apache#18212)

* [improve] [client] Add api to get producer/consumer stats for partition topic

* introduce partition topic stats interface
  • Loading branch information
rdhabalia authored Nov 1, 2022
1 parent 927a00e commit 26f9ffa
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -30,6 +31,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -449,4 +451,63 @@ public void testProducerPendingQueueSizeStats(boolean batchingEnabled) throws Ex
.until(() -> producer.getStats().getPendingQueueSize() == numMessages);
assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
}

/**
* This test verifies partitioned topic stats for producer and consumer.
* @throws Exception
*/
@Test
public void testPartitionTopicStats() throws Exception {
log.info("-- Starting {} test --", methodName);

String topicName = "persistent://my-property/my-ns/testPartitionTopicStats";
int numPartitions = 10;
admin.topics().createPartitionedTopic(topicName, numPartitions);

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name");

Consumer<byte[]> consumer = consumerBuilder.subscribe();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().enableBatching(false).topic(topicName);

Producer<byte[]> producer = producerBuilder.create();

int numMessages = 20;
for (int i = 0; i < numMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Set<String> messageSet = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);

MultiTopicConsumerStats cStat = (MultiTopicConsumerStats) consumer.getStats();
PartitionedTopicProducerStats pStat = (PartitionedTopicProducerStats) producer.getStats();
retryStrategically((test) -> !pStat.getPartitionStats().isEmpty(), 5, 100);
retryStrategically((test) -> !cStat.getPartitionStats().isEmpty(), 5, 100);
Map<String, ProducerStats> prodStatsMap = pStat.getPartitionStats();
Map<String, ConsumerStats> consStatsMap = cStat.getPartitionStats();
assertFalse(prodStatsMap.isEmpty());
assertFalse(consStatsMap.isEmpty());
for (int i = 0; i < numPartitions; i++) {
String topic = topicName + "-partition-" + i;
assertTrue(prodStatsMap.containsKey(topic));
assertTrue(consStatsMap.containsKey(topic));
}

consumer.close();
producer.close();

log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.Map;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* Multi-topic Consumer statistics recorded by client.
*
* <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface MultiTopicConsumerStats extends ConsumerStats {

/**
* @return stats for each partition if topic is partitioned topic
*/
Map<String, ConsumerStats> getPartitionStats();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.Map;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* Partitioned topic Producer statistics recorded by client.
*
* <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface PartitionedTopicProducerStats extends ProducerStats {

/**
* @return stats for each partition if topic is partitioned topic
*/
Map<String, ProducerStats> getPartitionStats();

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,4 @@ public interface ProducerStats extends Serializable {
* @return current pending send-message queue size of the producer
*/
int getPendingQueueSize();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.MultiTopicConsumerStats;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiTopicConsumerStatsRecorderImpl extends ConsumerStatsRecorderImpl implements MultiTopicConsumerStats {

private static final long serialVersionUID = 1L;
private Map<String, ConsumerStats> partitionStats = new ConcurrentHashMap<>();

public MultiTopicConsumerStatsRecorderImpl() {
super();
}

public MultiTopicConsumerStatsRecorderImpl(Consumer<?> consumer) {
super(consumer);
}

public MultiTopicConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf,
Consumer<?> consumer) {
super(pulsarClient, conf, consumer);
}

public void updateCumulativeStats(String partition, ConsumerStats stats) {
super.updateCumulativeStats(stats);
partitionStats.put(partition, stats);
}

@Override
public Map<String, ConsumerStats> getPartitionStats() {
return partitionStats;
}

private static final Logger log = LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;
private final ConsumerStatsRecorder stats;
private final MultiTopicConsumerStatsRecorderImpl stats;
private UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;

Expand Down Expand Up @@ -156,7 +156,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {

this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0
? new ConsumerStatsRecorderImpl(this)
? new MultiTopicConsumerStatsRecorderImpl(this)
: null;

// start track and auto subscribe partition increment
Expand Down Expand Up @@ -826,7 +826,7 @@ public synchronized ConsumerStats getStats() {
}
stats.reset();

consumers.values().stream().forEach(consumer -> stats.updateCumulativeStats(consumer.getStats()));
consumers.forEach((partition, consumer) -> stats.updateCumulativeStats(partition, consumer.getStats()));
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {

private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
private final MessageRouter routerPolicy;
private final ProducerStatsRecorderImpl stats;
private final PartitionedTopicProducerStatsRecorderImpl stats;
private TopicMetadata topicMetadata;
private final int firstPartitionIndex;
private String overrideProducerName;
Expand All @@ -80,7 +80,9 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
stats = client.getConfiguration().getStatsIntervalSeconds() > 0
? new PartitionedTopicProducerStatsRecorderImpl()
: null;

// MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
Expand Down Expand Up @@ -353,7 +355,8 @@ public synchronized ProducerStatsRecorderImpl getStats() {
return null;
}
stats.reset();
producers.values().forEach(p -> stats.updateCumulativeStats(p.getStats()));
producers.forEach(
(partition, producer) -> stats.updateCumulativeStats(producer.getTopic(), producer.getStats()));
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.DoubleAdder;
import org.apache.pulsar.client.api.PartitionedTopicProducerStats;
import org.apache.pulsar.client.api.ProducerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsRecorderImpl
implements PartitionedTopicProducerStats {

private static final long serialVersionUID = 1L;
private Map<String, ProducerStats> partitionStats = Collections.emptyMap();
private final DoubleAdder sendMsgsRateAggregate;
private final DoubleAdder sendBytesRateAggregate;
private int partitions = 0;

public PartitionedTopicProducerStatsRecorderImpl() {
super();
partitionStats = new ConcurrentHashMap<>();
sendMsgsRateAggregate = new DoubleAdder();
sendBytesRateAggregate = new DoubleAdder();
}

void reset() {
super.reset();
partitions = 0;
}

void updateCumulativeStats(String partition, ProducerStats stats) {
super.updateCumulativeStats(stats);
if (stats == null) {
return;
}
partitionStats.put(partition, stats);
// update rates
sendMsgsRateAggregate.add(stats.getSendMsgsRate());
sendBytesRateAggregate.add(stats.getSendBytesRate());
partitions++;
}

@Override
public double getSendMsgsRate() {
return sendMsgsRateAggregate.doubleValue() / partitions;
}

@Override
public double getSendBytesRate() {
return sendBytesRateAggregate.doubleValue() / partitions;
}

@Override
public Map<String, ProducerStats> getPartitionStats() {
return partitionStats;
}

private static final Logger log = LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,5 @@ public double getSendLatencyMillisMax() {
public int getPendingQueueSize() {
return 0;
}

}
Loading

0 comments on commit 26f9ffa

Please sign in to comment.