Skip to content

Commit

Permalink
[FLINK-29234] JobMasterServiceLeadershipRunner#closeAsync has a lock …
Browse files Browse the repository at this point in the history
…with an excessive range

Currently, in JobMasterServiceLeadershipRunner, Leader event callback such as grantLeadership and revokeLeadership will be invoked with nested lock(e.g. curator's event lock and DefaultLeaderElectionService's lock) and try to acquire JobMasterServiceLeadershipRunner's own lock. This will lead to many deadlock risks, such as the simultaneous close and grantLeadership. To avoid this, we narrow the range of lock within JobMasterServiceLeadershipRunner#closeAsync. In the future, we'd better take some measures to avoid possible deadlocks. For example, the leaderEvent of the leaderElectionService is executed by a separate thread.

This closes apache#21137
  • Loading branch information
reswqa authored and xintongsong committed Nov 18, 2022
1 parent 269ba49 commit d745f5b
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,40 +124,40 @@ public JobMasterServiceLeadershipRunner(

@Override
public CompletableFuture<Void> closeAsync() {
final CompletableFuture<Void> processTerminationFuture;
synchronized (lock) {
if (state != State.STOPPED) {
state = State.STOPPED;
if (state == State.STOPPED) {
return terminationFuture;
}

LOG.debug("Terminating the leadership runner for job {}.", getJobID());
state = State.STOPPED;

jobMasterGatewayFuture.completeExceptionally(
new FlinkException(
"JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership."));
resultFuture.complete(
JobManagerRunnerResult.forSuccess(
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)));

final CompletableFuture<Void> processTerminationFuture =
jobMasterServiceProcess.closeAsync();

final CompletableFuture<Void> serviceTerminationFuture =
FutureUtils.runAfterwards(
processTerminationFuture,
() -> {
classLoaderLease.release();
leaderElectionService.stop();
});

FutureUtils.forward(serviceTerminationFuture, terminationFuture);

terminationFuture.whenComplete(
(unused, throwable) ->
LOG.debug(
"Leadership runner for job {} has been terminated.",
getJobID()));
}
LOG.debug("Terminating the leadership runner for job {}.", getJobID());

jobMasterGatewayFuture.completeExceptionally(
new FlinkException(
"JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership."));

resultFuture.complete(
JobManagerRunnerResult.forSuccess(
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)));

processTerminationFuture = jobMasterServiceProcess.closeAsync();
}

final CompletableFuture<Void> serviceTerminationFuture =
FutureUtils.runAfterwards(
processTerminationFuture,
() -> {
classLoaderLease.release();
leaderElectionService.stop();
});

FutureUtils.forward(serviceTerminationFuture, terminationFuture);

terminationFuture.whenComplete(
(unused, throwable) ->
LOG.debug("Leadership runner for job {} has been terminated.", getJobID()));
return terminationFuture;
}

Expand Down
Loading

0 comments on commit d745f5b

Please sign in to comment.