Skip to content

Commit

Permalink
[FLINK-17019][runtime] Remap orphaned slot allocation to pending slot…
Browse files Browse the repository at this point in the history
… request which just lost its allocation
  • Loading branch information
zhuzhurk committed Jun 18, 2020
1 parent 01360fe commit df9a882
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,20 @@ private void requestSlotFromResourceManager(

final AllocationID allocationId = new AllocationID();

pendingRequest.setAllocationId(allocationId);
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

pendingRequest.getAllocatedSlotFuture().whenComplete(
(AllocatedSlot allocatedSlot, Throwable throwable) -> {
if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
// cancel the slot request if there is a failure or if the pending request has
// been completed with another allocated slot
resourceManagerGateway.cancelSlotRequest(allocationId);
if (throwable != null) {
// the allocation id can be remapped so we need to get it from the pendingRequest
// where it will be updated timely
final Optional<AllocationID> updatedAllocationId = pendingRequest.getAllocationId();

if (updatedAllocationId.isPresent()) {
// cancel the slot request if there is a failure
resourceManagerGateway.cancelSlotRequest(updatedAllocationId.get());
}
}
});

Expand Down Expand Up @@ -544,15 +550,20 @@ private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
final PendingRequest pendingRequest = findMatchingPendingRequest(allocatedSlot);

if (pendingRequest != null) {
log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
log.debug("Fulfilling pending slot request [{}] with slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

// this allocation may become orphan once its corresponding request is removed
final AllocationID allocationIdOfRequest = pendingRequests.getKeyBByKeyA(pendingRequest.getSlotRequestId());

removePendingRequest(pendingRequest.getSlotRequestId());

allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);

maybeRemapOrphanedAllocation(allocationIdOfRequest, allocatedSlot.getAllocationId());
} else {
log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
log.debug("Adding slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
}
Expand All @@ -578,6 +589,30 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
return null;
}

/**
 * Takes care of the allocation that was registered for a request which has just been fulfilled.
 *
 * <p>If the request was fulfilled by the slot of its own allocation, there is nothing to do.
 * Otherwise the request's allocation is orphaned. When the allocation of the fulfilling slot is
 * still mapped to a pending request, that pending request adopts the orphaned allocation, so that
 * it can fail fast if the adopted allocation fails. When no pending request is mapped to the
 * slot's allocation, the orphaned allocation is canceled at the resource manager.
 *
 * @param allocationIdOfRequest allocation id that was registered for the fulfilled request
 * @param allocationIdOfSlot allocation id of the slot that fulfilled the request
 */
private void maybeRemapOrphanedAllocation(
		final AllocationID allocationIdOfRequest,
		final AllocationID allocationIdOfSlot) {

	// the request got the slot of its own allocation, nothing is orphaned
	if (allocationIdOfRequest.equals(allocationIdOfSlot)) {
		return;
	}

	final SlotRequestId adoptingRequestId = pendingRequests.getKeyAByKeyB(allocationIdOfSlot);

	if (adoptingRequestId == null) {
		// no pending request is registered under the slot's allocation (per the original note this
		// happens when the slot is returned by the scheduler), so nobody needs the orphaned
		// allocation anymore; cancel it to avoid allocating unnecessary slots
		resourceManagerGateway.cancelSlotRequest(allocationIdOfRequest);
		return;
	}

	final PendingRequest adoptingRequest = pendingRequests.getValueByKeyA(adoptingRequestId);
	adoptingRequest.setAllocationId(allocationIdOfRequest);

	// re-register the adopting request under the orphaned allocation id; according to the original
	// author's note, re-inserting adoptingRequestId does not affect its original insertion order
	pendingRequests.put(adoptingRequestId, allocationIdOfRequest, adoptingRequest);
}

@Override
public Collection<SlotOffer> offerSlots(
TaskManagerLocation taskManagerLocation,
Expand Down Expand Up @@ -1348,6 +1383,9 @@ protected static class PendingRequest {

private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;

@Nullable
private AllocationID allocationId;

private long unfillableSince;

private PendingRequest(
Expand Down Expand Up @@ -1419,6 +1457,14 @@ private boolean isFulfillable() {
/**
 * Returns the current value of the {@code unfillableSince} field.
 *
 * <p>NOTE(review): presumably a timestamp marking when this request became unfulfillable;
 * confirm against the code that writes the field.
 */
long getUnfulfillableSince() {
return unfillableSince;
}

/**
 * Stores the allocation id associated with this pending request, overwriting any previous one.
 *
 * <p>The slot pool calls this when it requests a slot from the resource manager and again when
 * it remaps an orphaned allocation to this request.
 *
 * @param allocationId the allocation id to associate with this request
 */
void setAllocationId(final AllocationID allocationId) {
this.allocationId = allocationId;
}

/**
 * Returns the allocation id most recently stored via {@code setAllocationId}.
 *
 * @return the stored allocation id, or an empty {@link Optional} if the field is still {@code null}
 */
Optional<AllocationID> getAllocationId() {
return Optional.ofNullable(allocationId);
}
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.ManualClock;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -80,8 +80,10 @@
import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -738,6 +740,79 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
}
}

/**
 * Verifies that when a request is fulfilled by the slot of another pending request's allocation,
 * the fulfilled request's own (now orphaned) allocation is handed over to that other request
 * instead of being canceled.
 */
@Test
public void testOrphanedAllocationCanBeRemapped() throws Exception {
	// capture the allocation ids the slot pool requests from the resource manager, in call order
	final List<AllocationID> requestedAllocationIds = new ArrayList<>();
	resourceManagerGateway.setRequestSlotConsumer(
		slotRequest -> requestedAllocationIds.add(slotRequest.getAllocationId()));

	// capture every allocation the slot pool asks the resource manager to cancel
	final List<AllocationID> canceledAllocationIds = new ArrayList<>();
	resourceManagerGateway.setCancelSlotConsumer(canceledAllocationIds::add);

	try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
		final SlotRequestId firstRequestId = new SlotRequestId();
		final SlotRequestId secondRequestId = new SlotRequestId();
		requestNewAllocatedSlots(slotPool, firstRequestId, secondRequestId);

		final AllocationID firstAllocationId = requestedAllocationIds.get(0);
		final AllocationID secondAllocationId = requestedAllocationIds.get(1);

		// offer the slot of the second allocation while both requests are still pending
		offerSlot(slotPool, secondAllocationId);

		// only the second request is still pending, and it is now registered under the first
		// allocation id, i.e. the orphaned first allocation was remapped to the second request
		assertThat(slotPool.getPendingRequests().values(), hasSize(1));
		assertThat(slotPool.getPendingRequests().containsKeyA(secondRequestId), is(true));
		assertThat(slotPool.getPendingRequests().containsKeyB(firstAllocationId), is(true));

		// a remapped allocation must not be canceled
		assertThat(canceledAllocationIds, hasSize(0));
	}
}

/**
 * Verifies that when a request is fulfilled by a slot whose allocation id belongs to no pending
 * request, the fulfilled request's own (now orphaned) allocation is canceled at the resource
 * manager.
 */
@Test
public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception {
	// capture the allocation ids the slot pool requests from the resource manager, in call order
	final List<AllocationID> requestedAllocationIds = new ArrayList<>();
	resourceManagerGateway.setRequestSlotConsumer(
		slotRequest -> requestedAllocationIds.add(slotRequest.getAllocationId()));

	// capture every allocation the slot pool asks the resource manager to cancel
	final List<AllocationID> canceledAllocationIds = new ArrayList<>();
	resourceManagerGateway.setCancelSlotConsumer(canceledAllocationIds::add);

	try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
		final SlotRequestId firstRequestId = new SlotRequestId();
		final SlotRequestId secondRequestId = new SlotRequestId();
		requestNewAllocatedSlots(slotPool, firstRequestId, secondRequestId);

		final AllocationID firstAllocationId = requestedAllocationIds.get(0);
		final AllocationID secondAllocationId = requestedAllocationIds.get(1);

		// pick an allocation id that matches neither of the two requested allocations
		AllocationID unknownAllocationId;
		do {
			unknownAllocationId = new AllocationID();
		} while (unknownAllocationId.equals(firstAllocationId) || unknownAllocationId.equals(secondAllocationId));

		offerSlot(slotPool, unknownAllocationId);

		// one request was fulfilled by the unknown slot; its original allocation (the first one)
		// could not be adopted by any pending request and therefore has to be canceled
		assertThat(slotPool.getPendingRequests().values(), hasSize(1));
		assertThat(canceledAllocationIds, contains(firstAllocationId));
	}
}

/**
 * Requests one new allocated slot from the given slot pool for each of the given slot request
 * ids, in the order in which the ids are passed.
 *
 * @param slotPool slot pool to request the slots from
 * @param slotRequestIds ids of the slot requests to issue
 */
private void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) {
for (SlotRequestId slotRequestId : slotRequestIds) {
requestNewAllocatedSlot(slotPool, slotRequestId);
}
}

/**
 * Requests a new allocated slot with {@code ResourceProfile.UNKNOWN} and the test's
 * {@code timeout}; the value returned by the slot pool is discarded.
 *
 * @param slotPool slot pool to request the slot from
 * @param slotRequestId id of the slot request to issue
 */
private void requestNewAllocatedSlot(final SlotPool slotPool, final SlotRequestId slotRequestId) {
slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
}

/**
 * Registers the test's task manager at the given slot pool and offers it a single slot carrying
 * the given allocation id and {@code ResourceProfile.ANY}.
 *
 * @param slotPool slot pool that receives the offer
 * @param allocationId allocation id of the offered slot
 */
private void offerSlot(final SlotPoolImpl slotPool, final AllocationID allocationId) {
// NOTE(review): the second constructor argument (0) is presumably the slot index — confirm
final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
// the task manager is registered at the pool before the slot is offered
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer);
}

private List<AllocationID> registerAndOfferSlots(TaskManagerLocation taskManagerLocation, SlotPoolImpl slotPool, int numberOfSlotsToRegister) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
final List<AllocationID> allocationIds = IntStream.range(0, numberOfSlotsToRegister)
Expand Down Expand Up @@ -800,6 +875,12 @@ private CompletableFuture<LogicalSlot> allocateSlot(Scheduler scheduler, SlotReq
timeout);
}

/**
 * Creates a slot pool via {@code createSlotPoolImpl()} and sets it up with the test's resource
 * manager gateway and main thread executor.
 *
 * @return the slot pool after {@code setupSlotPool} has been applied to it
 * @throws Exception declared because the set-up may fail; not raised directly by this method
 */
private SlotPoolImpl createAndSetUpSlotPool() throws Exception {
final SlotPoolImpl slotPool = createSlotPoolImpl();
setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
return slotPool;
}

private static void setupSlotPool(
SlotPoolImpl slotPool,
ResourceManagerGateway resourceManagerGateway,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
Expand All @@ -37,6 +41,8 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
}
}

/**
 * Verifies that failing an allocation also fails a pending request which has taken over that
 * allocation after the allocation's original request was fulfilled by a different slot.
 */
@Test
public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
	// capture the allocation ids the slot pool requests from the resource manager, in call order
	final List<AllocationID> requestedAllocationIds = new ArrayList<>();
	resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestedAllocationIds.add(slotRequest.getAllocationId()));

	try (SlotPoolImpl slotPool = setUpSlotPool()) {
		final CompletableFuture<PhysicalSlot> firstSlotFuture = requestNewAllocatedSlot(slotPool, new SlotRequestId());
		final CompletableFuture<PhysicalSlot> secondSlotFuture = requestNewAllocatedSlot(slotPool, new SlotRequestId());

		final AllocationID firstAllocationId = requestedAllocationIds.get(0);
		final AllocationID secondAllocationId = requestedAllocationIds.get(1);

		// offer the slot of the second allocation while both requests are still pending
		final TaskManagerLocation taskExecutorLocation = new LocalTaskManagerLocation();
		final SlotOffer offerOfSecondAllocation = new SlotOffer(secondAllocationId, 0, ResourceProfile.ANY);
		slotPool.registerTaskManager(taskExecutorLocation.getResourceID());
		slotPool.offerSlot(taskExecutorLocation, new SimpleAckingTaskManagerGateway(), offerOfSecondAllocation);

		// the first request is served by that slot, the second request keeps waiting
		assertThat(firstSlotFuture.isDone(), is(true));
		assertThat(secondSlotFuture.isDone(), is(false));

		// fail the first allocation, which is no longer needed by the first request
		final FlinkException cause = new FlinkException("Fail pending slot request failure.");
		final Optional<ResourceID> failAllocationResult = slotPool.failAllocation(firstAllocationId, cause);

		assertThat(failAllocationResult.isPresent(), is(false));

		// the still pending second request must have been failed with exactly that cause
		try {
			secondSlotFuture.getNow(null);
			fail("Expected a slot allocation failure.");
		} catch (Throwable t) {
			assertThat(ExceptionUtils.stripCompletionException(t), equalTo(cause));
		}
	}
}

/**
* Tests that a failing resource manager request fails a pending slot request and cancels the slot
* request at the RM (e.g. due to a TimeoutException).
Expand Down

0 comments on commit df9a882

Please sign in to comment.