Skip to content

Commit

Permalink
[FLINK-36168][runtime] Replaces goToFinished with the cancel method (a…
Browse files Browse the repository at this point in the history
…pache#25305)

We have to transition to all the expected state transitions properly to handle all the cleanup. This also requires proper handling of the state transitions by the DummyState test implementations.
  • Loading branch information
XComp authored Sep 12, 2024
1 parent 77da041 commit 83be264
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -206,24 +207,40 @@ void after() {
}

private static void closeInExecutorService(
@Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor executor) {
@Nullable AdaptiveScheduler scheduler, Executor executor) {
if (scheduler != null) {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
executor.execute(
() -> {
try {
// no matter what state the scheduler is in; we have to go to Finished
// state to please the Preconditions of the close call
if (scheduler.getState().getClass() != Finished.class) {
scheduler.goToFinished(
scheduler.getArchivedExecutionGraph(
JobStatus.CANCELED, null));
}
scheduler.cancel();

FutureUtils.forward(scheduler.closeAsync(), closeFuture);
} catch (Throwable t) {
closeFuture.completeExceptionally(t);
}
});

// we have to wait for the job termination outside the main thread because the
// cancellation tasks are scheduled on the main thread as well.
scheduler
.getJobTerminationFuture()
.whenCompleteAsync(
(jobStatus, error) -> {
assertThat(scheduler.getState().getClass())
.isEqualTo(Finished.class);

if (error != null) {
closeFuture.completeExceptionally(error);
} else {
try {
FutureUtils.forward(scheduler.closeAsync(), closeFuture);
} catch (Throwable t) {
closeFuture.completeExceptionally(t);
}
}
},
executor);
assertThatFuture(closeFuture).eventuallySucceeds();
}
}
Expand Down Expand Up @@ -310,7 +327,7 @@ void testIsState() throws Exception {
final State state = scheduler.getState();

assertThat(scheduler.isState(state)).isTrue();
assertThat(scheduler.isState(new DummyState())).isFalse();
assertThat(scheduler.isState(new DummyState(scheduler))).isFalse();
}

@Test
Expand All @@ -337,7 +354,7 @@ void testRunIfStateWithStateMismatch() throws Exception {
.build();

AtomicBoolean ran = new AtomicBoolean(false);
scheduler.runIfState(new DummyState(), () -> ran.set(true));
scheduler.runIfState(new DummyState(scheduler), () -> ran.set(true));
assertThat(ran.get()).isFalse();
}

Expand Down Expand Up @@ -891,7 +908,9 @@ void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {

// transition into next state, for which the job state is still INITIALIZING
runInMainThread(
() -> scheduler.transitionToState(new DummyState.Factory(JobStatus.INITIALIZING)));
() ->
scheduler.transitionToState(
new DummyState.Factory(scheduler, JobStatus.INITIALIZING)));

assertThat(numStatusUpdates).hasValue(0);
}
Expand Down Expand Up @@ -1021,13 +1040,14 @@ void testTransitionToStateCallsOnLeave() throws Exception {
EXECUTOR_RESOURCE.getExecutor())
.build();

final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
final LifecycleMethodCapturingState firstState =
new LifecycleMethodCapturingState(scheduler);

runInMainThread(() -> scheduler.transitionToState(new StateInstanceFactory(firstState)));

firstState.reset();

runInMainThread(() -> scheduler.transitionToState(new DummyState.Factory()));
runInMainThread(() -> scheduler.transitionToState(new DummyState.Factory(scheduler)));

assertThat(firstState.onLeaveCalled).isTrue();
assertThat(firstState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
Expand Down Expand Up @@ -2508,6 +2528,10 @@ private static class LifecycleMethodCapturingState extends DummyState {
boolean onLeaveCalled = false;
@Nullable Class<? extends State> onLeaveNewStateArgument = null;

public LifecycleMethodCapturingState(Context context) {
super(context);
}

void reset() {
onLeaveCalled = false;
onLeaveNewStateArgument = null;
Expand Down Expand Up @@ -2607,52 +2631,35 @@ public LifecycleMethodCapturingState getState() {
}
}

static class DummyState implements State {
static class DummyState extends StateWithoutExecutionGraph {

private final JobStatus jobStatus;

public DummyState() {
this(JobStatus.RUNNING);
public DummyState(StateWithoutExecutionGraph.Context context) {
this(context, JobStatus.RUNNING);
}

public DummyState(JobStatus jobStatus) {
public DummyState(StateWithoutExecutionGraph.Context context, JobStatus jobStatus) {
super(context, AdaptiveSchedulerTest.LOG);
this.jobStatus = jobStatus;
}

@Override
public void cancel() {}

@Override
public void suspend(Throwable cause) {}

@Override
public JobStatus getJobStatus() {
return jobStatus;
}

@Override
public ArchivedExecutionGraph getJob() {
return null;
}

@Override
public void handleGlobalFailure(
Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {}

@Override
public Logger getLogger() {
return null;
}

private static class Factory implements StateFactory<DummyState> {

private final StateWithoutExecutionGraph.Context context;
private final JobStatus jobStatus;

public Factory() {
this(JobStatus.RUNNING);
public Factory(StateWithoutExecutionGraph.Context context) {
this(context, JobStatus.RUNNING);
}

public Factory(JobStatus jobStatus) {
public Factory(StateWithoutExecutionGraph.Context context, JobStatus jobStatus) {
this.context = context;
this.jobStatus = jobStatus;
}

Expand All @@ -2663,7 +2670,7 @@ public Class<DummyState> getStateClass() {

@Override
public DummyState getState() {
return new DummyState(jobStatus);
return new DummyState(context, jobStatus);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,15 @@ void testInternalRunScheduledTasks_correctExecutionOrder() {
};

ctx.runIfState(
new AdaptiveSchedulerTest.DummyState(),
new AdaptiveSchedulerTest.DummyState(ctx),
runLastBecauseOfHighDelay,
Duration.ofMillis(999));
ctx.runIfState(
new AdaptiveSchedulerTest.DummyState(), runFirstBecauseOfLowDelay, Duration.ZERO);
new AdaptiveSchedulerTest.DummyState(ctx),
runFirstBecauseOfLowDelay,
Duration.ZERO);
ctx.runIfState(
new AdaptiveSchedulerTest.DummyState(),
new AdaptiveSchedulerTest.DummyState(ctx),
runSecondBecauseOfScheduleOrder,
Duration.ZERO);

Expand All @@ -244,7 +246,7 @@ void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
executed.set(true);
};

ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeOnce, Duration.ZERO);
ctx.runIfState(new AdaptiveSchedulerTest.DummyState(ctx), executeOnce, Duration.ZERO);

// execute at least twice
ctx.runScheduledTasks();
Expand All @@ -256,14 +258,15 @@ void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
void testInternalRunScheduledTasks_upperBoundRespected() {
Runnable executeNever = () -> fail("Not expected");

ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeNever, Duration.ofMillis(10));
ctx.runIfState(
new AdaptiveSchedulerTest.DummyState(ctx), executeNever, Duration.ofMillis(10));

ctx.runScheduledTasks(4);
}

@Test
void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
final State state = new AdaptiveSchedulerTest.DummyState();
final State state = new AdaptiveSchedulerTest.DummyState(ctx);

AtomicBoolean executed = new AtomicBoolean(false);
ctx.runIfState(
Expand Down

0 comments on commit 83be264

Please sign in to comment.