Skip to content

Commit

Permalink
[FLINK-22704][tests] Harden ZooKeeperHaServicesTest.testCleanupJobData
Browse files Browse the repository at this point in the history
This commit hardens the ZooKeeperHaServicesTest.testCleanupJobData by making sure that all
zNodes have been created before checking for their existence. This is done by waiting for
the leader information for the ResourceManager and the JobMaster.

This closes apache#15954.
  • Loading branch information
tillrohrmann committed May 20, 2021
1 parent 74f4b7f commit 3558ff1
Showing 1 changed file with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,26 +221,35 @@ private void runCleanupTestWithJob(
zooKeeperHaServices.getResourceManagerLeaderRetriever();
final LeaderElectionService resourceManagerLeaderElectionService =
zooKeeperHaServices.getResourceManagerLeaderElectionService();

final LeaderRetrievalService jobManagerLeaderRetriever =
zooKeeperHaServices.getJobManagerLeaderRetriever(jobId);
final LeaderElectionService jobManagerLeaderElectionService =
zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);

final RunningJobsRegistry runningJobsRegistry =
zooKeeperHaServices.getRunningJobsRegistry();

final LeaderRetrievalUtils.LeaderConnectionInfoListener listener =
final LeaderRetrievalUtils.LeaderConnectionInfoListener resourceManagerLeaderListener =
new LeaderRetrievalUtils.LeaderConnectionInfoListener();
resourceManagerLeaderRetriever.start(listener);
resourceManagerLeaderElectionService.start(
new TestingContender("foobar", resourceManagerLeaderElectionService));
LeaderElectionService jobManagerLeaderElectionService =
zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
new TestingContender(
"unused-resourcemanager-address",
resourceManagerLeaderElectionService));
resourceManagerLeaderRetriever.start(resourceManagerLeaderListener);

final LeaderRetrievalUtils.LeaderConnectionInfoListener jobManagerLeaderListener =
new LeaderRetrievalUtils.LeaderConnectionInfoListener();
jobManagerLeaderElectionService.start(
new TestingContender("", jobManagerLeaderElectionService));
LeaderRetrievalService jobManagerLeaderRetriever =
zooKeeperHaServices.getJobManagerLeaderRetriever(jobId);
jobManagerLeaderRetriever.start(
new LeaderRetrievalUtils.LeaderConnectionInfoListener());
new TestingContender(
"unused-jobmanager-address", jobManagerLeaderElectionService));
jobManagerLeaderRetriever.start(jobManagerLeaderListener);

runningJobsRegistry.setJobRunning(jobId);

listener.getLeaderConnectionInfoFuture().join();
// Make sure that the respective zNodes have been properly created
resourceManagerLeaderListener.getLeaderConnectionInfoFuture().join();
jobManagerLeaderListener.getLeaderConnectionInfoFuture().join();

resourceManagerLeaderRetriever.stop();
resourceManagerLeaderElectionService.stop();
Expand Down

0 comments on commit 3558ff1

Please sign in to comment.