Skip to content

Commit

Permalink
fix: non-batched messages cause sql query to fail (apache#3684)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and merlimat committed Feb 26, 2019
1 parent 915a4d0 commit 397ef33
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static void parseMessage(TopicName topicName, long ledgerId, long entryId

if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
processor.process(
RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,16 @@ public void teardownPresto() {
}

@Test
public void testSimpleSQLQuery() throws Exception {
public void testSimpleSQLQueryBatched() throws Exception {
testSimpleSQLQuery(true);
}

@Test
public void testSimpleSQLQueryNonBatched() throws Exception {
testSimpleSQLQuery(false);
}

public void testSimpleSQLQuery(boolean isBatched) throws Exception {

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
Expand All @@ -79,9 +88,9 @@ public void testSimpleSQLQuery() throws Exception {
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
.topic(stocksTopic)
.enableBatching(isBatched)
.create();


for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
producer.send(stock);
Expand Down

0 comments on commit 397ef33

Please sign in to comment.