Skip to content

Commit

Permalink
[improve][test] Print topic stats and internal-stats if cannot receiv…
Browse files Browse the repository at this point in the history
…e message in 30s for Function Test (apache#16773)
  • Loading branch information
codelipenghui authored Jul 26, 2022
1 parent fb1df89 commit 0d8e5a7
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.swagger.util.Json;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1587,6 +1589,16 @@ private void publishAndConsumeMessages(String inputTopic,

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
if (msg == null) {
log.info("Input topic stats: {}",
Json.pretty(pulsarAdmin.topics().getStats(inputTopic, true)));
log.info("Output topic stats: {}",
Json.pretty(pulsarAdmin.topics().getStats(outputTopic, true)));
log.info("Input topic internal-stats: {}",
Json.pretty(pulsarAdmin.topics().getInternalStats(inputTopic, true)));
log.info("Output topic internal-stats: {}",
Json.pretty(pulsarAdmin.topics().getInternalStats(outputTopic, true)));
}
String logMsg = new String(msg.getValue(), UTF_8);
log.info("Received message: '{}'", logMsg);
assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg + "' not expected");
Expand Down

0 comments on commit 0d8e5a7

Please sign in to comment.