Skip to content

Commit

Permalink
[FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryM…
Browse files Browse the repository at this point in the history
…arkSlotActive

To avoid timing out slots that have already been activated, we also need to deactivate the slot timeout
when TaskSlotTable.tryMarkSlotActive is called. This call can happen if the response
to JobMasterGateway.offerSlots arrived too late and the request timed out.

This closes apache#12388.
  • Loading branch information
tillrohrmann committed Jun 2, 2020
1 parent 3181248 commit 3dabc69
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,18 +320,22 @@ public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundExce
TaskSlot<T> taskSlot = getTaskSlot(allocationId);

if (taskSlot != null) {
if (taskSlot.markActive()) {
// unregister a potential timeout
LOG.info("Activate slot {}.", allocationId);
return markExistingSlotActive(taskSlot);
} else {
throw new SlotNotFoundException(allocationId);
}
}

timerService.unregisterTimeout(allocationId);
private boolean markExistingSlotActive(TaskSlot<T> taskSlot) {
if (taskSlot.markActive()) {
// unregister a potential timeout
LOG.info("Activate slot {}.", taskSlot.getAllocationId());

return true;
} else {
return false;
}
timerService.unregisterTimeout(taskSlot.getAllocationId());

return true;
} else {
throw new SlotNotFoundException(allocationId);
return false;
}
}

Expand Down Expand Up @@ -428,7 +432,7 @@ public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
TaskSlot<T> taskSlot = getTaskSlot(allocationId);

if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
return taskSlot.markActive();
return markExistingSlotActive(taskSlot);
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriFunctionWithException;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

Expand All @@ -41,11 +42,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
* Tests for the {@link TaskSlotTable}.
Expand Down Expand Up @@ -291,6 +295,36 @@ public void testAllocatedSlotTimeout() throws Exception {
}
}

/**
 * Checks that activating a slot through {@code markSlotActive} prevents the slot
 * timeout from being reported afterwards.
 */
@Test
public void testMarkSlotActiveDeactivatesSlotTimeout() throws Exception {
	// markSlotActive is keyed by allocation id only, so the job id handed in by the helper is unused.
	runDeactivateSlotTimeoutTest(
		(slotTable, unusedJobId, slotAllocationId) -> slotTable.markSlotActive(slotAllocationId));
}

/**
 * Checks that activating a slot through {@code tryMarkSlotActive} prevents the slot
 * timeout from being reported afterwards.
 */
@Test
public void testTryMarkSlotActiveDeactivatesSlotTimeout() throws Exception {
	runDeactivateSlotTimeoutTest(
		(slotTable, owningJobId, slotAllocationId) -> slotTable.tryMarkSlotActive(owningJobId, slotAllocationId));
}

/**
 * Shared body for the "activation deactivates the slot timeout" tests.
 *
 * <p>Allocates slot 0 with a short timeout, applies the given activation action, and then
 * waits for the length of that timeout to confirm that the timeout callback is not invoked.
 *
 * @param taskSlotTableAction action that activates the slot identified by the given job id and
 *     allocation id on the given table; it must return {@code true} for the test to proceed
 * @throws Exception if setting up, using or closing the task slot table fails
 */
private void runDeactivateSlotTimeoutTest(TriFunctionWithException<TaskSlotTable<TaskSlotPayload>, JobID, AllocationID, Boolean, SlotNotFoundException> taskSlotTableAction) throws Exception {
	// Completed by the timeout callback; it must stay incomplete if the timeout was deactivated.
	final CompletableFuture<AllocationID> slotTimedOut = new CompletableFuture<>();
	final TestingSlotActions slotActions = new TestingSlotActionsBuilder()
		.setTimeoutSlotConsumer((timedOutAllocationId, ticket) -> slotTimedOut.complete(timedOutAllocationId))
		.build();

	try (final TaskSlotTableImpl<TaskSlotPayload> slotTable = createTaskSlotTableAndStart(1, slotActions)) {
		final AllocationID allocationId = new AllocationID();
		final long slotTimeoutMillis = 50L;
		final JobID jobId = new JobID();

		final boolean allocated = slotTable.allocateSlot(0, jobId, allocationId, Time.milliseconds(slotTimeoutMillis));
		assertThat(allocated, is(true));

		final Boolean activated = taskSlotTableAction.apply(slotTable, jobId, allocationId);
		assertThat(activated, is(true));

		try {
			// Wait as long as the slot timeout itself; a completed future means the timeout still fired.
			slotTimedOut.get(slotTimeoutMillis, TimeUnit.MILLISECONDS);
			fail("The slot timeout should have been deactivated.");
		} catch (TimeoutException expected) {
			// Expected: the timeout callback was never invoked.
		}
	}
}

private static TaskSlotTable<TaskSlotPayload> createTaskSlotTableWithStartedTask(
final TaskSlotPayload task) throws SlotNotFoundException, SlotNotActiveException {
return createTaskSlotTableWithStartedTask(task, new TestingSlotActionsBuilder().build());
Expand Down

0 comments on commit 3dabc69

Please sign in to comment.