Skip to content

Commit

Permalink
[Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive (
Browse files Browse the repository at this point in the history
apache#11691)

* Fixed block forever bug in Consumer.batchReceive

Ensure that all poll() calls to pendingBatchReceives
is done within the pinnedInternalExecutor to avoid a
race condition where a peek and a subsequent poll get
different pending receives.

Moved the pinnedInternalExecutor into the ConsumerBase
as both ConsumerImpl and MultiTopicsConsumerImpl require it.

failingPendingReceive() now always submits its work to the
internal executor returning a CompletableFuture and all callers
treat it as an asynchronous operation.

* Fix broken MultiTopicsConsumerImplTest

Needed a real executor service to run the
failPendingReceive() method.

* Ensure all calls to messageReceived happen on internal executor

* Readd missing return statement in ConsumerImpl.closeAsync()

* Ensure correct usage of consumer internal executors

Ensure that the externalPinnedExecutor is only called for user
code and internalPinnedExecutor used for internal tasks.
Some test refactoring to manage creation of executors.
  • Loading branch information
Vanlightly authored Aug 23, 2021
1 parent 49c0796 commit bd942e1
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.Queues;

import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -37,8 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import io.netty.util.Timeout;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.BatchReceivePolicy;
Expand Down Expand Up @@ -73,15 +70,16 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
protected final ScheduledExecutorService pinnedExecutor;
protected final ScheduledExecutorService externalPinnedExecutor;
protected final ScheduledExecutorService internalPinnedExecutor;
final BlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
protected final Schema<T> schema;
protected final ConsumerInterceptors<T> interceptors;
protected final BatchReceivePolicy batchReceivePolicy;
protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
protected final ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConsumerBase.class, "incomingMessagesSize");
protected volatile long incomingMessagesSize = 0;
Expand All @@ -91,7 +89,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T

protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors interceptors) {
super(client, topic);
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = conf.getSubscriptionName();
Expand All @@ -104,8 +103,10 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.incomingMessages = new GrowableArrayBlockingQueue<>();
this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
this.executorProvider = executorProvider;
this.pinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
this.interceptors = interceptors;
if (conf.getBatchReceivePolicy() != null) {
Expand Down Expand Up @@ -204,26 +205,11 @@ public CompletableFuture<Messages<T>> batchReceiveAsync() {
}
}

protected CompletableFuture<Message<T>> peekPendingReceive() {
CompletableFuture<Message<T>> receivedFuture = null;
while (receivedFuture == null) {
receivedFuture = pendingReceives.peek();
if (receivedFuture == null) {
break;
}
// skip done futures (cancelling a future could mark it done)
if (receivedFuture.isDone()) {
CompletableFuture<Message<T>> removed = pendingReceives.poll();
if (removed != receivedFuture) {
log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
}
receivedFuture = null;
}
}
return receivedFuture;
protected boolean hasNextPendingReceive() {
return !pendingReceives.isEmpty();
}

protected CompletableFuture<Message<T>> pollPendingReceive() {
protected CompletableFuture<Message<T>> nextPendingReceive() {
CompletableFuture<Message<T>> receivedFuture;
while (true) {
receivedFuture = pendingReceives.poll();
Expand All @@ -236,37 +222,60 @@ protected CompletableFuture<Message<T>> pollPendingReceive() {
}

protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
getExecutor(message).execute(() -> {
getInternalExecutor(message).execute(() -> {
if (!receivedFuture.complete(message)) {
log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}",
receivedFuture.isCancelled(), message);
}
});
}

protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
protected CompletableFuture<Void> failPendingReceive() {
if (internalPinnedExecutor.isShutdown()) {
// we need to fail any pending receives no matter what,
// to avoid blocking user code
failPendingReceives();
failPendingBatchReceives();
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> future = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
try {
failPendingReceives();
failPendingBatchReceives();
} finally {
future.complete(null);
}
});
return future;
}
}

private void failPendingReceives() {
while (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
if (receiveFuture == null) {
break;
}
if (!receiveFuture.isDone()) {
receiveFuture.completeExceptionally(
new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
new PulsarClientException.AlreadyClosedException(
String.format("The consumer which subscribes the topic %s with subscription name %s " +
"was already closed when cleaning and closing the consumers", topic, subscription)));
}
}
}

protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
while (!pendingBatchReceives.isEmpty()) {
OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
private void failPendingBatchReceives() {
while (hasNextBatchReceive()) {
OpBatchReceive<T> opBatchReceive = nextBatchReceive();
if (opBatchReceive == null || opBatchReceive.future == null) {
break;
}
if (!opBatchReceive.future.isDone()) {
opBatchReceive.future.completeExceptionally(
new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
new PulsarClientException.AlreadyClosedException(
String.format("The consumer which subscribes the topic %s with subscription name %s " +
"was already closed when cleaning and closing the consumers", topic, subscription)));
}
}
Expand Down Expand Up @@ -777,7 +786,7 @@ static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> future) {
}

protected void notifyPendingBatchReceivedCallBack() {
OpBatchReceive<T> opBatchReceive = pollNextBatchReceive();
OpBatchReceive<T> opBatchReceive = nextBatchReceive();
if (opBatchReceive == null) {
return;
}
Expand All @@ -790,31 +799,16 @@ protected void notifyPendingBatchReceivedCallBack() {
}
}

private OpBatchReceive<T> peekNextBatchReceive() {
OpBatchReceive<T> opBatchReceive = null;
while (opBatchReceive == null) {
opBatchReceive = pendingBatchReceives.peek();
// no entry available
if (opBatchReceive == null) {
return null;
}
// remove entries where future is null or has been completed (cancel / timeout)
if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
OpBatchReceive<T> removed = pendingBatchReceives.poll();
if (removed != opBatchReceive) {
log.error("Bug: Removed entry wasn't the expected one. expected={}, removed={}", opBatchReceive, removed);
}
opBatchReceive = null;
}
}
return opBatchReceive;
private boolean hasNextBatchReceive() {
return !pendingBatchReceives.isEmpty();
}


private OpBatchReceive<T> pollNextBatchReceive() {
private OpBatchReceive<T> nextBatchReceive() {
OpBatchReceive<T> opBatchReceive = null;
while (opBatchReceive == null) {
opBatchReceive = pendingBatchReceives.poll();

// no entry available
if (opBatchReceive == null) {
return null;
Expand Down Expand Up @@ -853,7 +847,11 @@ protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future
protected abstract void messageProcessed(Message<?> msg);


private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
private void pendingBatchReceiveTask(Timeout timeout) {
internalPinnedExecutor.execute(() -> doPendingBatchReceiveTask(timeout));
}

private void doPendingBatchReceiveTask(Timeout timeout) {
if (timeout.isCancelled()) {
return;
}
Expand All @@ -865,32 +863,44 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
if (getState() == State.Closing || getState() == State.Closed) {
return;
}
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();

timeToWaitMs = batchReceivePolicy.getTimeoutMs();
OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();

while (firstOpBatchReceive != null) {
while (opBatchReceive != null) {
// If there is at least one batch receive, calculate the diff between the batch receive timeout
// and the elapsed time since the operation was created.
long diff = batchReceivePolicy.getTimeoutMs()
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);

if (diff <= 0) {
// The diff is less than or equal to zero, meaning that the batch receive has been timed out.
// complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
OpBatchReceive<T> op = pollNextBatchReceive();
if (op != null) {
completeOpBatchReceive(op);
completeOpBatchReceive(opBatchReceive);

// remove the peeked item from the queue
OpBatchReceive<T> removed = pendingBatchReceives.poll();

if (removed != opBatchReceive) {
// regression check, if this were to happen due to incorrect code changes in the future,
// (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
// to avoid blocking user code

log.error("Race condition in consumer {} (should not cause data loss). "
+ " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
if (removed != null && !removed.future.isDone()) {
completeOpBatchReceive(removed);
}
}
firstOpBatchReceive = peekNextBatchReceive();

} else {
// The diff is greater than zero, set the timeout to the diff value
timeToWaitMs = diff;
break;
}

opBatchReceive = pendingBatchReceives.peek();
}
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, timeToWaitMs, TimeUnit.MILLISECONDS);
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
timeToWaitMs, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -907,7 +917,7 @@ protected void triggerListener() {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
callMessageListener(msg));
} else {
getExecutor(msg).execute(() -> {
getExternalExecutor(msg).execute(() -> {
callMessageListener(msg);
});
}
Expand Down Expand Up @@ -957,7 +967,7 @@ protected MessagesImpl<T> getNewMessagesImpl() {
}

protected boolean hasPendingBatchReceive() {
return pendingBatchReceives != null && peekNextBatchReceive() != null;
return pendingBatchReceives != null && hasNextBatchReceive();
}

protected void increaseIncomingMessageSize(final Message<?> message) {
Expand Down Expand Up @@ -989,12 +999,21 @@ protected void clearIncomingMessages() {

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private ExecutorService getExecutor(Message<T> msg) {
private ExecutorService getExternalExecutor(Message<T> msg) {
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
: null;
ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
? receivedConsumer.externalPinnedExecutor
: externalPinnedExecutor;
return executor;
}

private ExecutorService getInternalExecutor(Message<T> msg) {
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
: null;
ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null
? receivedConsumer.pinnedExecutor
: pinnedExecutor;
ExecutorService executor = receivedConsumer != null && receivedConsumer.internalPinnedExecutor != null
? receivedConsumer.internalPinnedExecutor
: internalPinnedExecutor;
return executor;
}

Expand Down
Loading

0 comments on commit bd942e1

Please sign in to comment.