diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8d91ca12ad3f4..3426e22341a06 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -747,17 +747,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade messageId.getEntryId()); } - MessageMetadata msgMetadata = null; - ByteBuf payload = headersAndPayload; - if (!verifyChecksum(headersAndPayload, messageId)) { // discard message with checksum error discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); return; } + MessageMetadata msgMetadata; try { - msgMetadata = Commands.parseMessageMetadata(payload); + msgMetadata = Commands.parseMessageMetadata(headersAndPayload); } catch (Throwable t) { discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); return; @@ -768,15 +766,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", - topic, subscription, msgId); + log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", + topic, subscription, consumerName, msgId); } increaseAvailablePermits(cnx, numMessages); return; } - ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx); + ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, headersAndPayload, cnx); boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata); @@ -950,6 +948,8 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv log.debug("[{}] [{}] Ignoring message from before the startMessageId", subscription, consumerName); } + singleMessagePayload.release(); + singleMessageMetadataBuilder.recycle(); ++skippedMessages; continue;