Skip to content

Commit

Permalink
[pulsar-client] Fix: set and return topic name on message api (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Aug 22, 2021
1 parent c0ef593 commit 0f8aef2
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,46 @@ public void testAutoUpdatePartitionsForProducerConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testCustomPartitionedProducer() throws Exception {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
TopicName topicName = null;
Producer<byte[]> producer = null;
try {
log.info("-- Starting {} test --", methodName);

int numPartitions = 4;
topicName = TopicName
.get("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis());

admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);

RouterWithTopicName router = new RouterWithTopicName();
producer = pulsarClient.newProducer().topic(topicName.toString())
.messageRouter(router)
.create();
for (int i = 0; i < 1; i++) {
String message = "my-message-" + i;
producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
}
assertEquals(router.topicName, topicName.toString());
} finally {
producer.close();
pulsarClient.close();
admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
}

private static class RouterWithTopicName implements MessageRouter {
static String topicName = null;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
topicName = msg.getTopicName();
return 2;
}
}

private static class AlwaysTwoMessageRouter implements MessageRouter {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientExceptio
for (int i = 0; i < messages / 2; i++) {
MessageMetadata metadata = new MessageMetadata()
.setNumMessagesInBatch(10);
MessageImpl<byte[]> msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
MessageImpl<byte[]> msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
futures.add(producer.sendAsync(msg));
}
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages/2);
Expand Down Expand Up @@ -147,15 +147,15 @@ public void testEnsureNotBlockOnThePendingQueue() throws Exception {
MessageMetadata metadata = new MessageMetadata()
.setNumMessagesInBatch(10);

MessageImpl<byte[]> msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
MessageImpl<byte[]> msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
futures.add(producer.sendAsync(msg));
}
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
try {
MessageMetadata metadata = new MessageMetadata()
.setNumMessagesInBatch(10);

MessageImpl<byte[]> msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
MessageImpl<byte[]> msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
producer.sendAsync(msg).get();
Assert.fail("Shouldn't be able to send message");
} catch (ExecutionException ee) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class MessageBuilderImpl implements MessageBuilder {

@Override
public Message<byte[]> build() {
return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES);
return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ public class MessageImpl<T> implements Message<T> {
private boolean poolMessage;

// Constructor for out-going message
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer payload, Schema<T> schema) {
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer payload, Schema<T> schema,
String topic) {
@SuppressWarnings("unchecked")
MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
msg.msgMetadata.clear();
msg.msgMetadata.copyFrom(msgMetadata);
msg.messageId = null;
msg.topic = null;
msg.topic = topic;
msg.cnx = null;
msg.payload = Unpooled.wrappedBuffer(payload);
msg.properties = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public MessageMetadata getMetadataBuilder() {

public Message<T> getMessage() {
beforeSend();
return MessageImpl.create(msgMetadata, content, schema);
return MessageImpl.create(msgMetadata, content, schema, producer.topic);
}

public long getPublishTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MessageImplTest {
@Test
public void testGetSequenceIdNotAssociated() {
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(new MessageMetadata(), payload, Schema.BYTES);
MessageImpl<?> msg = MessageImpl.create(new MessageMetadata(), payload, Schema.BYTES, null);

assertEquals(-1, msg.getSequenceId());
}
Expand All @@ -71,7 +71,7 @@ public void testSetDuplicatePropertiesKey() {
builder.addProperty().setKey("key1").setValue("value2");
builder.addProperty().setKey("key3").setValue("value3");
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);
assertEquals("value2", msg.getProperty("key1"));
assertEquals("value3", msg.getProperty("key3"));
}
Expand All @@ -82,7 +82,7 @@ public void testGetSequenceIdAssociated() {
.setSequenceId(1234);

ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);

assertEquals(1234, msg.getSequenceId());
}
Expand All @@ -91,7 +91,7 @@ public void testGetSequenceIdAssociated() {
public void testGetProducerNameNotAssigned() {
MessageMetadata builder = new MessageMetadata();
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);

assertNull(msg.getProducerName());
}
Expand All @@ -102,7 +102,7 @@ public void testGetProducerNameAssigned() {
.setProducerName("test-producer");

ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES);
MessageImpl<?> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);

assertEquals("test-producer", msg.getProducerName());
}
Expand All @@ -127,7 +127,7 @@ public void testDefaultGetProducerDataAssigned() {
MessageMetadata builder = new MessageMetadata()
.setProducerName("default");
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testInlineGetProducerDataAssigned() {
MessageMetadata builder = new MessageMetadata()
.setProducerName("inline");
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -186,7 +186,7 @@ public void testSeparatedGetProducerDataAssigned() {
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testDefaultAVROVersionGetProducerDataAssigned() {
.setProducerName("default");
builder.setSchemaVersion(new byte[10]);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -256,7 +256,7 @@ public void testSeparatedAVROVersionGetProducerDataAssigned() {
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testDefaultJSONVersionGetProducerDataAssigned() {
.setProducerName("default");
builder.setSchemaVersion(new byte[10]);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -329,7 +329,7 @@ public void testSeparatedJSONVersionGetProducerDataAssigned() {
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -364,7 +364,7 @@ public void testDefaultAVROJSONVersionGetProducerDataAssigned() {
.setProducerName("default");
builder.setSchemaVersion(new byte[10]);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand Down Expand Up @@ -402,7 +402,7 @@ public void testSeparatedAVROJSONVersionGetProducerDataAssigned() {
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
Expand All @@ -421,7 +421,7 @@ public void testTypedSchemaGetNullValue() {
builder.setPartitionKey(Base64.getEncoder().encodeToString(encodeBytes));
builder.setPartitionKeyB64Encoded(true);
builder.setNullValue(true);
MessageImpl<Boolean> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), BooleanSchema.of());
MessageImpl<Boolean> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), BooleanSchema.of(), null);
assertNull(msg.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testMessageImplReplicatedInfo() {
String from = "ClusterNameOfReplicatedFrom";
MessageMetadata builder = new MessageMetadata().setReplicatedFrom(from);
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);

assertTrue(msg.isReplicated());
assertEquals(msg.getReplicatedFrom(), from);
Expand All @@ -49,7 +49,7 @@ public void testMessageImplReplicatedInfo() {
public void testMessageImplNoReplicatedInfo() {
MessageMetadata builder = new MessageMetadata();
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);

assertFalse(msg.isReplicated());
assertNull(msg.getReplicatedFrom());
Expand All @@ -61,7 +61,7 @@ public void testTopicMessageImplReplicatedInfo() {
String topicName = "myTopic";
MessageMetadata builder = new MessageMetadata().setReplicatedFrom(from);
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);
msg.setMessageId(new MessageIdImpl(-1, -1, -1));
TopicMessageImpl<byte[]> topicMessage = new TopicMessageImpl<>(topicName, topicName, msg, null);

Expand All @@ -74,7 +74,7 @@ public void testTopicMessageImplNoReplicatedInfo() {
String topicName = "myTopic";
MessageMetadata builder = new MessageMetadata();
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);
msg.setMessageId(new MessageIdImpl(-1, -1, -1));
TopicMessageImpl<byte[]> topicMessage = new TopicMessageImpl<>(topicName, topicName, msg, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void decodeDataWithNullSchemaVersion() {
Schema<GenericRecord> autoConsumeSchema = new AutoConsumeSchema();
byte[] bytes = "bytes data".getBytes();
MessageImpl<GenericRecord> message = MessageImpl.create(
new MessageMetadata(), ByteBuffer.wrap(bytes), autoConsumeSchema);
new MessageMetadata(), ByteBuffer.wrap(bytes), autoConsumeSchema, null);
Assert.assertNull(message.getSchemaVersion());
GenericRecord genericRecord = message.getValue();
Assert.assertEquals(genericRecord.getNativeObject(), bytes);
Expand Down

0 comments on commit 0f8aef2

Please sign in to comment.