Skip to content

Commit

Permalink
[FLINK-28137][runtime] Enable SimpleExecutionSlotAllocator to do batc…
Browse files Browse the repository at this point in the history
…h slot request timeout check
  • Loading branch information
zhuzhurk committed Jul 13, 2022
1 parent c75a242 commit 265612c
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,9 @@ public void notifyNotEnoughResourcesAvailable(
}

private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
Predicate<PendingRequest> predicate =
request -> !isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest();
// only fails streaming requests because batch jobs do not require all resource
// requirements to be fulfilled at the same time
Predicate<PendingRequest> predicate = request -> !request.isBatchRequest();
if (pendingRequests.values().stream().anyMatch(predicate)) {
log.warn(
"Could not acquire the minimum required resources, failing slot requests. Acquired: {}. Current slot pool status: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,10 @@ CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
* @param cause of the cancellation
*/
void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);

/**
 * Disables the batch slot request timeout check. Intended to be invoked when another
 * component wants to take over the responsibility for the timeout check.
 */
void disableBatchSlotRequestTimeoutCheck();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public PhysicalSlotProviderImpl(
SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
this.slotPool = checkNotNull(slotPool);
}

/** Forwards the request to the underlying slot pool, which owns the actual check. */
@Override
public void disableBatchSlotRequestTimeoutCheck() {
    this.slotPool.disableBatchSlotRequestTimeoutCheck();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
this.allocationTimeout = checkNotNull(allocationTimeout);
this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
this.sharedSlots = new IdentityHashMap<>();

this.slotProvider.disableBatchSlotRequestTimeoutCheck();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ public void testIndividualBatchSlotRequestTimeoutCheckIsDisabledOnAllocatingNewS
.buildAndStart(physicalSlotProviderResource.getMainThreadExecutor());
assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));

new PhysicalSlotProviderImpl(
LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
final PhysicalSlotProvider slotProvider =
new PhysicalSlotProviderImpl(
LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
slotProvider.disableBatchSlotRequestTimeoutCheck();
assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ public void testIndividualBatchSlotRequestTimeoutCheckIsDisabledOnAllocatingNewS
.buildAndStart(physicalSlotProviderResource.getMainThreadExecutor());
assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));

new PhysicalSlotProviderImpl(
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
final PhysicalSlotProvider slotProvider =
new PhysicalSlotProviderImpl(
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
slotProvider.disableBatchSlotRequestTimeoutCheck();
assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -134,33 +133,6 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
}
}

/**
 * Verifies that a pending batch slot request completes exceptionally after the slot pool has
 * been notified through {@link SlotPoolService#notifyNotEnoughResourcesAvailable}.
 */
@Test
public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
    final TestingResourceManagerGateway gateway = new TestingResourceManagerGateway();

    try (final DeclarativeSlotPoolBridge bridge =
            new DeclarativeSlotPoolBridgeBuilder()
                    .setResourceManagerGateway(gateway)
                    .buildAndStart(mainThreadExecutor)) {

        // Request a batch slot first so that there is a pending request to fail.
        final CompletableFuture<PhysicalSlot> batchSlotFuture =
                SlotPoolUtils.requestNewAllocatedBatchSlot(
                        bridge, mainThreadExecutor, resourceProfile);

        // Report that no resources at all could be acquired.
        SlotPoolUtils.notifyNotEnoughResourcesAvailable(
                bridge, mainThreadExecutor, Collections.emptyList());

        final Duration timeout = Duration.ofSeconds(10L);
        assertThat(batchSlotFuture, FlinkMatchers.futureWillCompleteExceptionally(timeout));
    }
}

/**
* Tests that a batch slot request won't fail if its resource manager request fails with
* exceptions other than {@link UnfulfillableSlotRequestException}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ void testFailLogicalSlotIfPhysicalSlotIsFails() {
.isEqualTo(context.getSlotProvider().getRequests().keySet());
}

/**
 * Checks that the slot provider of a freshly created {@code AllocationContext} still reports
 * its batch slot request timeout check as enabled.
 */
@Test
void testSlotProviderBatchSlotRequestTimeoutCheckIsEnabled() {
    final AllocationContext allocationContext = new AllocationContext();

    final boolean timeoutCheckEnabled =
            allocationContext.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled();

    assertThat(timeoutCheckEnabled).isTrue();
}

private static class AllocationContext {
private final TestingPhysicalSlotProvider slotProvider;
private final boolean slotWillBeOccupiedIndefinitely;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ void testSlotRequestProfileFromExecutionSlotSharingGroup() {
.containsExactlyInAnyOrder(resourceProfile1, resourceProfile2);
}

/**
 * Checks that the slot provider of an {@code AllocationContext} built with default settings
 * reports its batch slot request timeout check as disabled.
 */
@Test
void testSlotProviderBatchSlotRequestTimeoutCheckIsDisabled() {
    final AllocationContext allocationContext = AllocationContext.newBuilder().build();

    final boolean timeoutCheckEnabled =
            allocationContext.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled();

    assertThat(timeoutCheckEnabled).isFalse();
}

private static List<ExecutionVertexID> getAssignIds(
Collection<ExecutionSlotAssignment> assignments) {
return assignments.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
private final Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>>
physicalSlotCreator;

private boolean batchSlotRequestTimeoutCheckEnabled = true;

public static TestingPhysicalSlotProvider create(
Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) {
return new TestingPhysicalSlotProvider(physicalSlotCreator);
Expand Down Expand Up @@ -126,6 +128,11 @@ public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) {
cancellations.put(slotRequestId, cause);
}

/** Records the call by clearing the flag that tests read back. */
@Override
public void disableBatchSlotRequestTimeoutCheck() {
    this.batchSlotRequestTimeoutCheckEnabled = false;
}

public CompletableFuture<TestingPhysicalSlot> getResultForRequestId(
SlotRequestId slotRequestId) {
return getResponses().get(slotRequestId);
Expand Down Expand Up @@ -160,4 +167,8 @@ private static <T> T getFirstElementOrFail(Collection<T> collection) {
Preconditions.checkState(element.isPresent());
return element.get();
}

/**
 * Returns the current value of the flag that {@link #disableBatchSlotRequestTimeoutCheck()}
 * clears.
 */
boolean isBatchSlotRequestTimeoutCheckEnabled() {
    return this.batchSlotRequestTimeoutCheckEnabled;
}
}

0 comments on commit 265612c

Please sign in to comment.