From 56aefcd9e7055ae6e2d10110715024ed6997b50c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 10 Nov 2017 09:25:02 +0100 Subject: [PATCH] [FLINK-6434] [flip6] Introduce SlotRequestID for SlotPool#allocateSlot We have to distinguish between the slot request and the slot allocation because slot allocations are reused across multiple slot requests. Therefore we cannot use the AllocationID to identify individual slot requests which might be cancelled. Introducing a separate SlotRequestID solves the problem. --- .../flink/runtime/instance/DualKeyMap.java | 162 +++++++++++++++++ .../flink/runtime/instance/SlotPool.java | 164 +++++++++--------- .../runtime/instance/SlotPoolGateway.java | 17 +- .../runtime/instance/AllocatedSlotsTest.java | 10 +- .../runtime/instance/SlotPoolRpcTest.java | 71 ++++++-- .../flink/runtime/instance/SlotPoolTest.java | 22 ++- 6 files changed, 329 insertions(+), 117 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/instance/DualKeyMap.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DualKeyMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DualKeyMap.java new file mode 100644 index 0000000000000..741d137206918 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DualKeyMap.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; + +/** + * Map which stores values under two different indices. + * + * @param Type of key A + * @param Type of key B + * @param Type of the value + */ +public class DualKeyMap { + + private final HashMap> aMap; + + private final HashMap bMap; + + private transient Collection values; + + public DualKeyMap(int initialCapacity) { + this.aMap = new HashMap<>(initialCapacity); + this.bMap = new HashMap<>(initialCapacity); + } + + public int size() { + return aMap.size(); + } + + public V getKeyA(A aKey) { + final Tuple2 value = aMap.get(aKey); + + if (value != null) { + return value.f1; + } else { + return null; + } + } + + public V getKeyB(B bKey) { + final A aKey = bMap.get(bKey); + + if (aKey != null) { + return aMap.get(aKey).f1; + } else { + return null; + } + } + + public V put(A aKey, B bKey, V value) { + Tuple2 aValue = aMap.put(aKey, Tuple2.of(bKey, value)); + bMap.put(bKey, aKey); + + if (aValue != null) { + return aValue.f1; + } else { + return null; + } + } + + public boolean containsKeyA(A aKey) { + return aMap.containsKey(aKey); + } + + public boolean containsKeyB(B bKey) { + return bMap.containsKey(bKey); + } + + public V removeKeyA(A aKey) { + Tuple2 aValue = aMap.remove(aKey); + + if (aValue != null) { + bMap.remove(aValue.f0); + return aValue.f1; + } else { + return null; + } + } + + public V removeKeyB(B bKey) { + A aKey = bMap.remove(bKey); + + if (aKey != null) { + Tuple2 aValue = aMap.remove(aKey); + if (aValue != null) { + return aValue.f1; + } else { + return null; + } + } else { + return null; + } + } + + public Collection values() { + Collection vs = values; + + if (vs == null) { + vs = new Values(); + values = vs; + } + + return vs; + } + + public void clear() { + aMap.clear(); + bMap.clear(); + } + + private final class Values extends AbstractCollection { + + @Override + public Iterator iterator() { + return new ValueIterator(); + } + + @Override + public int size() { + return aMap.size(); + } + } + + private final class ValueIterator implements Iterator { + + private final Iterator> iterator = aMap.values().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public V next() { + Tuple2 value = iterator.next(); + + return value.f1; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 159df7be74627..7e98e11b7feec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -109,10 +109,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { private final AvailableSlots availableSlots; /** All pending requests waiting for slots */ - private final HashMap pendingRequests; + private final DualKeyMap pendingRequests; /** The requests that are waiting for the resource manager to be connected */ - private final HashMap waitingForResourceManager; + private final HashMap waitingForResourceManager; /** Timeout for request calls to the ResourceManager */ private final Time resourceManagerRequestsTimeout; @@ -155,8 +155,8 @@ public SlotPool( this.registeredTaskManagers = new HashSet<>(); this.allocatedSlots = new AllocatedSlots(); this.availableSlots = new AvailableSlots(); - this.pendingRequests = new HashMap<>(); - this.waitingForResourceManager = new HashMap<>(); + this.pendingRequests = new DualKeyMap<>(16); + this.waitingForResourceManager = new HashMap<>(16); this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout); @@ -248,8 +248,8 @@ public void connectToResourceManager(ResourceManagerGateway resourceManagerGatew this.resourceManagerGateway = checkNotNull(resourceManagerGateway); // work on all slots waiting for this connection - for (PendingRequest pending : waitingForResourceManager.values()) { - requestSlotFromResourceManager(pending.allocationID(), pending.getFuture(), pending.resourceProfile()); + for (PendingRequest pendingRequest : waitingForResourceManager.values()) { + requestSlotFromResourceManager(pendingRequest); } // all sent off @@ -267,13 +267,13 @@ public void disconnectResourceManager() { @Override public CompletableFuture allocateSlot( - AllocationID allocationID, + SlotRequestID requestId, ScheduledUnit task, ResourceProfile resources, Iterable locationPreferences, Time timeout) { - return internalAllocateSlot(allocationID, task, resources, locationPreferences); + return internalAllocateSlot(requestId, task, resources, locationPreferences); } @Override @@ -282,21 +282,21 @@ public void returnAllocatedSlot(Slot slot) { } @Override - public CompletableFuture cancelSlotAllocation(AllocationID allocationId) { - final PendingRequest pendingRequest = removePendingRequest(allocationId); + public CompletableFuture cancelSlotAllocation(SlotRequestID requestId) { + final PendingRequest pendingRequest = removePendingRequest(requestId); if (pendingRequest != null) { - failPendingRequest(pendingRequest, new CancellationException("Allocation " + allocationId + " cancelled.")); + failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled.")); } else { - final Slot slot = allocatedSlots.get(allocationId); + final Slot slot = allocatedSlots.get(requestId); if (slot != null) { - LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, allocationId); + LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId); if (slot.markCancelled()) { internalReturnAllocatedSlot(slot); } } else { - LOG.debug("There was no slot allocation with {} to be cancelled.", allocationId); + LOG.debug("There was no slot allocation with {} to be cancelled.", requestId); } } @@ -304,7 +304,7 @@ public CompletableFuture cancelSlotAllocation(AllocationID allocati } CompletableFuture internalAllocateSlot( - AllocationID allocationID, + SlotRequestID requestId, ScheduledUnit task, ResourceProfile resources, Iterable locationPreferences) { @@ -313,7 +313,7 @@ CompletableFuture internalAllocateSlot( SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences); if (slotFromPool != null) { SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality()); - allocatedSlots.add(slot); + allocatedSlots.add(requestId, slot); return CompletableFuture.completedFuture(slot); } @@ -321,13 +321,12 @@ CompletableFuture internalAllocateSlot( final CompletableFuture future = new CompletableFuture<>(); // (2) need to request a slot - if (resourceManagerGateway == null) { // no slot available, and no resource manager connection - stashRequestWaitingForResourceManager(allocationID, resources, future); + stashRequestWaitingForResourceManager(requestId, resources, future); } else { // we have a resource manager connection, so let's ask it for more resources - requestSlotFromResourceManager(allocationID, future, resources); + requestSlotFromResourceManager(new PendingRequest(requestId, future, resources)); } return future; @@ -337,48 +336,48 @@ CompletableFuture internalAllocateSlot( * Checks whether there exists a pending request with the given allocation id and removes it * from the internal data structures. * - * @param allocationId identifying the pending request + * @param requestId identifying the pending request * @return pending request if there is one, otherwise null */ @Nullable - private PendingRequest removePendingRequest(AllocationID allocationId) { - PendingRequest result = waitingForResourceManager.remove(allocationId); + private PendingRequest removePendingRequest(SlotRequestID requestId) { + PendingRequest result = waitingForResourceManager.remove(requestId); if (result != null) { // sanity check - assert !pendingRequests.containsKey(allocationId) : "A pending requests should only be part of either " + + assert !pendingRequests.containsKeyA(requestId) : "A pending requests should only be part of either " + "the pendingRequests or waitingForResourceManager but not both."; return result; } else { - return pendingRequests.remove(allocationId); + return pendingRequests.removeKeyA(requestId); } } private void requestSlotFromResourceManager( - final AllocationID allocationID, - final CompletableFuture future, - final ResourceProfile resources) { + final PendingRequest pendingRequest) { + + LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId()); - LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID); + final AllocationID allocationId = new AllocationID(); - pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources)); + pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest); - future.whenComplete( + pendingRequest.getFuture().whenComplete( (value, throwable) -> { if (throwable != null) { - resourceManagerGateway.cancelSlotRequest(allocationID); + resourceManagerGateway.cancelSlotRequest(allocationId); } }); CompletableFuture rmResponse = resourceManagerGateway.requestSlot( jobMasterId, - new SlotRequest(jobId, allocationID, resources, jobManagerAddress), + new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), resourceManagerRequestsTimeout); CompletableFuture slotRequestProcessingFuture = rmResponse.thenAcceptAsync( (Acknowledge value) -> { - slotRequestToResourceManagerSuccess(allocationID); + slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId()); }, getMainThreadExecutor()); @@ -386,39 +385,39 @@ private void requestSlotFromResourceManager( slotRequestProcessingFuture.whenCompleteAsync( (Void v, Throwable failure) -> { if (failure != null) { - slotRequestToResourceManagerFailed(allocationID, failure); + slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure); } }, getMainThreadExecutor()); } - private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) { + private void slotRequestToResourceManagerSuccess(final SlotRequestID requestId) { // a request is pending from the ResourceManager to a (future) TaskManager // we only add the watcher here in case that request times out scheduleRunAsync(new Runnable() { @Override public void run() { - checkTimeoutSlotAllocation(allocationID); + checkTimeoutSlotAllocation(requestId); } }, resourceManagerAllocationTimeout); } - private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) { - PendingRequest request = pendingRequests.remove(allocationID); + private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) { + PendingRequest request = pendingRequests.removeKeyA(slotRequestID); if (request != null) { request.getFuture().completeExceptionally(new NoResourceAvailableException( "No pooled slot available and request to ResourceManager for new slot failed", failure)); } else { if (LOG.isDebugEnabled()) { - LOG.debug("Unregistered slot request {} failed.", allocationID, failure); + LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure); } } } - private void checkTimeoutSlotAllocation(AllocationID allocationID) { - PendingRequest request = pendingRequests.remove(allocationID); + private void checkTimeoutSlotAllocation(SlotRequestID slotRequestID) { + PendingRequest request = pendingRequests.removeKeyA(slotRequestID); if (request != null) { - failPendingRequest(request, new TimeoutException("Slot allocation request " + allocationID + " timed out")); + failPendingRequest(request, new TimeoutException("Slot allocation request " + slotRequestID + " timed out")); } } @@ -432,25 +431,25 @@ private void failPendingRequest(PendingRequest pendingRequest, Exception e) { } private void stashRequestWaitingForResourceManager( - final AllocationID allocationID, + final SlotRequestID requestId, final ResourceProfile resources, final CompletableFuture future) { LOG.info("Cannot serve slot request, no ResourceManager connected. " + - "Adding as pending request {}", allocationID); + "Adding as pending request {}", requestId); - waitingForResourceManager.put(allocationID, new PendingRequest(allocationID, future, resources)); + waitingForResourceManager.put(requestId, new PendingRequest(requestId, future, resources)); scheduleRunAsync(new Runnable() { @Override public void run() { - checkTimeoutRequestWaitingForResourceManager(allocationID); + checkTimeoutRequestWaitingForResourceManager(requestId); } }, resourceManagerRequestsTimeout); } - private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { - PendingRequest request = waitingForResourceManager.remove(allocationID); + private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequestId) { + PendingRequest request = waitingForResourceManager.remove(slotRequestId); if (request != null) { failPendingRequest( request, @@ -484,10 +483,10 @@ private void internalReturnAllocatedSlot(Slot slot) { if (pendingRequest != null) { LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", - pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId()); + pendingRequest.getSlotRequestId(), taskManagerSlot.getSlotAllocationId()); SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN); - allocatedSlots.add(newSlot); + allocatedSlots.add(pendingRequest.getSlotRequestId(), newSlot); pendingRequest.getFuture().complete(newSlot); } else { @@ -506,16 +505,16 @@ private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) { // try the requests sent to the resource manager first for (PendingRequest request : pendingRequests.values()) { - if (slotResources.isMatching(request.resourceProfile())) { - pendingRequests.remove(request.allocationID()); + if (slotResources.isMatching(request.getResourceProfile())) { + pendingRequests.removeKeyA(request.getSlotRequestId()); return request; } } // try the requests waiting for a resource manager connection next for (PendingRequest request : waitingForResourceManager.values()) { - if (slotResources.isMatching(request.resourceProfile())) { - waitingForResourceManager.remove(request.allocationID()); + if (slotResources.isMatching(request.getResourceProfile())) { + waitingForResourceManager.remove(request.getSlotRequestId()); return request; } } @@ -592,12 +591,12 @@ public CompletableFuture offerSlot(final AllocatedSlot slot) { } // check whether we have request waiting for this slot - PendingRequest pendingRequest = pendingRequests.remove(allocationID); + PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); if (pendingRequest != null) { // we were waiting for this! SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN); pendingRequest.getFuture().complete(resultSlot); - allocatedSlots.add(resultSlot); + allocatedSlots.add(pendingRequest.getSlotRequestId(), resultSlot); } else { // we were actually not waiting for this: @@ -631,11 +630,10 @@ public CompletableFuture offerSlot(final AllocatedSlot slot) { */ @Override public void failAllocation(final AllocationID allocationID, final Exception cause) { - final PendingRequest pendingRequest = pendingRequests.remove(allocationID); + final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); if (pendingRequest != null) { // request was still pending - LOG.debug("Failed pending request [{}] with ", allocationID, cause); - pendingRequest.getFuture().completeExceptionally(cause); + failPendingRequest(pendingRequest, cause); } else if (availableSlots.tryRemove(allocationID)) { LOG.debug("Failed available slot [{}] with ", allocationID, cause); @@ -719,12 +717,12 @@ AvailableSlots getAvailableSlots() { } @VisibleForTesting - public HashMap getPendingRequests() { + DualKeyMap getPendingRequests() { return pendingRequests; } @VisibleForTesting - public HashMap getWaitingForResourceManager() { + Map getWaitingForResourceManager() { return waitingForResourceManager; } @@ -741,11 +739,11 @@ static class AllocatedSlots { private final Map> allocatedSlotsByTaskManager; /** All allocated slots organized by AllocationID */ - private final Map allocatedSlotsById; + private final DualKeyMap allocatedSlotsById; AllocatedSlots() { - this.allocatedSlotsByTaskManager = new HashMap<>(); - this.allocatedSlotsById = new HashMap<>(); + this.allocatedSlotsByTaskManager = new HashMap<>(16); + this.allocatedSlotsById = new DualKeyMap<>(16); } /** @@ -753,8 +751,8 @@ static class AllocatedSlots { * * @param slot The allocated slot */ - void add(Slot slot) { - allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slot); + void add(SlotRequestID slotRequestId, Slot slot) { + allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slotRequestId, slot); final ResourceID resourceID = slot.getTaskManagerID(); Set slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID); @@ -772,7 +770,11 @@ void add(Slot slot) { * @return The allocated slot, null if we can't find a match */ Slot get(final AllocationID allocationID) { - return allocatedSlotsById.get(allocationID); + return allocatedSlotsById.getKeyA(allocationID); + } + + Slot get(final SlotRequestID slotRequestId) { + return allocatedSlotsById.getKeyB(slotRequestId); } /** @@ -782,7 +784,7 @@ Slot get(final AllocationID allocationID) { * @return True if we contains this slot */ boolean contains(AllocationID slotAllocationId) { - return allocatedSlotsById.containsKey(slotAllocationId); + return allocatedSlotsById.containsKeyA(slotAllocationId); } /** @@ -800,7 +802,7 @@ boolean remove(final Slot slot) { * @param slotId The ID of the slot to be removed */ Slot remove(final AllocationID slotId) { - Slot slot = allocatedSlotsById.remove(slotId); + Slot slot = allocatedSlotsById.removeKeyA(slotId); if (slot != null) { final ResourceID taskManagerId = slot.getTaskManagerID(); Set slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); @@ -825,7 +827,7 @@ Set removeSlotsForTaskManager(final ResourceID resourceID) { Set slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID); if (slotsForTaskManager != null) { for (Slot slot : slotsForTaskManager) { - allocatedSlotsById.remove(slot.getAllocatedSlot().getSlotAllocationId()); + allocatedSlotsById.removeKeyA(slot.getAllocatedSlot().getSlotAllocationId()); } return slotsForTaskManager; } @@ -1090,12 +1092,12 @@ public CompletableFuture allocateSlot( boolean allowQueued, Collection preferredLocations) { - final AllocationID allocationID = new AllocationID(); - CompletableFuture slotFuture = gateway.allocateSlot(allocationID, task, ResourceProfile.UNKNOWN, preferredLocations, timeout); + final SlotRequestID requestId = new SlotRequestID(); + CompletableFuture slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout); slotFuture.whenComplete( (SimpleSlot slot, Throwable failure) -> { if (failure != null) { - gateway.cancelSlotAllocation(allocationID); + gateway.cancelSlotAllocation(requestId); } }); return slotFuture; @@ -1109,30 +1111,30 @@ public CompletableFuture allocateSlot( */ private static class PendingRequest { - private final AllocationID allocationID; + private final SlotRequestID slotRequestId; private final CompletableFuture future; private final ResourceProfile resourceProfile; PendingRequest( - AllocationID allocationID, + SlotRequestID slotRequestId, CompletableFuture future, ResourceProfile resourceProfile) { - this.allocationID = allocationID; - this.future = future; - this.resourceProfile = resourceProfile; + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); + this.future = Preconditions.checkNotNull(future); + this.resourceProfile = Preconditions.checkNotNull(resourceProfile); } - public AllocationID allocationID() { - return allocationID; + public SlotRequestID getSlotRequestId() { + return slotRequestId; } public CompletableFuture getFuture() { return future; } - public ResourceProfile resourceProfile() { + public ResourceProfile getResourceProfile() { return resourceProfile; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index 184072a1d6921..bf520f55a65ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -86,7 +87,7 @@ public interface SlotPoolGateway extends RpcGateway { // ------------------------------------------------------------------------ CompletableFuture allocateSlot( - AllocationID allocationID, + SlotRequestID requestId, ScheduledUnit task, ResourceProfile resources, Iterable locationPreferences, @@ -95,11 +96,17 @@ CompletableFuture allocateSlot( void returnAllocatedSlot(Slot slot); /** - * Cancel a slot allocation. This method should be called when the CompletableFuture returned by - * allocateSlot completed exceptionally. + * Cancel a slot allocation request. * - * @param allocationId identifying the slot allocation request + * @param requestId identifying the slot allocation request * @return Future acknowledge if the slot allocation has been cancelled */ - CompletableFuture cancelSlotAllocation(AllocationID allocationId); + CompletableFuture cancelSlotAllocation(SlotRequestID requestId); + + /** + * Request ID identifying different slot requests. + */ + final class SlotRequestID extends AbstractID { + private static final long serialVersionUID = -6072105912250154283L; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java index e654a995683ec..0e4bfc06830fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; + import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -37,10 +38,11 @@ public void testOperations() throws Exception { SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots(); final AllocationID allocation1 = new AllocationID(); + final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID(); final ResourceID resource1 = new ResourceID("resource1"); final Slot slot1 = createSlot(resource1, allocation1); - allocatedSlots.add(slot1); + allocatedSlots.add(slotRequestID, slot1); assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); assertTrue(allocatedSlots.containResource(resource1)); @@ -50,9 +52,10 @@ public void testOperations() throws Exception { assertEquals(1, allocatedSlots.size()); final AllocationID allocation2 = new AllocationID(); + final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID(); final Slot slot2 = createSlot(resource1, allocation2); - allocatedSlots.add(slot2); + allocatedSlots.add(slotRequestID2, slot2); assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); @@ -64,10 +67,11 @@ public void testOperations() throws Exception { assertEquals(2, allocatedSlots.size()); final AllocationID allocation3 = new AllocationID(); + final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID(); final ResourceID resource2 = new ResourceID("resource2"); final Slot slot3 = createSlot(resource2, allocation3); - allocatedSlots.add(slot3); + allocatedSlots.add(slotRequestID3, slot3); assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index f81366a0f84b8..ca5d826c33340 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -29,7 +29,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; @@ -38,6 +40,7 @@ import org.apache.flink.runtime.util.clock.Clock; import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; @@ -49,13 +52,13 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; /** * Tests for the SlotPool using a proper RPC setup. @@ -105,8 +108,8 @@ public void testSlotAllocationNoResourceManager() throws Exception { pool.start(JobMasterId.generate(), "foobar"); CompletableFuture future = pool.allocateSlot( - new AllocationID(), - mock(ScheduledUnit.class), + new SlotPoolGateway.SlotRequestID(), + new ScheduledUnit(SchedulerTestUtils.getDummyTask()), DEFAULT_TESTING_PROFILE, Collections.emptyList(), TestingUtils.infiniteTime()); @@ -138,10 +141,10 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception { pool.start(JobMasterId.generate(), "foobar"); SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); - AllocationID allocationID = new AllocationID(); + SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot( - allocationID, - mock(ScheduledUnit.class), + requestId, + new ScheduledUnit(SchedulerTestUtils.getDummyTask()), DEFAULT_TESTING_PROFILE, Collections.emptyList(), Time.milliseconds(10L)); @@ -155,7 +158,7 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception { assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get()); - slotPoolGateway.cancelSlotAllocation(allocationID).get(); + slotPoolGateway.cancelSlotAllocation(requestId).get(); assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get()); } finally { @@ -182,10 +185,10 @@ public void testCancelSlotAllocationWithResourceManager() throws Exception { ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); pool.connectToResourceManager(resourceManagerGateway); - AllocationID allocationID = new AllocationID(); + SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot( - allocationID, - mock(ScheduledUnit.class), + requestId, + new ScheduledUnit(SchedulerTestUtils.getDummyTask()), DEFAULT_TESTING_PROFILE, Collections.emptyList(), Time.milliseconds(10L)); @@ -199,13 +202,16 @@ public void testCancelSlotAllocationWithResourceManager() throws Exception { assertEquals(1L, (long) pool.getNumberOfPendingRequests().get()); - slotPoolGateway.cancelSlotAllocation(allocationID).get(); + slotPoolGateway.cancelSlotAllocation(requestId).get(); assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); } finally { RpcUtils.terminateRpcEndpoint(pool, timeout); } } + /** + * Tests that allocated slots are not cancelled. + */ @Test public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { final JobID jid = new JobID(); @@ -222,13 +228,18 @@ public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { pool.start(JobMasterId.generate(), "foobar"); SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); - ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + + TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + pool.connectToResourceManager(resourceManagerGateway); - AllocationID allocationId = new AllocationID(); + SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot( - allocationId, - mock(ScheduledUnit.class), + requestId, + new ScheduledUnit(SchedulerTestUtils.getDummyTask()), DEFAULT_TESTING_PROFILE, Collections.emptyList(), Time.milliseconds(10L)); @@ -240,6 +251,7 @@ public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); } + AllocationID allocationId = allocationIdFuture.get(); ResourceID resourceID = ResourceID.generate(); AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE); slotPoolGateway.registerTaskManager(resourceID).get(); @@ -250,7 +262,7 @@ public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { assertTrue(pool.containsAllocatedSlot(allocationId).get()); - pool.cancelSlotAllocation(allocationId).get(); + pool.cancelSlotAllocation(requestId).get(); assertFalse(pool.containsAllocatedSlot(allocationId).get()); assertTrue(pool.containsAvailableSlot(allocationId).get()); @@ -275,6 +287,11 @@ public void testProviderAndOwner() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); + final CompletableFuture cancelFuture = new CompletableFuture<>(); + + pool.setCancelSlotAllocationConsumer( + slotRequestID -> cancelFuture.complete(slotRequestID)); + try { pool.start(JobMasterId.generate(), "foobar"); ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); @@ -295,6 +312,9 @@ public void testProviderAndOwner() throws Exception { assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); } + // wait for the cancel call on the SlotPool + cancelFuture.get(); + assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); } finally { RpcUtils.terminateRpcEndpoint(pool, timeout); @@ -306,6 +326,8 @@ public void testProviderAndOwner() throws Exception { */ private static final class TestingSlotPool extends SlotPool { + private volatile Consumer cancelSlotAllocationConsumer; + public TestingSlotPool( RpcService rpcService, JobID jobId, @@ -320,6 +342,23 @@ public TestingSlotPool( slotRequestTimeout, resourceManagerAllocationTimeout, resourceManagerRequestTimeout); + + cancelSlotAllocationConsumer = null; + } + + public void setCancelSlotAllocationConsumer(Consumer cancelSlotAllocationConsumer) { + this.cancelSlotAllocationConsumer = Preconditions.checkNotNull(cancelSlotAllocationConsumer); + } + + @Override + public CompletableFuture cancelSlotAllocation(SlotRequestID slotRequestId) { + final Consumer currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer; + + if (currentCancelSlotAllocationConsumer != null) { + currentCancelSlotAllocationConsumer.accept(slotRequestId); + } + + return super.cancelSlotAllocation(slotRequestId); } CompletableFuture containsAllocatedSlot(AllocationID allocationId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index bee77e0896c12..c3073732e9a69 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -95,8 +95,8 @@ public void testAllocateSimpleSlot() throws Exception { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - AllocationID allocationID = new AllocationID(); - CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); + CompletableFuture future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -104,8 +104,6 @@ public void testAllocateSimpleSlot() throws Exception { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - assertEquals(allocationID, slotRequest.getAllocationId()); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); @@ -131,8 +129,8 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID); - CompletableFuture future1 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); - CompletableFuture future2 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future1.isDone()); assertFalse(future2.isDone()); @@ -178,7 +176,7 @@ public void testAllocateWithFreeSlot() throws Exception { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture future1 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future1.isDone()); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -195,7 +193,7 @@ public void testAllocateWithFreeSlot() throws Exception { // return this slot to pool slot1.releaseSlot(); - CompletableFuture future2 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); // second allocation fulfilled by previous slot returning SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); @@ -221,7 +219,7 @@ public void testOfferSlot() throws Exception { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture future = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -277,14 +275,14 @@ public void returnAllocatedSlot(Slot slot) { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture future1 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - CompletableFuture future2 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); @@ -335,7 +333,7 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { slotPoolGateway.connectToResourceManager(resourceManagerGateway); CompletableFuture slotFuture = slotPoolGateway.allocateSlot( - new AllocationID(), + new SlotPoolGateway.SlotRequestID(), scheduledUnit, ResourceProfile.UNKNOWN, Collections.emptyList(),