Skip to content

Commit

Permalink
Make Consumer thread safe and lock-free (apache#10352)
Browse files Browse the repository at this point in the history
### Motivation
Lock-free solution for apache#10240
  • Loading branch information
315157973 authored May 5, 2021
1 parent c3a20a1 commit def1932
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 46 deletions.
7 changes: 7 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -129,8 +130,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final int receiverQueueRefillThreshold;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final UnAckedMessageTracker unAckedMessageTracker;
private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
private final NegativeAcksTracker negativeAcksTracker;
Expand Down Expand Up @@ -188,6 +187,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final boolean poolMessages;

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final ExecutorService internalPinnedExecutor;

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
Expand Down Expand Up @@ -255,6 +255,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
this.poolMessages = conf.isPoolMessages();
this.internalPinnedExecutor = client.getInternalExecutorService();

if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
Expand Down Expand Up @@ -412,25 +413,17 @@ protected Message<T> internalReceive() throws PulsarClientException {
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = null;
lock.writeLock().lock();
try {
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
internalPinnedExecutor.execute(() -> {
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(e);
} finally {
lock.writeLock().unlock();
}

if (message != null) {
messageProcessed(message);
result.complete(beforeConsume(message));
}
if (message != null) {
messageProcessed(message);
result.complete(beforeConsume(message));
}
});

return result;
}
Expand Down Expand Up @@ -475,8 +468,7 @@ protected Messages<T> internalBatchReceive() throws PulsarClientException {
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
lock.writeLock().lock();
try {
internalPinnedExecutor.execute(() -> {
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
Expand All @@ -498,9 +490,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
pendingBatchReceives.add(opBatchReceive);
cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
}
} finally {
lock.writeLock().unlock();
}
});
return result;
}

Expand Down Expand Up @@ -973,15 +963,12 @@ private void closeConsumerTasks() {
}

private void failPendingReceive() {
lock.readLock().lock();
try {
internalPinnedExecutor.execute(() -> {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
}
});
}

void activeConsumerChanged(boolean isActive) {
Expand Down Expand Up @@ -1079,11 +1066,10 @@ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redelive
poolMessages);
uncompressedPayload.release();

lock.readLock().lock();
try {
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
internalPinnedExecutor.execute(() -> {
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
Expand All @@ -1094,16 +1080,19 @@ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redelive
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} finally {
lock.readLock().unlock();
}
});
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);

uncompressedPayload.release();
}
internalPinnedExecutor.execute(()
-> tryTriggerListener());

}

private void tryTriggerListener() {
if (listener != null) {
triggerListener();
}
Expand Down Expand Up @@ -1306,17 +1295,14 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
lock.readLock().lock();
try {
internalPinnedExecutor.execute(() -> {
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} finally {
lock.readLock().unlock();
}
singleMessagePayload.release();
singleMessagePayload.release();
});
}
if (ackBitSet != null) {
ackBitSet.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.Timer;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.mockito.Mockito;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -49,12 +52,15 @@ public class ConsumerImplTest {
private ExecutorProvider executorProvider;
private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
private ExecutorService executorService;

@BeforeMethod(alwaysRun = true)
public void setUp() {
executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
consumerConf = new ConsumerConfigurationData<>();
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
executorService = Executors.newSingleThreadExecutor();
when(client.getInternalExecutorService()).thenReturn(executorService);
ClientConfigurationData clientConf = client.getConfiguration();
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
Expand All @@ -74,6 +80,10 @@ public void cleanup() {
executorProvider.shutdownNow();
executorProvider = null;
}
if (executorService != null) {
executorService.shutdownNow();
executorService = null;
}
}

@Test(invocationTimeOut = 1000)
Expand Down Expand Up @@ -162,7 +172,7 @@ public void testNotifyPendingReceivedCallback_WorkNormally() {
public void testReceiveAsyncCanBeCancelled() {
// given
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
Assert.assertEquals(consumer.peekPendingReceive(), future);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.peekPendingReceive(), future));
// when
future.cancel(true);
// then
Expand All @@ -173,7 +183,7 @@ public void testReceiveAsyncCanBeCancelled() {
public void testBatchReceiveAsyncCanBeCancelled() {
// given
CompletableFuture<Messages<byte[]>> future = consumer.batchReceiveAsync();
Assert.assertTrue(consumer.hasPendingBatchReceive());
Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasPendingBatchReceive()));
// when
future.cancel(true);
// then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,50 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

public class ReaderImplTest {
ReaderImpl<byte[]> reader;
private ExecutorService executorService;

@BeforeMethod
void setupReader() {
PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
readerConfiguration.setTopicName("topicName");
executorService = Executors.newSingleThreadExecutor();
when(mockedClient.getInternalExecutorService()).thenReturn(executorService);
CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
consumerFuture, Schema.BYTES);
}

@AfterMethod
public void clean() {
if (executorService != null) {
executorService.shutdownNow();
executorService = null;
}
}

@Test
void shouldSupportCancellingReadNextAsync() {
// given
CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
assertNotNull(reader.getConsumer().peekPendingReceive());
Awaitility.await().untilAsserted(() -> {
assertNotNull(reader.getConsumer().peekPendingReceive());
});

// when
future.cancel(false);
Expand Down

0 comments on commit def1932

Please sign in to comment.