Skip to content

Commit

Permalink
[FLINK-28640][runtime] Let BlocklistDeclarativeSlotPool accept duplic…
Browse files Browse the repository at this point in the history
…ate slot offers

The BlocklistDeclarativeSlotPool should accept a duplicate (already accepted) slot, even if it's from a currently blocked task manager. Because the slot may already be assigned to an execution, rejecting it will cause a task failover.

This closes apache#20341.
  • Loading branch information
wanglijie95 authored and zhuzhurk committed Jul 22, 2022
1 parent 7b05a1b commit f6a22ea
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Consumer;

Expand Down Expand Up @@ -75,9 +75,7 @@ public Collection<SlotOffer> offerSlots(
if (!isBlockedTaskManager(taskManagerLocation.getResourceID())) {
return super.offerSlots(offers, taskManagerLocation, taskManagerGateway, currentTime);
} else {
LOG.debug(
"Reject slots {} from a blocked TaskManager {}.", offers, taskManagerLocation);
return Collections.emptySet();
return internalOfferSlotsFromBlockedTaskManager(offers, taskManagerLocation);
}
}

Expand All @@ -90,11 +88,39 @@ public Collection<SlotOffer> registerSlots(
if (!isBlockedTaskManager(taskManagerLocation.getResourceID())) {
return super.registerSlots(slots, taskManagerLocation, taskManagerGateway, currentTime);
} else {
LOG.debug("Reject slots {} from a blocked TaskManager {}.", slots, taskManagerLocation);
return Collections.emptySet();
return internalOfferSlotsFromBlockedTaskManager(slots, taskManagerLocation);
}
}

private Collection<SlotOffer> internalOfferSlotsFromBlockedTaskManager(
Collection<? extends SlotOffer> offers, TaskManagerLocation taskManagerLocation) {
final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>();
final Collection<SlotOffer> rejectedSlotOffers = new ArrayList<>();

// we should accept a duplicate (already accepted) slot, even if it's from a currently
// blocked task manager. Because the slot may already be assigned to an execution, rejecting
// it will cause a task failover.
for (SlotOffer offer : offers) {
if (slotPool.containsSlot(offer.getAllocationId())) {
// we have already accepted this offer
acceptedSlotOffers.add(offer);
} else {
rejectedSlotOffers.add(offer);
}
}

LOG.debug(
"Received {} slots from a blocked TaskManager {}, {} was accepted before: {}, {} was rejected: {}.",
offers.size(),
taskManagerLocation,
acceptedSlotOffers.size(),
acceptedSlotOffers,
rejectedSlotOffers.size(),
rejectedSlotOffers);

return acceptedSlotOffers;
}

@Override
public ResourceCounter freeReservedSlot(
AllocationID allocationId, @Nullable Throwable cause, long currentTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -103,6 +107,37 @@ private void testOfferSlots(boolean isBlocked) throws Exception {
}
}

@Test
void testOfferDuplicateSlots() {
final TaskManagerLocation taskManager = new LocalTaskManagerLocation();
final List<ResourceID> blockedTaskManagers = new ArrayList<>();

final BlocklistDeclarativeSlotPool slotPool =
BlocklistDeclarativeSlotPoolBuilder.builder()
.setBlockedTaskManagerChecker(blockedTaskManagers::contains)
.build();

final ResourceCounter resourceRequirements =
ResourceCounter.withResource(RESOURCE_PROFILE, 2);
slotPool.increaseResourceRequirementsBy(resourceRequirements);

SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);
SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);

// offer and accept slot1
assertThat(
SlotPoolTestUtils.offerSlots(
slotPool, Collections.singleton(slot1), taskManager))
.containsExactly(slot1);

// block the task manager.
blockedTaskManagers.add(taskManager.getResourceID());

// offer slot1 and slot2, accept slot1, reject slot2
assertThat(SlotPoolTestUtils.offerSlots(slotPool, Arrays.asList(slot1, slot2), taskManager))
.containsExactly(slot1);
}

@Test
void testRegisterSlotsFromBlockedTaskManager() {
testRegisterSlots(true);
Expand Down Expand Up @@ -152,6 +187,41 @@ private void testRegisterSlots(boolean isBlocked) {
}
}

@Test
void testRegisterDuplicateSlots() {
final TaskManagerLocation taskManager = new LocalTaskManagerLocation();
final List<ResourceID> blockedTaskManagers = new ArrayList<>();

final BlocklistDeclarativeSlotPool slotPool =
BlocklistDeclarativeSlotPoolBuilder.builder()
.setBlockedTaskManagerChecker(blockedTaskManagers::contains)
.build();

SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);
SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);

// register and accept slot1
Collection<SlotOffer> acceptedOffers =
slotPool.registerSlots(
Collections.singleton(slot1),
taskManager,
SlotPoolTestUtils.createTaskManagerGateway(null),
0);
assertThat(acceptedOffers).containsExactly(slot1);

// block the task manager
blockedTaskManagers.add(taskManager.getResourceID());

// register slot1 and slot2, accept slot1, reject slot2
acceptedOffers =
slotPool.registerSlots(
Arrays.asList(slot1, slot2),
taskManager,
SlotPoolTestUtils.createTaskManagerGateway(null),
0);
assertThat(acceptedOffers).containsExactly(slot1);
}

@Test
void testFreeReservedSlotsOnBlockedTaskManager() throws Exception {
testFreeReservedSlots(true);
Expand Down

0 comments on commit f6a22ea

Please sign in to comment.