Skip to content

Commit

Permalink
[FLINK-7870] [tests] Add SlotPool test to verify cancellation of fail…
Browse files Browse the repository at this point in the history
…ed slot requests

Adds the SlotPoolTest#testSlotRequestCancellationUponFailingRequest.
  • Loading branch information
tillrohrmann committed Nov 7, 2017
1 parent 902425f commit 755ae51
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,31 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -48,6 +57,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.mock;
Expand All @@ -56,6 +66,8 @@

public class SlotPoolTest extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);

private final Time timeout = Time.seconds(10L);

private RpcService rpcService;
Expand Down Expand Up @@ -294,6 +306,59 @@ public void returnAllocatedSlot(Slot slot) {
}
}

/**
* Tests that a slot request is cancelled if it failed with an exception (e.g. TimeoutException).
*
* <p>See FLINK-7870
*/
@Test
public void testSlotRequestCancellationUponFailingRequest() throws Exception {
final SlotPool slotPool = new SlotPool(rpcService, jobId);
final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();

final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID));

final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));

try {
slotPool.start(JobMasterId.generate(), "localhost");

final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);

slotPoolGateway.connectToResourceManager(resourceManagerGateway);

CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
scheduledUnit,
ResourceProfile.UNKNOWN,
Collections.emptyList(),
timeout);

requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));

try {
slotFuture.get();
fail("The slot future should not have been completed properly.");
} catch (Exception ignored) {
// expected
}

// check that a failure triggered the slot request cancellation
// with the correct allocation id
assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get());
} finally {
try {
RpcUtils.terminateRpcEndpoint(slotPool, timeout);
} catch (Exception e) {
LOG.warn("Could not properly terminate the SlotPool.", e);
}
}
}

private static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,6 @@ public void testUnregisterPendingSlotRequest() throws Exception {
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);

try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
// verify that if the request has not been assigned, should cancel the resource allocation
slotManager.registerSlotRequest(slotRequest);
PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId);
assertFalse(pendingSlotRequest.isAssigned());

slotManager.unregisterSlotRequest(allocationId);
pendingSlotRequest = slotManager.getSlotRequest(allocationId);
assertTrue(pendingSlotRequest == null);

slotManager.registerTaskManager(taskManagerConnection, slotReport);

TaskManagerSlot slot = slotManager.getSlot(slotId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager.utils;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
* Implementation of the {@link ResourceManagerGateway} for testing purposes solely.
*/
public class TestingResourceManagerGateway implements ResourceManagerGateway {

private final ResourceManagerId resourceManagerId;

private final ResourceID resourceId;

private final long heartbeatInterval;

private final String address;

private final String hostname;

private final AtomicReference<CompletableFuture<Acknowledge>> slotFutureReference;

private volatile Consumer<AllocationID> cancelSlotConsumer;

private volatile Consumer<SlotRequest> requestSlotConsumer;

public TestingResourceManagerGateway() {
this(
ResourceManagerId.generate(),
ResourceID.generate(),
10000L,
"localhost",
"localhost");
}

public TestingResourceManagerGateway(
ResourceManagerId resourceManagerId,
ResourceID resourceId,
long heartbeatInterval,
String address,
String hostname) {
this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
this.resourceId = Preconditions.checkNotNull(resourceId);
this.heartbeatInterval = heartbeatInterval;
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
this.slotFutureReference = new AtomicReference<>();
this.cancelSlotConsumer = null;
this.requestSlotConsumer = null;
}

public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) {
this.slotFutureReference.set(slotFuture);
}

public void setCancelSlotConsumer(Consumer<AllocationID> cancelSlotConsumer) {
this.cancelSlotConsumer = cancelSlotConsumer;
}

public void setRequestSlotConsumer(Consumer<SlotRequest> slotRequestConsumer) {
this.requestSlotConsumer = slotRequestConsumer;
}

@Override
public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
return CompletableFuture.completedFuture(
new JobMasterRegistrationSuccess(
heartbeatInterval,
resourceManagerId,
resourceId));
}

@Override
public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time timeout) {
Consumer<SlotRequest> currentRequestSlotConsumer = requestSlotConsumer;

if (currentRequestSlotConsumer != null) {
currentRequestSlotConsumer.accept(slotRequest);
}

CompletableFuture<Acknowledge> slotFuture = slotFutureReference.getAndSet(null);

if (slotFuture != null) {
return slotFuture;
} else {
return CompletableFuture.completedFuture(Acknowledge.get());
}
}

@Override
public void cancelSlotRequest(AllocationID allocationID) {
Consumer<AllocationID> currentCancelSlotConsumer = cancelSlotConsumer;

if (currentCancelSlotConsumer != null) {
currentCancelSlotConsumer.accept(allocationID);
}
}

@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
return CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(
new InstanceID(),
resourceId,
heartbeatInterval));
}

@Override
public void notifySlotAvailable(InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId) {

}

@Override
public void registerInfoMessageListener(String infoMessageListenerAddress) {

}

@Override
public void unRegisterInfoMessageListener(String infoMessageListenerAddress) {

}

@Override
public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {

}

@Override
public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
return CompletableFuture.completedFuture(0);
}

@Override
public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, SlotReport slotReport) {

}

@Override
public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {

}

@Override
public void disconnectTaskManager(ResourceID resourceID, Exception cause) {

}

@Override
public void disconnectJobManager(JobID jobId, Exception cause) {

}

@Override
public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
return CompletableFuture.completedFuture(Collections.emptyList());
}

@Override
public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
}

@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
}

@Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
return CompletableFuture.completedFuture(Collections.emptyList());
}

@Override
public ResourceManagerId getFencingToken() {
return resourceManagerId;
}

@Override
public String getAddress() {
return address;
}

@Override
public String getHostname() {
return hostname;
}
}

0 comments on commit 755ae51

Please sign in to comment.