Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix_admin_getIndividualMsgsFromBatch_bug (apache#6715)
**Motivation** fix when get batch message from http response, only get the first message. apache#6714 ```javascript for (int i = 0; i < batchSize; i++) { String batchMsgId = msgId + ":" + i; PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata .newBuilder(); ByteBuf buf = Unpooled.wrappedBuffer(data); // here you need to move out of the loop try { ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i, batchSize); SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); if (singleMessageMetadata.getPropertiesCount() > 0) { for (KeyValue entry : singleMessageMetadata.getPropertiesList()) { properties.put(entry.getKey(), entry.getValue()); } } ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES)); } catch (Exception ex) { log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex); } buf.release(); singleMessageMetadataBuilder.recycle(); } ``` ByteBuf buf need to move out of the loop **Changes** Replace old value with new value ```javascript ByteBuf buf = Unpooled.wrappedBuffer(data); for (int i = 0; i < batchSize; i++) { String batchMsgId = msgId + ":" + i; PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata .newBuilder(); try { ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i, batchSize); SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); if (singleMessageMetadata.getPropertiesCount() > 0) { for (KeyValue entry : singleMessageMetadata.getPropertiesList()) { properties.put(entry.getKey(), entry.getValue()); } } ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES)); } catch (Exception ex) { log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex); } singleMessageMetadataBuilder.recycle(); } buf.release(); ```
- Loading branch information