Skip to content

Commit

Permalink
[cleanup][pulsar-functions] Remove compiler warnings for return from …
Browse files Browse the repository at this point in the history
…finally (apache#16451)

### Motivation

To reduce warnings generated during compilation by cleaning up the code.

```
[WARNING] /Users/ewest/git/pulsar/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java:[334,9] finally clause cannot complete normally
[WARNING] /Users/ewest/git/pulsar/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java:[642,9] finally clause cannot complete normally
```

### Modifications

* Moved `return` from `finally` blocks
* Simplified code through the use of simpler, equivalent language constructs
* Made some constant fields `final`

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
teabot and merlimat authored Aug 11, 2022
1 parent 1fe8c06 commit 62e2a9a
Showing 1 changed file with 43 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -31,6 +32,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -61,6 +63,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.proto.Function;
Expand All @@ -71,7 +74,6 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;

@Slf4j
/**
* The scheduler manager is used to compute scheduling of function instances
* Only the leader computes new schedulings and writes assignments to the assignment topic
Expand All @@ -80,6 +82,7 @@
* 2. When worker loses leadership, this class will be closed which
* also closes the worker's producer to the assignments topic
*/
@Slf4j
public class SchedulerManager implements AutoCloseable {

private final WorkerConfig workerConfig;
Expand Down Expand Up @@ -109,7 +112,7 @@ public class SchedulerManager implements AutoCloseable {
private final PulsarAdmin admin;

@Getter
private Lock schedulerLock = new ReentrantLock(true);
private final Lock schedulerLock = new ReentrantLock(true);

private volatile boolean isRunning = false;

Expand All @@ -123,11 +126,9 @@ public class SchedulerManager implements AutoCloseable {

private MessageId metadataTopicLastMessage = MessageId.earliest;

private AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
private Future<?> currentRebalanceFuture;
private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);

private AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
private Future<?> currentDrainFuture;
private final AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
// The list of assignments moved due to the last drain op on a leader. Used in UTs, and debugging.
private List<Assignment> assignmentsMovedInLastDrain;

Expand All @@ -136,13 +137,13 @@ enum DrainOpStatus {
DrainNotInProgress,
DrainInProgress,
DrainCompleted
};
}

// A map to hold the status of recent drain operations.
// It is of the form {workerId : DrainOpStatus}.
// Entries are added when a drain operation starts, and removed on a periodic (when the worker is no longer seen
// on a poll).
private ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<String, DrainOpStatus>();
private final ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<>();

public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
Expand Down Expand Up @@ -248,8 +249,7 @@ public Future<?> rebalanceIfNotInprogress() {
rebalanceInProgress.set(false);
throw new TooFewWorkersException();
}
currentRebalanceFuture = rebalance();
return currentRebalanceFuture;
return rebalance();
} else {
throw new RebalanceInProgressException();
}
Expand Down Expand Up @@ -289,8 +289,7 @@ public Future<?> drainIfNotInProgress(String workerId) {
throw new UnknownWorkerException();
}

currentDrainFuture = drain(workerId);
return currentDrainFuture;
return drain(workerId);
} finally {
drainInProgressFlag.set(false);
}
Expand All @@ -301,37 +300,25 @@ public Future<?> drainIfNotInProgress(String workerId) {

/**
 * Reports the status of the most recent drain operation recorded for a worker.
 *
 * @param workerId id of the worker whose drain status is requested; may be {@code null}
 * @return SUCCESS / RUNNING / NOT_RUN mapped from the recorded {@link DrainOpStatus};
 *         an error status when the worker has no entry in {@code drainOpStatusMap};
 *         an empty {@link LongRunningProcessStatus} when {@code workerId} is {@code null}
 */
public LongRunningProcessStatus getDrainStatus(String workerId) {
    long startTime = System.nanoTime();
    // Nested Optionals: outer guards a null workerId, inner guards a missing drain record.
    // The exhaustive switch expression needs no default branch for the three enum values.
    LongRunningProcessStatus status = Optional.ofNullable(workerId).map(id ->
            Optional.ofNullable(drainOpStatusMap.get(id)).map(opStatus ->
                    switch (opStatus) {
                        case DrainCompleted ->
                                LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
                        case DrainInProgress ->
                                LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
                        case DrainNotInProgress ->
                                LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                    }).orElse(
                    LongRunningProcessStatus.forError("Worker " + id + " not found in drain records")
            )
    ).orElse(
            new LongRunningProcessStatus()
    );
    // Log elapsed wall-clock time (whole seconds) and the outcome before returning.
    log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
            workerId, NANOSECONDS.toSeconds(System.nanoTime() - startTime),
            status.status, status.lastError);
    return status;
}

// The following method is used only for testing.
Expand All @@ -351,8 +338,7 @@ void setDrainOpsStatus(final String workerId, final DrainOpStatus dStatus) {
// The following method is used only for testing.
@VisibleForTesting
ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
    // Return a defensive copy so callers (tests) cannot mutate the live map.
    return new ConcurrentHashMap<>(drainOpStatusMap);
}

private synchronized int getCurrentAvailableNumWorkers() {
Expand All @@ -361,16 +347,10 @@ private synchronized int getCurrentAvailableNumWorkers() {

private synchronized Set<String> getCurrentAvailableWorkers() {
Set<String> currentMembership = membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
.stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet());

// iterate the set, instead of the concurrent hashmap
Iterator<String> iter = currentMembership.iterator();
while (iter.hasNext()) {
if (drainOpStatusMap.containsKey(iter.next())) {
iter.remove();
}
}

currentMembership.removeIf(drainOpStatusMap::containsKey);
return currentMembership;
}

Expand Down Expand Up @@ -445,11 +425,7 @@ void invokeScheduler() {
String workerId = workerIdToAssignmentEntry.getKey();
// remove assignments to workers that don't exist / died for now.
// wait for failure detector to unassign them in the future for re-scheduling
if (!availableWorkers.contains(workerId)) {
return false;
}

return true;
return availableWorkers.contains(workerId);
})
.flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
.collect(Collectors.toList());
Expand Down Expand Up @@ -505,11 +481,7 @@ private void invokeRebalance() {
String workerId = workerIdToAssignmentEntry.getKey();
// remove assignments to workers that don't exist / died for now.
// wait for failure detector to unassign them in the future for re-scheduling
if (!availableWorkers.contains(workerId)) {
return false;
}

return true;
return availableWorkers.contains(workerId);
})
.flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
.collect(Collectors.toList());
Expand Down Expand Up @@ -638,8 +610,8 @@ List<Assignment> invokeDrain(String workerId) {
workerId, drainSuccessful ? "" : "un",
(System.nanoTime() - startTime) / Math.pow(10, 9),
schedulerStats.getSummary(), schedulerStats);
return postDrainAssignments;
}
return postDrainAssignments;
}

private void compactAssignmentTopic() {
Expand All @@ -648,7 +620,7 @@ private void compactAssignmentTopic() {
this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
scheduledExecutorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
scheduledExecutorService.schedule(this::compactAssignmentTopic, DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
Expand All @@ -660,7 +632,7 @@ protected synchronized int updateWorkerDrainMap() {

if (drainOpStatusMap.size() > 0) {
val currentMembership = membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
.stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet());
val removeWorkerIds = new ArrayList<String>();

for (String workerId : drainOpStatusMap.keySet()) {
Expand Down Expand Up @@ -688,7 +660,7 @@ private void compactFunctionMetadataTopic() {
this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
scheduledExecutorService.schedule(this::compactFunctionMetadataTopic, DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
Expand All @@ -708,7 +680,7 @@ private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
}

private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
boolean externallyManagedRuntime) {
boolean externallyManagedRuntime) {
Map<String, Function.Instance> functionInstances = new HashMap<>();
for (FunctionMetaData functionMetaData : allFunctions) {
for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
Expand All @@ -719,7 +691,7 @@ private static Map<String, Function.Instance> computeAllInstances(List<FunctionM
}

static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
boolean externallyManagedRuntime) {
boolean externallyManagedRuntime) {
List<Function.Instance> functionInstances = new LinkedList<>();
if (!externallyManagedRuntime) {
int instances = functionMetaData.getFunctionDetails().getParallelism();
Expand Down Expand Up @@ -831,9 +803,9 @@ private static class WorkerStats {
private boolean alive;
}

private Map<String, WorkerStats> workerStatsMap = new HashMap<>();
private final Map<String, WorkerStats> workerStatsMap = new HashMap<>();

private Map<String, String> instanceToWorkerId = new HashMap<>();
private final Map<String, String> instanceToWorkerId = new HashMap<>();

public SchedulerStats(Map<String, Map<String, Assignment>> workerIdToAssignments, Set<String> workers) {

Expand Down

0 comments on commit 62e2a9a

Please sign in to comment.