Skip to content

Commit

Permalink
Bugfix - release and recycle on discarded messages (apache#4342)
Browse files Browse the repository at this point in the history
Don't leak resources when a message is being discarded.

*Modifications*

  - Fix missing release() and recycle() for discarded message on
    receiveIndividualMessagesFromBatch method.
  - Fix argument missing of debug logging {}-placeholder.
  - Fix unnecessary variable reference `payload` on messageReceived().
  • Loading branch information
lovelle authored and merlimat committed May 24, 2019
1 parent b161328 commit 71f3928
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,17 +747,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
messageId.getEntryId());
}

MessageMetadata msgMetadata = null;
ByteBuf payload = headersAndPayload;

if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}

MessageMetadata msgMetadata;
try {
msgMetadata = Commands.parseMessageMetadata(payload);
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
Expand All @@ -768,15 +766,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
topic, subscription, msgId);
log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
topic, subscription, consumerName, msgId);
}

increaseAvailablePermits(cnx, numMessages);
return;
}

ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx);
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, headersAndPayload, cnx);

boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);

Expand Down Expand Up @@ -950,6 +948,8 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
log.debug("[{}] [{}] Ignoring message from before the startMessageId", subscription,
consumerName);
}
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();

++skippedMessages;
continue;
Expand Down

0 comments on commit 71f3928

Please sign in to comment.