Skip to content

Commit

Permalink
[fix][cli] Check numMessages after incrementing counter (apache#17826)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andras Beni authored Sep 29, 2022
1 parent 9026d19 commit df5e0e1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,18 @@ public static void main(String[] args) throws Exception {
PerfClientUtils.exit(0);
}
}
if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) {
log.info("------------- DONE (reached the maximum number: [{}] of consumption) --------------",
arguments.numMessages);
PerfClientUtils.exit(0);
}
messagesReceived.increment();
bytesReceived.add(msg.getData().length);

totalMessagesReceived.increment();
totalBytesReceived.add(msg.getData().length);

if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) {
log.info("------------- DONE (reached the maximum number: [{}] of consumption) --------------",
arguments.numMessages);
PerfClientUtils.exit(0);
}

if (limiter != null) {
limiter.acquire();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,24 @@
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PerfToolTest extends TopicMessagingBase {

private static final int MESSAGE_COUNT = 50;

@Test
private void testProduce() throws Exception {
public void testProduce() throws Exception {
String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testProduce", true);
// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, serviceUrl, topicName);
ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, serviceUrl, topicName, MESSAGE_COUNT);
checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated throughput stats",
"PerformanceProducer - Aggregated latency stats");
}

@Test
private void testConsume() throws Exception {
public void testConsume() throws Exception {
String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testConsume", true);
// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
Expand All @@ -57,16 +55,36 @@ private void testConsume() throws Exception {
"PerformanceConsumer - Aggregated latency stats");
}

private ContainerExecResult produceWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
ContainerExecResult result = container.execCmd("bin/pulsar-perf", "produce", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
@Test
public void testRead() throws Exception {
String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testRead", true);
// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
ContainerExecResult readResult = readWithPerfTool(clientToolContainer, serviceUrl, topicName);
checkOutputForLogs(readResult,"PerformanceReader - Aggregated throughput stats ",
"PerformanceReader - Aggregated latency stats");
}

private ContainerExecResult produceWithPerfTool(ChaosContainer<?> container, String url, String topic, int messageCount) throws Exception {
ContainerExecResult result = container.execCmd("bin/pulsar-perf", "produce", "-u", url, "-m", String.valueOf(messageCount), topic);

return failOnError("Performance producer", result);
}

private ContainerExecResult consumeWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
CompletableFuture<ContainerExecResult> resultFuture =
container.execCmdAsync("bin/pulsar-perf", "consume", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
produceWithPerfTool(container, url, topic);
produceWithPerfTool(container, url, topic, MESSAGE_COUNT);

ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
return failOnError("Performance consumer", result);
}

private ContainerExecResult readWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
CompletableFuture<ContainerExecResult> resultFuture =
container.execCmdAsync("bin/pulsar-perf", "read", "-u", url, "-n", String.valueOf(MESSAGE_COUNT), topic);
produceWithPerfTool(container, url, topic, MESSAGE_COUNT);

ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
return failOnError("Performance consumer", result);
Expand Down

0 comments on commit df5e0e1

Please sign in to comment.