Skip to content

Commit

Permalink
Pulsar Client: allow to print GenericRecord contents (apache#9785)
Browse files Browse the repository at this point in the history
### Motivation
When you are dealing with structured data it would be pretty useful to have a tool that prints decoded records directly to the console.

### Modifications

Add two new options to the "pulsar-client consume" tool:
- "--schema-type" option, that can be "bytes" and "auto_consume" in order to select the Schema (in particular you are interested in AUTO_CONSUME if you are dealing with GenericRecords)
- "--hide-content" boolean option, that shuts down the printing of the contents of the messages, in case they are in binary form or that you are using the tool only to test if any message is coming from the topic

In case of GenericRecords we are printing the content of the record like a Java Map. We are also handling nested GenericRecord structures as well.
  • Loading branch information
eolivelli authored Mar 12, 2021
1 parent cb5a933 commit 3988f9f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,13 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -101,6 +94,9 @@ public class CmdConsume {
@Parameter(names = { "--hex" }, description = "Display binary messages in hex.")
private boolean displayHex = false;

@Parameter(names = { "--hide-content" }, description = "Do not write the message to console.")
private boolean hideContent = false;

@Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at which to consume, "
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;
Expand All @@ -122,6 +118,10 @@ public class CmdConsume {
"--encryption-key-value" }, description = "The URI of private key to decrypt payload, for example "
+ "file:///path/to/private.key or data:application/x-pem-file;base64,*****")
private String encKeyValue;

@Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
private String schematype = "bytes";


private ClientBuilder clientBuilder;
private Authentication authentication;
Expand Down Expand Up @@ -150,19 +150,29 @@ public void updateConfig(ClientBuilder clientBuilder, Authentication authenticat
* Whether to display BytesMessages in hexdump style, ignored for simple text messages
* @return String representation of the message
*/
private String interpretMessage(Message<byte[]> message, boolean displayHex) throws IOException {
private String interpretMessage(Message<?> message, boolean displayHex) throws IOException {
StringBuilder sb = new StringBuilder();

String properties = Arrays.toString(message.getProperties().entrySet().toArray());

String data;
byte[] msgData = message.getData();
ByteArrayOutputStream out = new ByteArrayOutputStream();
if (!displayHex) {
data = new String(msgData);
Object value = message.getValue();
if (value == null) {
data = "null";
} else if (value instanceof byte[]) {
byte[] msgData = (byte[]) value;
ByteArrayOutputStream out = new ByteArrayOutputStream();
if (!displayHex) {
data = new String(msgData);
} else {
HexDump.dump(msgData, 0, out, 0);
data = new String(out.toByteArray());
}
} else if (value instanceof GenericRecord) {
Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
data = asMap.toString();
} else {
HexDump.dump(msgData, 0, out, 0);
data = new String(out.toByteArray());
data = value.toString();
}

String key = null;
Expand All @@ -171,12 +181,29 @@ private String interpretMessage(Message<byte[]> message, boolean displayHex) thr
}

sb.append("key:[").append(key).append("], ");
sb.append("properties:").append(properties).append(", ");
if (!properties.isEmpty()) {
sb.append("properties:").append(properties).append(", ");
}
sb.append("content:").append(data);

return sb.toString();
}

private static Map<String, Object> genericRecordToMap(GenericRecord value) {
return value.getFields()
.stream()
.collect(Collectors.toMap(Field::getName, f -> {
Object fieldValue = value.getField(f);
if (fieldValue instanceof GenericRecord) {
return genericRecordToMap((GenericRecord) fieldValue);
} else if (fieldValue == null) {
return "null";
} else {
return fieldValue;
}
}));
}

/**
* Run the consume command.
*
Expand Down Expand Up @@ -204,8 +231,15 @@ private int consume(String topic) {
int returnCode = 0;

try {
ConsumerBuilder<?> builder;
PulsarClient client = clientBuilder.build();
ConsumerBuilder<byte[]> builder = client.newConsumer()
Schema<?> schema = Schema.BYTES;
if ("auto_consume".equals(schematype)) {
schema = Schema.AUTO_CONSUME();
} else if (!"bytes".equals(schematype)) {
throw new IllegalArgumentException("schema type must be 'bytes' or 'auto_consume");
}
builder = client.newConsumer(schema)
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType)
.subscriptionMode(subscriptionMode)
Expand All @@ -230,22 +264,25 @@ private int consume(String topic) {
builder.defaultCryptoKeyReader(this.encKeyValue);
}

Consumer<byte[]> consumer = builder.subscribe();

Consumer<?> consumer = builder.subscribe();
RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
if (limiter != null) {
limiter.acquire();
}

Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg == null) {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
numMessagesConsumed += 1;
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
System.out.println(output);
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
System.out.println(output);
} else if (numMessagesConsumed % 1000 == 0) {
System.out.println("Received " + numMessagesConsumed + " messages");
}
consumer.acknowledge(msg);
}
}
Expand Down
3 changes: 2 additions & 1 deletion site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ Options
|`-q`, `--queue-size`|The size of consumer's receiver queue.|0|
|`-mc`, `--max_chunked_msg`|Max pending chunk messages.|0|
|`-ac`, `--auto_ack_chunk_q_full`|Auto ack for the oldest message in consumer's receiver queue if the queue full.|false|

|`--hide-content`|Do not print the message to the console.|false|
|`-st`, `--schema-type`|Set the schema type. Use `auto_consume` to dump AVRO and other structured data types. Possible values: bytes, auto_consume.|bytes|

## `pulsar-daemon`
A wrapper around the pulsar tool that’s used to start and stop processes, such as ZooKeeper, bookies, and Pulsar brokers, in the background using nohup.
Expand Down

0 comments on commit 3988f9f

Please sign in to comment.