Skip to content

Commit

Permalink
Refactor the try-lock code pattern (apache#10742)
Browse files Browse the repository at this point in the history
It's recommended that calls to Lock#lock should be immediately followed by a `try` block with a `finally` clause which releases the lock. I came across a few pieces of code that didn't follow this pattern and try to fix it for consistency.

References:
https://errorprone.info/bugpattern/LockNotBeforeTry
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html
  • Loading branch information
fantapsody authored May 30, 2021
1 parent 04b5da0 commit 82eae5d
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,6 @@ public Map<String, String> getProtocolDataToAdvertise() {
* Start the pulsar service instance.
*/
public void start() throws PulsarServerException {
mutex.lock();

LOG.info("Starting Pulsar Broker service; version: '{}'",
(brokerVersion != null ? brokerVersion : "unknown"));
LOG.info("Git Revision {}", PulsarVersion.getGitSha());
Expand All @@ -570,6 +568,7 @@ public void start() throws PulsarServerException {
PulsarVersion.getBuildHost(),
PulsarVersion.getBuildTime());

mutex.lock();
try {
if (state != State.Init) {
throw new PulsarServerException("Cannot start the service once it was stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,8 @@ protected void notifyPendingBatchReceivedCallBack() {
return;
}

reentrantLock.lock();
try {
reentrantLock.lock();
notifyPendingBatchReceivedCallBack(opBatchReceive);
} finally {
reentrantLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
}
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
if (messageIds.size() != 0) {
addListAcknowledgment(messageIds);
return this.currentIndividualAckFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ synchronized public void close() {

public String getStatsAsString() throws IOException {
if (isInitialized) {
statsLock.readLock().lock();
try {
statsLock.readLock().lock();
return stats.getStatsAsString();
} finally {
statsLock.readLock().unlock();
Expand All @@ -468,8 +468,8 @@ public String getStatsAsString() throws IOException {

public InstanceCommunication.MetricsData getAndResetMetrics() {
if (isInitialized) {
statsLock.writeLock().lock();
try {
statsLock.writeLock().lock();
InstanceCommunication.MetricsData metricsData = internalGetMetrics();
internalResetMetrics();
return metricsData;
Expand All @@ -482,8 +482,8 @@ public InstanceCommunication.MetricsData getAndResetMetrics() {

public InstanceCommunication.MetricsData getMetrics() {
if (isInitialized) {
statsLock.readLock().lock();
try {
statsLock.readLock().lock();
return internalGetMetrics();
} finally {
statsLock.readLock().unlock();
Expand All @@ -494,8 +494,8 @@ public InstanceCommunication.MetricsData getMetrics() {

public void resetMetrics() {
if (isInitialized) {
statsLock.writeLock().lock();
try {
statsLock.writeLock().lock();
internalResetMetrics();
} finally {
statsLock.writeLock().unlock();
Expand Down Expand Up @@ -540,9 +540,8 @@ private Builder createMetricsDataBuilder() {
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
if (isInitialized) {
statsLock.readLock().lock();
try {
statsLock.readLock().lock();

functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public boolean onTrigger() {
List<Event<T>> windowEvents = null;
List<Event<T>> expired = null;

lock.lock();
try {
lock.lock();
/*
* scan the entire window to handle out of order events in
* the case of time based windows.
Expand Down Expand Up @@ -196,8 +196,8 @@ private List<Event<T>> scanEvents(boolean fullScan) {
List<Event<T>> eventsToExpire = new ArrayList<>();
List<Event<T>> eventsToProcess = new ArrayList<>();

lock.lock();
try {
lock.lock();
Iterator<Event<T>> it = queue.iterator();
while (it.hasNext()) {
Event<T> windowEvent = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ public void start(AuthenticationService authenticationService,
() -> {
// computing a new schedule and checking for failures cannot happen concurrently
// both paths of code modify internally cached assignments map in function runtime manager
schedulerManager.getSchedulerLock().lock();
try {
schedulerManager.getSchedulerLock().lock();
membershipManager.checkFailures(
functionMetaDataManager, functionRuntimeManager, schedulerManager);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,8 @@ private Future<?> scheduleInternal(Runnable runnable, String errMsg) {

try {
return executorService.submit(() -> {
schedulerLock.lock();
try {
schedulerLock.lock();

boolean isLeader = leaderService.isLeader();
if (isLeader) {
try {
Expand Down Expand Up @@ -501,10 +500,9 @@ private Pair<List<Function.Instance>, List<Assignment>> getUnassignedFunctionIns
@Override
public synchronized void close() {
log.info("Closing scheduler manager");
// make sure we are not closing while a scheduling is being calculated
schedulerLock.lock();
try {
// make sure we are not closing while a scheduling is being calculated
schedulerLock.lock();

isRunning = false;

if (scheduledExecutorService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public long getAvailableCacheSize() {
* @param size allocate size
*/
public void allocate(long size) {
lock.lock();
try {
lock.lock();
availableCacheSize.add(-size);
} finally {
lock.unlock();
Expand All @@ -69,8 +69,8 @@ public void allocate(long size) {
* @param size release size
*/
public void release(long size) {
lock.lock();
try {
lock.lock();
availableCacheSize.add(size);
if (availableCacheSize.longValue() > maxCacheSize) {
availableCacheSize.reset();
Expand Down

0 comments on commit 82eae5d

Please sign in to comment.