Skip to content

Commit

Permalink
Fixes some minor thread safety violations (apache#5793)
Browse files Browse the repository at this point in the history
*Motivation*

Reading latest changes through pulsar code I found some minor thread safety
violations.

*Modifications*

  - When possible, prefer using final fields. fixed for initialStartMessageId
    and startMessageRollbackDurationInSec fields.
  - Fix issue accessing timeout field within inner lock on UnAckedMessageTracker
  • Loading branch information
lovelle authored and jiazhai committed Dec 17, 2019
1 parent a7e2f13 commit 67fdfe2
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
Expand Down Expand Up @@ -524,7 +523,6 @@ protected void notifyPendingBatchReceivedCallBack() {
notifyPendingBatchReceivedCallBack(opBatchReceive);
}


protected void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
private volatile BatchMessageIdImpl startMessageId;
private BatchMessageIdImpl initialStartMessageId;
private long startMessageRollbackDurationInSec;

private final BatchMessageIdImpl initialStartMessageId;
private final long startMessageRollbackDurationInSec;

private volatile boolean hasReachedEndOfTopic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ public void run(Timeout t) throws Exception {
headPartition.clear();
timePartitions.addLast(headPartition);
} finally {
if (messageIds.size() > 0) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
writeLock.unlock();
}
if (messageIds.size() > 0) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
}
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}
Expand Down

0 comments on commit 67fdfe2

Please sign in to comment.