Skip to content

Commit

Permalink
Add option to support message key and message properties (apache#5960)
Browse files Browse the repository at this point in the history
### Motivation

This change allows a user of the pulsar-client to produce message with key and properties, and print these in the consume output.
  • Loading branch information
ltamber authored and sijie committed Jan 8, 2020
1 parent 3d81028 commit 4f177e6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testInitialzation() throws MalformedURLException, InterruptedExcepti
PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);

String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", topicName };
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName };
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);

future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -125,14 +126,30 @@ public void updateConfig(ClientBuilder clientBuilder, Authentication authenticat
* @return String representation of the message
*/
private String interpretMessage(Message<byte[]> message, boolean displayHex) throws IOException {
StringBuilder sb = new StringBuilder();

String properties = Arrays.toString(message.getProperties().entrySet().toArray());

String data;
byte[] msgData = message.getData();
ByteArrayOutputStream out = new ByteArrayOutputStream();
if (!displayHex) {
return new String(msgData);
data = new String(msgData);
} else {
HexDump.dump(msgData, 0, out, 0);
return new String(out.toByteArray());
data = new String(out.toByteArray());
}

String key = null;
if (message.hasKey()) {
key = message.getKey();
}

sb.append("key:[").append(key).append("], ");
sb.append("properties:").append(properties).append(", ");
sb.append("content:").append(data);

return sb.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,6 +47,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
Expand Down Expand Up @@ -97,6 +99,14 @@ public class CmdProduce {
description = "Character to split messages string on default is comma")
private String separator = ",";

@Parameter(names = { "-p", "--properties"}, description = "Properties to add, Comma separated "
+ "key=value string, like k1=v1,k2=v2.")
private List<String> properties = Lists.newArrayList();

@Parameter(names = { "-k", "--key"}, description = "message key to add ")
private String key;


private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
Expand Down Expand Up @@ -191,13 +201,31 @@ private int publish(String topic) {

List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;

Map<String, String> kvMap = new HashMap<>();
for (String property : properties) {
String [] kv = property.split("=");
kvMap.put(kv[0], kv[1]);
}

for (int i = 0; i < this.numTimesProduce; i++) {
for (byte[] content : messageBodies) {
if (limiter != null) {
limiter.acquire();
}

producer.send(content);
TypedMessageBuilder<byte[]> message = producer.newMessage();

if (!kvMap.isEmpty()) {
message.properties(kvMap);
}

if (key != null && !key.isEmpty()) {
message.key(key);
}

message.value(content).send();

numMessagesSent++;
}
}
Expand Down

0 comments on commit 4f177e6

Please sign in to comment.