diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java index 0260df3115b5e..55741a1117d9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.resourcemanager.AllocationIdsExposingResourceManagerGateway; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; @@ -98,13 +97,11 @@ public void testExecutionDeploymentReconciliation() throws Exception { JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); - final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = createResourceManagerGateway(); - final CompletableFuture taskCancellationFuture = new CompletableFuture<>(); TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskCancellationFuture); LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation(); - registerTaskExecutorAndOfferSlots(resourceManagerGateway, jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation); + registerTaskExecutorAndOfferSlots(jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation); taskDeploymentFuture.get(); assertFalse(taskCancellationFuture.isDone()); @@ -134,14 +131,12 @@ public void testExecutionDeploymentReconciliationForPendingExecution() throws Ex JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); - final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = createResourceManagerGateway(); - final CompletableFuture taskCancellationFuture = new CompletableFuture<>(); final CompletableFuture taskSubmissionAcknowledgeFuture = new CompletableFuture<>(); TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskCancellationFuture, taskSubmissionFuture, taskSubmissionAcknowledgeFuture); LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation(); - registerTaskExecutorAndOfferSlots(resourceManagerGateway, jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation); + registerTaskExecutorAndOfferSlots(jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation); ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get(); @@ -183,13 +178,6 @@ public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) { return jobMaster; } - private AllocationIdsExposingResourceManagerGateway createResourceManagerGateway() { - AllocationIdsExposingResourceManagerGateway resourceManagerGateway = new AllocationIdsExposingResourceManagerGateway(); - RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); - resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID()); - return resourceManagerGateway; - } - private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture taskCancellationFuture) { return createTaskExecutorGateway(taskCancellationFuture, new CompletableFuture<>(), CompletableFuture.completedFuture(Acknowledge.get())); } @@ -213,14 +201,12 @@ private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture slotOffers = Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN)); + Collection slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN)); jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout).get(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java index aa02627c5249f..8137747fda284 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.resourcemanager.AllocationIdsExposingResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -178,10 +177,7 @@ public TestSetup(TestingRpcService rpcService, FatalErrorHandler fatalErrorHandl TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - SettableLeaderRetrievalService rmLeaderRetrievalService = new SettableLeaderRetrievalService( - null, - null); - haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); + haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService(null, null)); final TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker(); @@ -208,27 +204,20 @@ public TestSetup(TestingRpcService rpcService, FatalErrorHandler fatalErrorHandl registerTaskExecutorAtJobMaster( rpcService, getJobMasterGateway(), - taskExecutorGateway, - rmLeaderRetrievalService + taskExecutorGateway ); } private void registerTaskExecutorAtJobMaster( TestingRpcService rpcService, JobMasterGateway jobMasterGateway, - TaskExecutorGateway taskExecutorGateway, - SettableLeaderRetrievalService rmLeaderRetrievalService) throws ExecutionException, InterruptedException { - - final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = new AllocationIdsExposingResourceManagerGateway(); - rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); - rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID()); + TaskExecutorGateway taskExecutorGateway) throws ExecutionException, InterruptedException { rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), localTaskManagerUnresolvedLocation, testingTimeout).get(); - final AllocationID allocationId = resourceManagerGateway.takeAllocationId(); - Collection slotOffers = Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN)); + Collection slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN)); jobMasterGateway.offerSlots(localTaskManagerUnresolvedLocation.getResourceID(), slotOffers, testingTimeout).get(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/AllocationIdsExposingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/AllocationIdsExposingResourceManagerGateway.java deleted file mode 100644 index c83291839b646..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/AllocationIdsExposingResourceManagerGateway.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.util.ExceptionUtils; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; - -/** - * A {@link TestingResourceManagerGateway} that exposes the {@link AllocationID} of all slot requests. - */ -public final class AllocationIdsExposingResourceManagerGateway extends TestingResourceManagerGateway { - private final BlockingQueue allocationIds; - - public AllocationIdsExposingResourceManagerGateway() { - this.allocationIds = new ArrayBlockingQueue<>(10); - setRequestSlotConsumer( - slotRequest -> allocationIds.offer(slotRequest.getAllocationId()) - ); - } - - public AllocationID takeAllocationId() { - try { - return allocationIds.take(); - } catch (InterruptedException e) { - ExceptionUtils.rethrow(e); - return null; - } - } -}