Skip to content

Commit

Permalink
[improve][test] Added message properties tests for batch and non-batc…
Browse files Browse the repository at this point in the history
…h messages (apache#23473)
  • Loading branch information
codelipenghui authored Oct 18, 2024
1 parent 6ebca5e commit 8de27a2
Showing 1 changed file with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1905,4 +1905,76 @@ public void testUpdatePropertiesOnNonDurableSub() throws Exception {
assertEquals(cursor.getCursorProperties().size(), 1);
assertEquals(cursor.getCursorProperties().get("foo"), "bar");
}

@Test
public void testPeekMessageWithProperties() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testPeekMessageWithProperties";
admin.topics().createNonPartitionedTopic(topicName);

// Test non-batch messages
@Cleanup
Producer<String> nonBatchProducer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();

Map<String, String> props1 = new HashMap<>();
props1.put("key1", "value1");
props1.put("KEY2", "VALUE2");
props1.put("KeY3", "VaLuE3");

nonBatchProducer.newMessage()
.properties(props1)
.value("non-batch-message")
.send();

Message<byte[]> peekedMessage = admin.topics().peekMessages(topicName, "sub-peek", 1).get(0);
assertEquals(new String(peekedMessage.getData()), "non-batch-message");
assertEquals(peekedMessage.getProperties().size(), 3);
assertEquals(peekedMessage.getProperties().get("key1"), "value1");
assertEquals(peekedMessage.getProperties().get("KEY2"), "VALUE2");
assertEquals(peekedMessage.getProperties().get("KeY3"), "VaLuE3");

admin.topics().truncate(topicName);

// Test batch messages
@Cleanup
Producer<String> batchProducer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.batchingMaxMessages(2)
.create();

Map<String, String> props2 = new HashMap<>();
props2.put("batch-key1", "batch-value1");
props2.put("BATCH-KEY2", "BATCH-VALUE2");
props2.put("BaTcH-kEy3", "BaTcH-vAlUe3");

batchProducer.newMessage()
.properties(props2)
.value("batch-message-1")
.sendAsync();

batchProducer.newMessage()
.properties(props2)
.value("batch-message-2")
.send();

List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, "sub-peek", 2);
assertEquals(peekedMessages.size(), 2);

for (int i = 0; i < 2; i++) {
Message<byte[]> batchMessage = peekedMessages.get(i);
assertEquals(new String(batchMessage.getData()), "batch-message-" + (i + 1));
assertEquals(batchMessage.getProperties().size(),
3 + 2 // 3 properties from the message + 2 properties from the batch
);
assertEquals(batchMessage.getProperties().get("X-Pulsar-num-batch-message"), "2");
assertNotNull(batchMessage.getProperties().get("X-Pulsar-batch-size"));
assertEquals(batchMessage.getProperties().get("batch-key1"), "batch-value1");
assertEquals(batchMessage.getProperties().get("BATCH-KEY2"), "BATCH-VALUE2");
assertEquals(batchMessage.getProperties().get("BaTcH-kEy3"), "BaTcH-vAlUe3");
}
}
}

0 comments on commit 8de27a2

Please sign in to comment.