Skip to content

Commit

Permalink
Async the DLQ process (apache#9552)
Browse files Browse the repository at this point in the history
Fixes apache#9540

### Motivation

Make the DLQ process asynchronous. Currently, the DLQ process is synchronous. The DLQ is processed in the timer task, and the timer holds a write lock while the data is written to the DLQ. Writing that data uses the IO thread, and adding messages to the UnAckedMessageTracker also uses the IO thread and also acquires the same write lock. As a result, the two can block each other and cause a deadlock.
  • Loading branch information
codelipenghui authored Feb 11, 2021
1 parent 610b17d commit fb0f3e3
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testDeadLetterTopic() throws Exception {
newPulsarClient.close();
}

@Test(timeOut = 10000)
@Test(timeOut = 30000)
public void testDLQDisabledForKeySharedSubtype() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -163,7 +162,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final DeadLetterPolicy deadLetterPolicy;

private volatile Producer<T> deadLetterProducer;
private volatile CompletableFuture<Producer<T>> deadLetterProducer;

private volatile Producer<T> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -580,6 +579,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
createProducerLock.writeLock().unlock();
}
}
CompletableFuture<Void> result = new CompletableFuture<>();
if (retryLetterProducer != null) {
try {
MessageImpl<T> retryMessage = null;
Expand Down Expand Up @@ -613,32 +613,33 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));

if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
processPossibleToDLQ((MessageIdImpl)messageId);
if (deadLetterProducer == null) {
try {
createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy
.getDeadLetterTopic())
.blockIfQueueFull(false)
.create();
}
} catch (Exception e) {
log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
} finally {
createProducerLock.writeLock().unlock();
}
}
if (deadLetterProducer != null) {
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
TypedMessageBuilder<T> typedMessageBuilderNew = deadLetterProducer.newMessage()
.value(retryMessage.getValue())
.properties(propertiesMap);
typedMessageBuilderNew.send();
return doAcknowledge(messageId, ackType, properties, null);
}
initDeadLetterProducerIfNeeded();
MessageId finalMessageId = messageId;
String finalOriginTopicNameStr = originTopicNameStr;
String finalOriginMessageIdStr = originMessageIdStr;
MessageImpl<T> finalRetryMessage = retryMessage;
deadLetterProducer.thenAccept(dlqProducer -> {
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, finalOriginTopicNameStr);
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, finalOriginMessageIdStr);
TypedMessageBuilder<T> typedMessageBuilderNew = dlqProducer.newMessage()
.value(finalRetryMessage.getValue())
.properties(propertiesMap);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> {
result.complete(null);
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
result.completeExceptionally(ex);
deadLetterProducer = null;
return null;
});
} else {
TypedMessageBuilder<T> typedMessageBuilderNew = retryLetterProducer.newMessage()
.value(retryMessage.getValue())
Expand All @@ -654,13 +655,19 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}
} catch (Exception e) {
log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);
Set<MessageId> messageIds = new HashSet<>();
messageIds.add(messageId);
Set<MessageId> messageIds = Collections.singleton(messageId);
unAckedMessageTracker.remove(messageId);
redeliverUnacknowledgedMessages(messageIds);
}
}
return CompletableFuture.completedFuture(null);
MessageId finalMessageId = messageId;
result.exceptionally(ex -> {
Set<MessageId> messageIds = Collections.singleton(finalMessageId);
unAckedMessageTracker.remove(finalMessageId);
redeliverUnacknowledgedMessages(messageIds);
return null;
});
return result;
}

@Override
Expand Down Expand Up @@ -1635,18 +1642,12 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
.map(messageId -> (MessageIdImpl)messageId)
.collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED);
batches.forEach(ids -> {
List<MessageIdData> messageIdDatas = ids.stream()
.filter(messageId -> !processPossibleToDLQ(messageId))
.map(messageId -> {
return new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setLedgerId(messageId.getLedgerId())
.setEntryId(messageId.getEntryId());
}).collect(Collectors.toList());
if (!messageIdDatas.isEmpty()) {
ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
}
getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
if (!messageIdData.isEmpty()) {
ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
}
});
});
if (messagesFromQueue > 0) {
increaseAvailablePermits(cnx, messagesFromQueue);
Expand All @@ -1670,48 +1671,91 @@ protected void completeOpBatchReceive(OpBatchReceive<T> op) {
notifyPendingBatchReceivedCallBack(op);
}

private boolean processPossibleToDLQ(MessageIdImpl messageId) {
/**
 * Resolves which of the given message ids still need a redelivery request.
 *
 * <p>Every id is first handed to {@code processPossibleToDLQ}. An id whose future yields
 * {@code false} (it was not sent to the dead letter topic) is converted into a
 * {@link MessageIdData} carrying its partition index, ledger id and entry id; an id whose
 * future yields {@code true} is left out of the result.
 *
 * <p>Each {@link MessageIdData} travels through its own future chain and the result list is
 * assembled only after all chains have completed. Filling a shared list from separately
 * registered callbacks would be unsafe: {@link CompletableFuture} does not guarantee that such a
 * callback has run before the aggregate continuation fires, and callbacks may execute on
 * different threads.
 *
 * @param messageIds the ids to examine; may be {@code null} or empty
 * @return a future holding the ids to redeliver, in the order they appear in {@code messageIds};
 *         it holds an empty list when {@code messageIds} is {@code null} or empty, and completes
 *         exceptionally if any per-message future does
 */
private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<MessageIdImpl> messageIds) {
    if (messageIds == null || messageIds.isEmpty()) {
        return CompletableFuture.completedFuture(Collections.emptyList());
    }
    List<CompletableFuture<MessageIdData>> futures = new ArrayList<>(messageIds.size());
    for (MessageIdImpl messageId : messageIds) {
        futures.add(processPossibleToDLQ(messageId).thenApply(sendToDLQ -> {
            if (sendToDLQ) {
                // Sent to the DLQ: nothing to redeliver for this id.
                return null;
            }
            return new MessageIdData()
                    .setPartition(messageId.getPartitionIndex())
                    .setLedgerId(messageId.getLedgerId())
                    .setEntryId(messageId.getEntryId());
        }));
    }
    return FutureUtil.waitForAll(futures).thenApply(v -> {
        List<MessageIdData> data = new ArrayList<>(futures.size());
        for (CompletableFuture<MessageIdData> future : futures) {
            // Every future is already complete here, so join() does not block.
            MessageIdData messageIdData = future.join();
            if (messageIdData != null) {
                data.add(messageIdData);
            }
        }
        return data;
    });
}

private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId) {
List<MessageImpl<T>> deadLetterMessages = null;
if (possibleSendToDeadLetterTopicMessages != null) {
if (messageId instanceof BatchMessageIdImpl) {
deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex()));
} else {
deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId);
messageId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex());
}
deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId);
}
CompletableFuture<Boolean> result = new CompletableFuture<>();
if (deadLetterMessages != null) {
if (deadLetterProducer == null) {
try {
createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
.create();
}
} catch (Exception e) {
log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
} finally {
createProducerLock.writeLock().unlock();
initDeadLetterProducerIfNeeded();
List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
MessageIdImpl finalMessageId = messageId;
deadLetterProducer.thenAccept(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
producerDLQ.newMessage()
.value(message.getValue())
.properties(message.getProperties())
.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
if (ex != null) {
log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.",
topicName, subscription, consumerName, finalMessageId, ex);
} else {
result.complete(true);
}
});
}).exceptionally(ex -> {
log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}",
topicName, subscription, consumerName, finalMessageId, ex);
result.complete(false);
return null;
});
}
}
if (deadLetterProducer != null) {
try {
for (MessageImpl<T> message : deadLetterMessages) {
deadLetterProducer.newMessage()
.value(message.getValue())
.properties(message.getProperties())
.send();
}
acknowledge(messageId);
return true;
} catch (Exception e) {
log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
}).exceptionally(ex -> {
deadLetterProducer = null;
result.complete(false);
return null;
});
} else {
result.complete(false);
}
return result;
}

private void initDeadLetterProducerIfNeeded() {
if (deadLetterProducer == null) {
try {
createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
.createAsync();
}
} catch (Exception e) {
log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
} finally {
createProducerLock.writeLock().unlock();
}
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,15 @@ public void run(Timeout t) throws Exception {
headPartition.clear();
timePartitions.addLast(headPartition);
} finally {
if (messageIds.size() > 0) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
try {
if (messageIds.size() > 0) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
} finally {
writeLock.unlock();
}
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
writeLock.unlock();
}
}
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);
Expand Down

0 comments on commit fb0f3e3

Please sign in to comment.