Skip to content

Commit

Permalink
Don't request redelivery of dead letter messages (apache#4426)
Browse files Browse the repository at this point in the history
If we send a message to the dead letter topic, we shouldn't also
request that the same message be redelivered.

The process to send to DLQ does do an acknowledge(), but acknowledge
is async and not guaranteed, so it can race with the redelivery
request.
  • Loading branch information
ivankelly authored and merlimat committed May 31, 2019
1 parent 0ff23b4 commit c756686
Showing 1 changed file with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1268,15 +1268,15 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
.collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED);
MessageIdData.Builder builder = MessageIdData.newBuilder();
batches.forEach(ids -> {
List<MessageIdData> messageIdDatas = ids.stream().map(messageId -> {
// process message possible to dead letter topic
processPossibleToDLQ(messageId);
// attempt to remove message from batchMessageAckTracker
builder.setPartition(messageId.getPartitionIndex());
builder.setLedgerId(messageId.getLedgerId());
builder.setEntryId(messageId.getEntryId());
return builder.build();
}).collect(Collectors.toList());
List<MessageIdData> messageIdDatas = ids.stream()
.filter(messageId -> !processPossibleToDLQ(messageId))
.map(messageId -> {
builder.setPartition(messageId.getPartitionIndex());
builder.setLedgerId(messageId.getLedgerId());
builder.setEntryId(messageId.getEntryId());
return builder.build();
}).collect(Collectors.toList());

ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
messageIdDatas.forEach(MessageIdData::recycle);
Expand All @@ -1299,7 +1299,7 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
}
}

private void processPossibleToDLQ(MessageIdImpl messageId) {
private boolean processPossibleToDLQ(MessageIdImpl messageId) {
List<MessageImpl<T>> deadLetterMessages = null;
if (possibleSendToDeadLetterTopicMessages != null) {
if (messageId instanceof BatchMessageIdImpl) {
Expand Down Expand Up @@ -1329,11 +1329,13 @@ private void processPossibleToDLQ(MessageIdImpl messageId) {
.send();
}
acknowledge(messageId);
return true;
} catch (Exception e) {
log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
}
}
}
return false;
}

@Override
Expand Down

0 comments on commit c756686

Please sign in to comment.