From 77ec8055617330de6678403ac3843a52aa8f07f9 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 20 Oct 2021 00:45:48 +0200 Subject: [PATCH] [FLINK-25849] Add TaskManager session id to JobMaster.registerTaskManager The TaskManager session id allows the JobMaster to distinguish between duplicate registration attempts and a new TaskManager instance that has an old ResourceID. Signed-off-by: slinkydeveloper This closes #18534. --- .../apache/flink/testutils/TestingUtils.java | 6 + .../flink/runtime/jobmaster/JobMaster.java | 156 ++++++++++-------- .../runtime/jobmaster/JobMasterGateway.java | 8 +- .../jobmaster/TaskManagerRegistration.java | 58 +++++++ .../TaskManagerRegistrationInformation.java | 64 +++++++ .../taskexecutor/DefaultJobLeaderService.java | 30 ++-- .../runtime/dispatcher/JobMasterTester.java | 9 +- ...ExecutionDeploymentReconciliationTest.java | 7 +- .../JobMasterPartitionReleaseTest.java | 7 +- .../runtime/jobmaster/JobMasterTest.java | 125 ++++++++++++-- .../runtime/jobmaster/JobMasterTestUtils.java | 7 +- .../utils/TestingJobMasterGateway.java | 18 +- .../utils/TestingJobMasterGatewayBuilder.java | 14 +- .../DefaultJobLeaderServiceTest.java | 2 +- ...ExecutionDeploymentReconciliationTest.java | 2 +- .../TaskExecutorPartitionLifecycleTest.java | 2 +- .../taskexecutor/TaskExecutorTest.java | 8 +- 17 files changed, 391 insertions(+), 132 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistration.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistrationInformation.java diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java index 94f1aa5c98ebb..e4262b86f647a 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java @@ -23,11 +23,13 @@ import 
org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; import java.time.Duration; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** Convenience functions to test actor based components. */ public class TestingUtils { + private static final UUID ZERO_UUID = new UUID(0L, 0L); public static final Duration TESTING_DURATION = Duration.ofMinutes(2L); public static final Time TIMEOUT = Time.minutes(1L); @@ -50,4 +52,8 @@ public static synchronized ScheduledExecutorService defaultExecutor() { public static ScheduledExecutor defaultScheduledExecutor() { return new ScheduledExecutorServiceAdapter(defaultExecutor()); } + + public static UUID zeroUUID() { + return ZERO_UUID; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 99cf0a64cf96f..08bd2f8f5826e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; @@ -95,6 +94,7 @@ import org.slf4j.Logger; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -171,8 +171,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint // --------- TaskManagers -------- - private final Map> - registeredTaskManagers; + private final Map registeredTaskManagers; private final ShuffleMaster shuffleMaster; @@ -260,10 +259,12 @@ public void onUnknownDeploymentsOf( 
executionAttemptIds, host); for (ExecutionAttemptID executionAttemptId : executionAttemptIds) { - Tuple2 taskManagerInfo = + TaskManagerRegistration taskManagerRegistration = registeredTaskManagers.get(host); - if (taskManagerInfo != null) { - taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout); + if (taskManagerRegistration != null) { + taskManagerRegistration + .getTaskExecutorGateway() + .cancelTask(executionAttemptId, rpcTimeout); } } } @@ -306,14 +307,9 @@ public void onUnknownDeploymentsOf( checkNotNull(partitionTrackerFactory) .create( resourceID -> { - Tuple2 - taskManagerInfo = - registeredTaskManagers.get(resourceID); - if (taskManagerInfo == null) { - return Optional.empty(); - } - - return Optional.of(taskManagerInfo.f1); + return Optional.ofNullable( + registeredTaskManagers.get(resourceID)) + .map(TaskManagerRegistration::getTaskExecutorGateway); }); this.shuffleMaster = checkNotNull(shuffleMaster); @@ -507,11 +503,12 @@ public CompletableFuture disconnectTaskManager( slotPoolService.releaseTaskManager(resourceID, cause); partitionTracker.stopTrackingPartitionsFor(resourceID); - Tuple2 taskManagerConnection = - registeredTaskManagers.remove(resourceID); + TaskManagerRegistration taskManagerRegistration = registeredTaskManagers.remove(resourceID); - if (taskManagerConnection != null) { - taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause); + if (taskManagerRegistration != null) { + taskManagerRegistration + .getTaskExecutorGateway() + .disconnectJobManager(jobGraph.getJobID(), cause); } return CompletableFuture.completedFuture(Acknowledge.get()); @@ -618,22 +615,22 @@ public CompletableFuture notifyKvStateUnregistered( public CompletableFuture> offerSlots( final ResourceID taskManagerId, final Collection slots, final Time timeout) { - Tuple2 taskManager = - registeredTaskManagers.get(taskManagerId); + TaskManagerRegistration taskManagerRegistration = registeredTaskManagers.get(taskManagerId); - if (taskManager == null) { 
+ if (taskManagerRegistration == null) { return FutureUtils.completedExceptionally( new Exception("Unknown TaskManager " + taskManagerId)); } - final TaskManagerLocation taskManagerLocation = taskManager.f0; - final TaskExecutorGateway taskExecutorGateway = taskManager.f1; - final RpcTaskManagerGateway rpcTaskManagerGateway = - new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken()); + new RpcTaskManagerGateway( + taskManagerRegistration.getTaskExecutorGateway(), getFencingToken()); return CompletableFuture.completedFuture( - slotPoolService.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots)); + slotPoolService.offerSlots( + taskManagerRegistration.getTaskManagerLocation(), + rpcTaskManagerGateway, + slots)); } @Override @@ -677,9 +674,8 @@ private void releaseEmptyTaskManager(ResourceID resourceId) { @Override public CompletableFuture registerTaskManager( - final String taskManagerRpcAddress, - final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, final JobID jobId, + final TaskManagerRegistrationInformation taskManagerRegistrationInformation, final Time timeout) { if (!jobGraph.getJobID().equals(jobId)) { @@ -694,54 +690,80 @@ public CompletableFuture registerTaskManager( } final TaskManagerLocation taskManagerLocation; + try { + taskManagerLocation = + resolveTaskManagerLocation( + taskManagerRegistrationInformation.getUnresolvedTaskManagerLocation()); + } catch (FlinkException exception) { + log.error("Could not accept TaskManager registration.", exception); + return CompletableFuture.completedFuture(new RegistrationResponse.Failure(exception)); + } + + final ResourceID taskManagerId = taskManagerLocation.getResourceID(); + final UUID sessionId = taskManagerRegistrationInformation.getTaskManagerSession(); + final TaskManagerRegistration taskManagerRegistration = + registeredTaskManagers.get(taskManagerId); + + if (taskManagerRegistration != null) { + if (taskManagerRegistration.getSessionId().equals(sessionId)) { + log.debug( + 
"Ignoring registration attempt of TaskManager {} with the same session id {}.", + taskManagerId, + sessionId); + final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId); + return CompletableFuture.completedFuture(response); + } else { + disconnectTaskManager( + taskManagerId, + new FlinkException( + "A registered TaskManager re-registered with a new session id. This indicates a restart of the TaskManager. Closing the old connection.")); + } + } + + return getRpcService() + .connect( + taskManagerRegistrationInformation.getTaskManagerRpcAddress(), + TaskExecutorGateway.class) + .handleAsync( + (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> { + if (throwable != null) { + return new RegistrationResponse.Failure(throwable); + } + + slotPoolService.registerTaskManager(taskManagerId); + registeredTaskManagers.put( + taskManagerId, + TaskManagerRegistration.create( + taskManagerLocation, taskExecutorGateway, sessionId)); + + // monitor the task manager as heartbeat target + taskManagerHeartbeatManager.monitorTarget( + taskManagerId, + new TaskExecutorHeartbeatSender(taskExecutorGateway)); + + return new JMTMRegistrationSuccess(resourceId); + }, + getMainThreadExecutor()); + } + + @Nonnull + private TaskManagerLocation resolveTaskManagerLocation( + UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws FlinkException { try { if (retrieveTaskManagerHostName) { - taskManagerLocation = - TaskManagerLocation.fromUnresolvedLocation( - unresolvedTaskManagerLocation, ResolutionMode.RETRIEVE_HOST_NAME); + return TaskManagerLocation.fromUnresolvedLocation( + unresolvedTaskManagerLocation, ResolutionMode.RETRIEVE_HOST_NAME); } else { - taskManagerLocation = - TaskManagerLocation.fromUnresolvedLocation( - unresolvedTaskManagerLocation, ResolutionMode.USE_IP_ONLY); + return TaskManagerLocation.fromUnresolvedLocation( + unresolvedTaskManagerLocation, ResolutionMode.USE_IP_ONLY); } } catch (Throwable throwable) { final String errMsg 
= String.format( - "Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s", + "TaskManager address %s cannot be resolved. %s", unresolvedTaskManagerLocation.getExternalAddress(), throwable.getMessage()); - log.error(errMsg); - return CompletableFuture.completedFuture( - new RegistrationResponse.Failure(new FlinkException(errMsg, throwable))); - } - - final ResourceID taskManagerId = taskManagerLocation.getResourceID(); - - if (registeredTaskManagers.containsKey(taskManagerId)) { - final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId); - return CompletableFuture.completedFuture(response); - } else { - return getRpcService() - .connect(taskManagerRpcAddress, TaskExecutorGateway.class) - .handleAsync( - (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> { - if (throwable != null) { - return new RegistrationResponse.Failure(throwable); - } - - slotPoolService.registerTaskManager(taskManagerId); - registeredTaskManagers.put( - taskManagerId, - Tuple2.of(taskManagerLocation, taskExecutorGateway)); - - // monitor the task manager as heartbeat target - taskManagerHeartbeatManager.monitorTarget( - taskManagerId, - new TaskExecutorHeartbeatSender(taskExecutorGateway)); - - return new JMTMRegistrationSuccess(resourceId); - }, - getMainThreadExecutor()); + throw new FlinkException(errMsg, throwable); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 1963d40de74e7..12b6944cdbe03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; @@ -166,17 +165,16 @@ void failSlot( /** * Registers the task manager at the job manager. * - * @param taskManagerRpcAddress the rpc address of the task manager - * @param unresolvedTaskManagerLocation unresolved location of the task manager * @param jobId jobId specifying the job for which the JobMaster should be responsible + * @param taskManagerRegistrationInformation the information for registering a task manager at + * the job manager * @param timeout for the rpc call * @return Future registration response indicating whether the registration was successful or * not */ CompletableFuture registerTaskManager( - final String taskManagerRpcAddress, - final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, final JobID jobId, + final TaskManagerRegistrationInformation taskManagerRegistrationInformation, @RpcTimeout final Time timeout); /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistration.java new file mode 100644 index 0000000000000..90e2139ac80f8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistration.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.UUID; + +final class TaskManagerRegistration { + private final TaskManagerLocation taskManagerLocation; + private final TaskExecutorGateway taskExecutorGateway; + private final UUID sessionId; + + private TaskManagerRegistration( + TaskManagerLocation taskManagerLocation, + TaskExecutorGateway taskExecutorGateway, + UUID sessionId) { + this.taskManagerLocation = taskManagerLocation; + this.taskExecutorGateway = taskExecutorGateway; + this.sessionId = sessionId; + } + + TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + TaskExecutorGateway getTaskExecutorGateway() { + return taskExecutorGateway; + } + + UUID getSessionId() { + return sessionId; + } + + static TaskManagerRegistration create( + TaskManagerLocation taskManagerLocation, + TaskExecutorGateway taskExecutorGateway, + UUID sessionId) { + return new TaskManagerRegistration(taskManagerLocation, taskExecutorGateway, sessionId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistrationInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistrationInformation.java new file mode 100644 index 0000000000000..61048692a290d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/TaskManagerRegistrationInformation.java @@ -0,0 +1,64 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.UUID; + +/** DTO for TaskManager registration information. 
*/ +public class TaskManagerRegistrationInformation implements Serializable { + private static final long serialVersionUID = 1767026305134276540L; + + private final String taskManagerRpcAddress; + private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation; + private final UUID taskManagerSession; + + private TaskManagerRegistrationInformation( + String taskManagerRpcAddress, + UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, + UUID taskManagerSession) { + this.taskManagerRpcAddress = Preconditions.checkNotNull(taskManagerRpcAddress); + this.unresolvedTaskManagerLocation = + Preconditions.checkNotNull(unresolvedTaskManagerLocation); + this.taskManagerSession = Preconditions.checkNotNull(taskManagerSession); + } + + public String getTaskManagerRpcAddress() { + return taskManagerRpcAddress; + } + + public UnresolvedTaskManagerLocation getUnresolvedTaskManagerLocation() { + return unresolvedTaskManagerLocation; + } + + public UUID getTaskManagerSession() { + return taskManagerSession; + } + + public static TaskManagerRegistrationInformation create( + String taskManagerRpcAddress, + UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, + UUID taskManagerSession) { + return new TaskManagerRegistrationInformation( + taskManagerRpcAddress, unresolvedTaskManagerLocation, taskManagerSession); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java index 3ab1fe793ae28..9c5dcceeaa4ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; 
+import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.registration.RegisteredRpcConnection; @@ -70,6 +71,8 @@ public class DefaultJobLeaderService implements JobLeaderService { private final RetryingRegistrationConfiguration retryingRegistrationConfiguration; + private final UUID taskManagerSession; + /** Internal state of the service. */ private volatile DefaultJobLeaderService.State state; @@ -93,6 +96,7 @@ public DefaultJobLeaderService( this.ownLocation = Preconditions.checkNotNull(location); this.retryingRegistrationConfiguration = Preconditions.checkNotNull(retryingRegistrationConfiguration); + this.taskManagerSession = UUID.randomUUID(); // Has to be a concurrent hash map because tests might access this service // concurrently via containsJob @@ -401,9 +405,9 @@ private final class JobManagerRegisteredRpcConnection getTargetAddress(), getTargetLeaderId(), retryingRegistrationConfiguration, - ownerAddress, - ownLocation, - jobId); + jobId, + TaskManagerRegistrationInformation.create( + ownerAddress, ownLocation, taskManagerSession)); } @Override @@ -482,12 +486,10 @@ private static final class JobManagerRetryingRegistration JMTMRegistrationSuccess, JMTMRegistrationRejection> { - private final String taskManagerRpcAddress; - - private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation; - private final JobID jobId; + private final TaskManagerRegistrationInformation taskManagerRegistrationInformation; + JobManagerRetryingRegistration( Logger log, RpcService rpcService, @@ -496,9 +498,8 @@ private static final class JobManagerRetryingRegistration String targetAddress, JobMasterId jobMasterId, RetryingRegistrationConfiguration retryingRegistrationConfiguration, - String taskManagerRpcAddress, - UnresolvedTaskManagerLocation 
unresolvedTaskManagerLocation, - JobID jobId) { + JobID jobId, + TaskManagerRegistrationInformation taskManagerRegistrationInformation) { super( log, rpcService, @@ -508,20 +509,15 @@ private static final class JobManagerRetryingRegistration jobMasterId, retryingRegistrationConfiguration); - this.taskManagerRpcAddress = taskManagerRpcAddress; - this.unresolvedTaskManagerLocation = - Preconditions.checkNotNull(unresolvedTaskManagerLocation); this.jobId = Preconditions.checkNotNull(jobId); + this.taskManagerRegistrationInformation = taskManagerRegistrationInformation; } @Override protected CompletableFuture invokeRegistration( JobMasterGateway gateway, JobMasterId fencingToken, long timeoutMillis) { return gateway.registerTaskManager( - taskManagerRpcAddress, - unresolvedTaskManagerLocation, - jobId, - Time.milliseconds(timeoutMillis)); + jobId, taskManagerRegistrationInformation, Time.milliseconds(timeoutMillis)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java index 67b1656964c67..e79d87431ef29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.OperatorStreamStateHandle; @@ -43,6 +44,7 @@ import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import 
org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; @@ -146,7 +148,12 @@ public CompletableFuture transitionTo( public CompletableFuture> deployVertices(int numSlots) { return jobMasterGateway .registerTaskManager( - taskExecutorGateway.getAddress(), taskManagerLocation, jobId, TIMEOUT) + jobId, + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + taskManagerLocation, + TestingUtils.zeroUUID()), + TIMEOUT) .thenCompose(ignored -> offerSlots(numSlots)) .thenCompose(ignored -> descriptorsFuture); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java index 780487ea4fba7..3abd26ec16dd2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.Before; @@ -270,9 +271,11 @@ private void registerTaskExecutorAndOfferSlots( throws ExecutionException, InterruptedException { jobMasterGateway .registerTaskManager( - taskExecutorGateway.getAddress(), - taskManagerLocation, jobId, + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + taskManagerLocation, + TestingUtils.zeroUUID()), testingTimeout) .get(); diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java index dc98b4856c7ac..03a6733b3dbc2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -273,9 +274,11 @@ private void registerTaskExecutorAtJobMaster( jobMasterGateway .registerTaskManager( - taskExecutorGateway.getAddress(), - localTaskManagerUnresolvedLocation, jobId, + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + localTaskManagerUnresolvedLocation, + TestingUtils.zeroUUID()), testingTimeout) .get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 9c335b33a95cc..9501f95c50927 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -108,6 +108,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -144,6 +145,7 @@ import 
java.util.Objects; import java.util.Optional; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -284,9 +286,11 @@ public void testTaskManagerRegistrationTriggersHeartbeating() throws Exception { // request at interval time CompletableFuture registrationResponse = jobMasterGateway.registerTaskManager( - taskExecutorGateway.getAddress(), - unresolvedTaskManagerLocation, jobGraph.getJobID(), + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + unresolvedTaskManagerLocation, + TestingUtils.zeroUUID()), testingTimeout); // wait for the completion of the registration @@ -340,9 +344,11 @@ private void runHeartbeatTest( // request at interval time CompletableFuture registrationResponse = jobMasterGateway.registerTaskManager( - taskExecutorGateway.getAddress(), - unresolvedTaskManagerLocation, jobGraph.getJobID(), + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + unresolvedTaskManagerLocation, + TestingUtils.zeroUUID()), testingTimeout); // wait for the completion of the registration @@ -436,9 +442,11 @@ public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Excep // request at interval time CompletableFuture registrationResponse = jobMasterGateway.registerTaskManager( - taskExecutorGateway.getAddress(), - unresolvedTaskManagerLocation, jobGraph.getJobID(), + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + unresolvedTaskManagerLocation, + TestingUtils.zeroUUID()), testingTimeout); // wait for the completion of the registration @@ -1774,9 +1782,11 @@ public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() th final CompletableFuture registrationResponse = jobMaster.registerTaskManager( - "foobar", - new LocalUnresolvedTaskManagerLocation(), new JobID(), + TaskManagerRegistrationInformation.create( + 
"foobar", + new LocalUnresolvedTaskManagerLocation(), + TestingUtils.zeroUUID()), testingTimeout); assertThat(registrationResponse.get(), instanceOf(JMTMRegistrationRejection.class)); @@ -1785,6 +1795,99 @@ public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() th } } + @Test + public void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception { + final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); + + final TestingTaskExecutorGateway testingTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); + rpcService.registerGateway( + testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); + + try { + jobMaster.start(); + + final TaskManagerRegistrationInformation taskManagerRegistrationInformation = + TaskManagerRegistrationInformation.create( + testingTaskExecutorGateway.getAddress(), + new LocalUnresolvedTaskManagerLocation(), + UUID.randomUUID()); + + final CompletableFuture firstRegistrationResponse = + jobMaster.registerTaskManager( + jobGraph.getJobID(), + taskManagerRegistrationInformation, + testingTimeout); + final CompletableFuture secondRegistrationResponse = + jobMaster.registerTaskManager( + jobGraph.getJobID(), + taskManagerRegistrationInformation, + testingTimeout); + + assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + @Test + public void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception { + final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); + + final CompletableFuture firstTaskExecutorDisconnectedFuture = + new CompletableFuture<>(); + final TestingTaskExecutorGateway firstTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + 
.setAddress("firstTaskExecutor") + .setDisconnectJobManagerConsumer( + (jobID, throwable) -> + firstTaskExecutorDisconnectedFuture.complete(null)) + .createTestingTaskExecutorGateway(); + final TestingTaskExecutorGateway secondTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setAddress("secondTaskExecutor") + .createTestingTaskExecutorGateway(); + + rpcService.registerGateway(firstTaskExecutorGateway.getAddress(), firstTaskExecutorGateway); + rpcService.registerGateway( + secondTaskExecutorGateway.getAddress(), secondTaskExecutorGateway); + + try { + jobMaster.start(); + + final LocalUnresolvedTaskManagerLocation taskManagerLocation = + new LocalUnresolvedTaskManagerLocation(); + final UUID firstTaskManagerSessionId = UUID.randomUUID(); + + final CompletableFuture firstRegistrationResponse = + jobMaster.registerTaskManager( + jobGraph.getJobID(), + TaskManagerRegistrationInformation.create( + firstTaskExecutorGateway.getAddress(), + taskManagerLocation, + firstTaskManagerSessionId), + testingTimeout); + assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + + final UUID secondTaskManagerSessionId = UUID.randomUUID(); + final CompletableFuture secondRegistrationResponse = + jobMaster.registerTaskManager( + jobGraph.getJobID(), + TaskManagerRegistrationInformation.create( + secondTaskExecutorGateway.getAddress(), + taskManagerLocation, + secondTaskManagerSessionId), + testingTimeout); + + assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + // the first TaskExecutor should be disconnected + firstTaskExecutorDisconnectedFuture.get(); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + @Test public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception { final CompletableFuture schedulerTerminationFuture = new CompletableFuture<>(); @@ -1906,9 +2009,11 @@ private Collection registerSlotsAtJobMaster( jobMasterGateway 
.registerTaskManager( - taskExecutorGateway.getAddress(), - unresolvedTaskManagerLocation, jobId, + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + unresolvedTaskManagerLocation, + TestingUtils.zeroUUID()), testingTimeout) .get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java index e457f596a4673..c846de6888f40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.testutils.TestingUtils; import java.util.Collection; import java.util.concurrent.ExecutionException; @@ -51,9 +52,11 @@ public static void registerTaskExecutorAndOfferSlots( jobMasterGateway .registerTaskManager( - taskExecutorGateway.getAddress(), - unresolvedTaskManagerLocation, jobId, + TaskManagerRegistrationInformation.create( + taskExecutorGateway.getAddress(), + unresolvedTaskManagerLocation, + TestingUtils.zeroUUID()), testingTimeout) .get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index c547c5db72247..2b40b6758ce0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import 
org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -53,7 +54,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.function.TriConsumer; import org.apache.flink.util.function.TriFunction; @@ -111,10 +111,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull private final TriConsumer failSlotConsumer; @Nonnull - private final TriFunction< - String, - UnresolvedTaskManagerLocation, + private final BiFunction< JobID, + TaskManagerRegistrationInformation, CompletableFuture> registerTaskManagerFunction; @@ -218,10 +217,9 @@ public TestingJobMasterGateway( offerSlotsFunction, @Nonnull TriConsumer failSlotConsumer, @Nonnull - TriFunction< - String, - UnresolvedTaskManagerLocation, + BiFunction< JobID, + TaskManagerRegistrationInformation, CompletableFuture> registerTaskManagerFunction, @Nonnull @@ -372,12 +370,10 @@ public void failSlot(ResourceID taskManagerId, AllocationID allocationId, Except @Override public CompletableFuture registerTaskManager( - String taskManagerRpcAddress, - UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, JobID jobId, + TaskManagerRegistrationInformation taskManagerRegistrationInformation, Time timeout) { - return registerTaskManagerFunction.apply( - taskManagerRpcAddress, unresolvedTaskManagerLocation, jobId); + return registerTaskManagerFunction.apply(jobId, taskManagerRegistrationInformation); } @Override diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index 7c569778f9698..c0e7e05be4763 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -52,7 +53,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.TriConsumer; @@ -101,13 +101,12 @@ public class TestingJobMasterGatewayBuilder { CompletableFuture.completedFuture(Collections.emptyList()); private TriConsumer failSlotConsumer = (ignoredA, ignoredB, ignoredC) -> {}; - private TriFunction< - String, - UnresolvedTaskManagerLocation, + private BiFunction< JobID, + TaskManagerRegistrationInformation, CompletableFuture> registerTaskManagerFunction = - (ignoredA, ignoredB, ignoredC) -> + (ignoredA, ignoredB) -> CompletableFuture.completedFuture( new JMTMRegistrationSuccess(RESOURCE_MANAGER_ID)); private BiFunction< @@ -243,10 +242,9 @@ public 
TestingJobMasterGatewayBuilder setFailSlotConsumer( } public TestingJobMasterGatewayBuilder setRegisterTaskManagerFunction( - TriFunction< - String, - UnresolvedTaskManagerLocation, + BiFunction< JobID, + TaskManagerRegistrationInformation, CompletableFuture> registerTaskManagerFunction) { this.registerTaskManagerFunction = registerTaskManagerFunction; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java index a5a53a3eeeee8..40def75c5ccbe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java @@ -263,7 +263,7 @@ public void rejectedJobManagerRegistrationCallsJobLeaderListener() throws Except final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction( - (s, unresolvedTaskManagerLocation, jobID) -> + (jobID, taskManagerRegistrationInformation) -> CompletableFuture.completedFuture( new JMTMRegistrationRejection("foobar"))) .build(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java index f9e05e059bc36..b1c180868ce91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java @@ -245,7 +245,7 @@ private static TestingJobMasterGateway setupJobManagerGateway( ResourceID jobManagerResourceId) { return new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction( - (s, location, ignored) -> 
+ (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> CompletableFuture.completedFuture( new JMTMRegistrationSuccess(jobManagerResourceId))) .setOfferSlotsFunction( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index 8a8f518cdd219..572aed09e66bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -449,7 +449,7 @@ private void internalTestPartitionRelease( final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction( - (s, location, ignored) -> + (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> CompletableFuture.completedFuture( new JMTMRegistrationSuccess(ResourceID.generate()))) .setOfferSlotsFunction( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index c1b7ce5ace7de..4758080121dfa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -354,7 +354,7 @@ private void runJobManagerHeartbeatTest( final TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder = new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction( - (s, taskManagerUnresolvedLocation, ignored) -> { + (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> { registrationAttempts.countDown(); return CompletableFuture.completedFuture( new JMTMRegistrationSuccess(jmResourceId)); @@ -2274,7 +2274,7 @@ public void testSyncSlotsWithJobMasterByHeartbeat() throws 
Exception { CompletableFuture.completedFuture( new ArrayList<>(slotOffers))) .setRegisterTaskManagerFunction( - (ignoredA, ignoredB, ignoredC) -> + (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> CompletableFuture.completedFuture( new JMTMRegistrationSuccess(jobManagerResourceId))) .build(); @@ -2584,7 +2584,7 @@ public void testReleaseOfJobResourcesIfJobMasterIsNotCorrect() throws Exception final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction( - (s, unresolvedTaskManagerLocation, jobID) -> + (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> CompletableFuture.completedFuture( new JMTMRegistrationRejection("foobar"))) .build(); @@ -2654,7 +2654,7 @@ public void testReleaseInactiveSlots() throws Exception { final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction( - (s, unresolvedTaskManagerLocation, jobID) -> + (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> new CompletableFuture<>()) .build();