Skip to content

Commit

Permalink
[hotfix][test] Assert the slot allocation eventually succeed in dedic…
Browse files Browse the repository at this point in the history
…ated tests of DefaultSlotStatusSyncerTest

Also deduplicate the code of these tests.
  • Loading branch information
KarmaGYZ committed Feb 20, 2024
1 parent e2e3de2 commit 15af3e4
Showing 1 changed file with 69 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
Expand All @@ -35,15 +35,18 @@
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadConsumer;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -60,118 +63,18 @@ class DefaultSlotStatusSyncerTest {
TestingUtils.defaultExecutorExtension();

@Test
void testAllocateSlot() throws Exception {
final FineGrainedTaskManagerTracker taskManagerTracker =
new FineGrainedTaskManagerTracker();
final CompletableFuture<
Tuple6<
SlotID,
JobID,
AllocationID,
ResourceProfile,
String,
ResourceManagerId>>
requestFuture = new CompletableFuture<>();
final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
final TestingTaskExecutorGateway taskExecutorGateway =
new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(
tuple6 -> {
requestFuture.complete(tuple6);
return responseFuture;
})
.createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection =
new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
taskManagerTracker.addTaskManager(
taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
final ResourceTracker resourceTracker = new DefaultResourceTracker();
final JobID jobId = new JobID();
final SlotStatusSyncer slotStatusSyncer =
new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
slotStatusSyncer.initialize(
taskManagerTracker,
resourceTracker,
ResourceManagerId.generate(),
EXECUTOR_RESOURCE.getExecutor());

final CompletableFuture<Void> allocatedFuture =
slotStatusSyncer.allocateSlot(
taskExecutorConnection.getInstanceID(),
jobId,
"address",
ResourceProfile.ANY);
final AllocationID allocationId = requestFuture.get().f2;
assertThat(resourceTracker.getAcquiredResources(jobId))
.contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
.hasValueSatisfying(
slot -> {
assertThat(slot.getJobId()).isEqualTo(jobId);
assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
});

responseFuture.complete(Acknowledge.get());
assertThat(allocatedFuture).isNotCompletedExceptionally();
void testSlotAllocationSucceeds() throws Exception {
testSlotAllocation((ignored0, ignored1, ignored2, ignored3) -> {});
}

@Test
void testAllocationUpdatesIgnoredIfSlotFreed() throws Exception {
final FineGrainedTaskManagerTracker taskManagerTracker =
new FineGrainedTaskManagerTracker();
final CompletableFuture<
Tuple6<
SlotID,
JobID,
AllocationID,
ResourceProfile,
String,
ResourceManagerId>>
requestFuture = new CompletableFuture<>();
final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
final TestingTaskExecutorGateway taskExecutorGateway =
new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(
tuple6 -> {
requestFuture.complete(tuple6);
return responseFuture;
})
.createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection =
new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
taskManagerTracker.addTaskManager(
taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
final ResourceTracker resourceTracker = new DefaultResourceTracker();
final JobID jobId = new JobID();
final SlotStatusSyncer slotStatusSyncer =
new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
slotStatusSyncer.initialize(
taskManagerTracker,
resourceTracker,
ResourceManagerId.generate(),
EXECUTOR_RESOURCE.getExecutor());

final CompletableFuture<Void> allocatedFuture =
slotStatusSyncer.allocateSlot(
taskExecutorConnection.getInstanceID(),
jobId,
"address",
ResourceProfile.ANY);
final AllocationID allocationId = requestFuture.get().f2;
assertThat(resourceTracker.getAcquiredResources(jobId))
.contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
.hasValueSatisfying(
slot -> {
assertThat(slot.getJobId()).isEqualTo(jobId);
assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
});

slotStatusSyncer.freeSlot(allocationId);
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)).isEmpty();

responseFuture.complete(Acknowledge.get());
assertThat(allocatedFuture).isNotCompletedExceptionally();
testSlotAllocation(
(slotStatusSyncer, taskManagerTracker, ignored, allocationId) -> {
slotStatusSyncer.freeSlot(allocationId);
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
.isEmpty();
});
}

@Test
Expand Down Expand Up @@ -349,4 +252,61 @@ void testSlotStatusProcessing() {
.hasValueSatisfying(
slot -> assertThat(slot.getState()).isEqualTo(SlotState.PENDING));
}

private static void testSlotAllocation(
QuadConsumer<SlotStatusSyncer, TaskManagerTracker, InstanceID, AllocationID>
beforeCompletingSlotRequestCallback)
throws ExecutionException, InterruptedException {
final FineGrainedTaskManagerTracker taskManagerTracker =
new FineGrainedTaskManagerTracker();
final CompletableFuture<AllocationID> requestFuture = new CompletableFuture<>();

final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
final TestingTaskExecutorGateway taskExecutorGateway =
new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(
tuple6 -> {
requestFuture.complete(tuple6.f2);
return responseFuture;
})
.createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection =
new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
taskManagerTracker.addTaskManager(
taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
final ResourceTracker resourceTracker = new DefaultResourceTracker();
final JobID jobId = new JobID();
final SlotStatusSyncer slotStatusSyncer =
new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
slotStatusSyncer.initialize(
taskManagerTracker,
resourceTracker,
ResourceManagerId.generate(),
EXECUTOR_RESOURCE.getExecutor());

final CompletableFuture<Void> allocatedFuture =
slotStatusSyncer.allocateSlot(
taskExecutorConnection.getInstanceID(),
jobId,
"address",
ResourceProfile.ANY);
final AllocationID allocationId = requestFuture.get();
assertThat(resourceTracker.getAcquiredResources(jobId))
.contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
.hasValueSatisfying(
slot -> {
assertThat(slot.getJobId()).isEqualTo(jobId);
assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
});

beforeCompletingSlotRequestCallback.accept(
slotStatusSyncer,
taskManagerTracker,
taskExecutorConnection.getInstanceID(),
allocationId);

responseFuture.complete(Acknowledge.get());
assertThatFuture(allocatedFuture).eventuallySucceeds();
}
}

0 comments on commit 15af3e4

Please sign in to comment.