From 67fdfe2ab4058692f314768b8a78d9da407d0c6e Mon Sep 17 00:00:00 2001 From: Ezequiel Lovelle Date: Tue, 17 Dec 2019 08:49:43 -0300 Subject: [PATCH] Fixes some minor thread safety violations (#5793) *Motivation* Reading latest changes through pulsar code I found some minor thread safety violations. *Modifications* - When possible, prefer using final fields. fixed for initialStartMessageId and startMessageRollbackDurationInSec fields. - Fix issue accessing timeout field within inner lock on UnAckedMessageTracker --- .../org/apache/pulsar/client/impl/ConsumerBase.java | 2 -- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 5 +++-- .../pulsar/client/impl/UnAckedMessageTracker.java | 10 +++++----- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 59b565ae146ff..86e7fb27da751 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -33,7 +33,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.concurrent.FastThreadLocal; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerEventListener; @@ -524,7 +523,6 @@ protected void notifyPendingBatchReceivedCallBack() { notifyPendingBatchReceivedCallBack(opBatchReceive); } - protected void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) { MessagesImpl messages = getNewMessagesImpl(); Message msgPeeked = incomingMessages.peek(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ab72571a31c00..c7211d2f2bd28 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -121,8 +121,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final int priorityLevel; private final SubscriptionMode subscriptionMode; private volatile BatchMessageIdImpl startMessageId; - private BatchMessageIdImpl initialStartMessageId; - private long startMessageRollbackDurationInSec; + + private final BatchMessageIdImpl initialStartMessageId; + private final long startMessageRollbackDurationInSec; private volatile boolean hasReachedEndOfTopic; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index d137011a54d24..3e85d48a33144 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -139,13 +139,13 @@ public void run(Timeout t) throws Exception { headPartition.clear(); timePartitions.addLast(headPartition); } finally { + if (messageIds.size() > 0) { + consumerBase.onAckTimeoutSend(messageIds); + consumerBase.redeliverUnacknowledgedMessages(messageIds); + } + timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS); writeLock.unlock(); } - if (messageIds.size() > 0) { - consumerBase.onAckTimeoutSend(messageIds); - consumerBase.redeliverUnacknowledgedMessages(messageIds); - } - timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS); } }, this.tickDurationInMs, TimeUnit.MILLISECONDS); }