Skip to content

Commit

Permalink
[FLINK-17017][runtime] Allow to disable batch slot request timeout check
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhurk committed Jun 11, 2020
1 parent f6c275e commit 541f28f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile);

/**
* Disables batch slot request timeout check. Invoked when someone else wants to
* take over the timeout check responsibility.
*/
default void disableBatchSlotRequestTimeoutCheck() {
throw new UnsupportedOperationException("Not properly implemented.");
}

/**
* Create report about the allocated slots belonging to the specified task manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public class SlotPoolImpl implements SlotPool {

private ComponentMainThreadExecutor componentMainThreadExecutor;

private boolean batchSlotRequestTimeoutCheckEnabled;

// ------------------------------------------------------------------------

public SlotPoolImpl(
Expand All @@ -160,6 +162,8 @@ public SlotPoolImpl(
this.jobManagerAddress = null;

this.componentMainThreadExecutor = null;

this.batchSlotRequestTimeoutCheckEnabled = true;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -453,6 +457,11 @@ public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
.thenApply(Function.identity());
}

@Override
public void disableBatchSlotRequestTimeoutCheck() {
batchSlotRequestTimeoutCheckEnabled = false;
}

@Override
@Nonnull
public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
Expand Down Expand Up @@ -874,6 +883,10 @@ protected void checkIdleSlot() {
}

protected void checkBatchSlotTimeout() {
if (!batchSlotRequestTimeoutCheckEnabled) {
return;
}

final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();

if (!pendingBatchRequests.isEmpty()) {
Expand Down

0 comments on commit 541f28f

Please sign in to comment.