Skip to content

Commit

Permalink
[FLINK-19637][coordination][tests] Remove AllocationIdsExposingRMGateway
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 15, 2020
1 parent a6a0094 commit d0a3716
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.AllocationIdsExposingResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
Expand Down Expand Up @@ -98,13 +97,11 @@ public void testExecutionDeploymentReconciliation() throws Exception {
JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = createResourceManagerGateway();

final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<>();
TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskCancellationFuture);
LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();

registerTaskExecutorAndOfferSlots(resourceManagerGateway, jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation);
registerTaskExecutorAndOfferSlots(jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation);

taskDeploymentFuture.get();
assertFalse(taskCancellationFuture.isDone());
Expand Down Expand Up @@ -134,14 +131,12 @@ public void testExecutionDeploymentReconciliationForPendingExecution() throws Ex
JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = createResourceManagerGateway();

final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<>();
final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture = new CompletableFuture<>();
TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskCancellationFuture, taskSubmissionFuture, taskSubmissionAcknowledgeFuture);
LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();

registerTaskExecutorAndOfferSlots(resourceManagerGateway, jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation);
registerTaskExecutorAndOfferSlots(jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation);

ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();

Expand Down Expand Up @@ -183,13 +178,6 @@ public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
return jobMaster;
}

private AllocationIdsExposingResourceManagerGateway createResourceManagerGateway() {
AllocationIdsExposingResourceManagerGateway resourceManagerGateway = new AllocationIdsExposingResourceManagerGateway();
RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
return resourceManagerGateway;
}

private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> taskCancellationFuture) {
return createTaskExecutorGateway(taskCancellationFuture, new CompletableFuture<>(), CompletableFuture.completedFuture(Acknowledge.get()));
}
Expand All @@ -213,14 +201,12 @@ private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<Executio
}

private void registerTaskExecutorAndOfferSlots(
AllocationIdsExposingResourceManagerGateway resourceManagerGateway,
JobMasterGateway jobMasterGateway,
TaskExecutorGateway taskExecutorGateway,
UnresolvedTaskManagerLocation taskManagerLocation) throws ExecutionException, InterruptedException {
jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();

final AllocationID allocationId = resourceManagerGateway.takeAllocationId();
Collection<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN));
Collection<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN));

jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.AllocationIdsExposingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
Expand Down Expand Up @@ -178,10 +177,7 @@ public TestSetup(TestingRpcService rpcService, FatalErrorHandler fatalErrorHandl
TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());

SettableLeaderRetrievalService rmLeaderRetrievalService = new SettableLeaderRetrievalService(
null,
null);
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService(null, null));

final TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();

Expand All @@ -208,27 +204,20 @@ public TestSetup(TestingRpcService rpcService, FatalErrorHandler fatalErrorHandl
registerTaskExecutorAtJobMaster(
rpcService,
getJobMasterGateway(),
taskExecutorGateway,
rmLeaderRetrievalService
taskExecutorGateway
);
}

private void registerTaskExecutorAtJobMaster(
TestingRpcService rpcService,
JobMasterGateway jobMasterGateway,
TaskExecutorGateway taskExecutorGateway,
SettableLeaderRetrievalService rmLeaderRetrievalService) throws ExecutionException, InterruptedException {

final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = new AllocationIdsExposingResourceManagerGateway();
rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
TaskExecutorGateway taskExecutorGateway) throws ExecutionException, InterruptedException {

rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), localTaskManagerUnresolvedLocation, testingTimeout).get();

final AllocationID allocationId = resourceManagerGateway.takeAllocationId();
Collection<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN));
Collection<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN));

jobMasterGateway.offerSlots(localTaskManagerUnresolvedLocation.getResourceID(), slotOffers, testingTimeout).get();
}
Expand Down

This file was deleted.

0 comments on commit d0a3716

Please sign in to comment.