Skip to content

Commit

Permalink
[FLINK-25849] Add TaskManager session id to JobMaster.registerTaskMan…
Browse files Browse the repository at this point in the history
…ager

The TaskManager session id makes it possible to distinguish between a duplicate registration
attempt and a new TaskManager instance that reuses an old ResourceID.

Signed-off-by: slinkydeveloper <[email protected]>

This closes apache#18534.
  • Loading branch information
tillrohrmann committed Jan 28, 2022
1 parent b2011e5 commit 77ec805
Show file tree
Hide file tree
Showing 17 changed files with 391 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Convenience functions for testing actor-based components. */
public class TestingUtils {
private static final UUID ZERO_UUID = new UUID(0L, 0L);

public static final Duration TESTING_DURATION = Duration.ofMinutes(2L);
public static final Time TIMEOUT = Time.minutes(1L);
Expand All @@ -50,4 +52,8 @@ public static synchronized ScheduledExecutorService defaultExecutor() {
/**
 * Wraps the executor service obtained from {@link #defaultExecutor()} in a {@link
 * ScheduledExecutorServiceAdapter}.
 *
 * @return a new adapter around the executor service returned by {@code defaultExecutor()}
 */
public static ScheduledExecutor defaultScheduledExecutor() {
    final ScheduledExecutorService backingExecutorService = defaultExecutor();
    return new ScheduledExecutorServiceAdapter(backingExecutorService);
}

/**
 * Returns the shared {@code ZERO_UUID} constant, i.e. the UUID whose most and least significant
 * bits are both zero.
 *
 * @return the same UUID instance on every call
 */
public static UUID zeroUUID() {
return ZERO_UUID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
Expand Down Expand Up @@ -95,6 +94,7 @@

import org.slf4j.Logger;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
Expand Down Expand Up @@ -171,8 +171,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>

// --------- TaskManagers --------

private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>
registeredTaskManagers;
private final Map<ResourceID, TaskManagerRegistration> registeredTaskManagers;

private final ShuffleMaster<?> shuffleMaster;

Expand Down Expand Up @@ -260,10 +259,12 @@ public void onUnknownDeploymentsOf(
executionAttemptIds,
host);
for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo =
TaskManagerRegistration taskManagerRegistration =
registeredTaskManagers.get(host);
if (taskManagerInfo != null) {
taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout);
if (taskManagerRegistration != null) {
taskManagerRegistration
.getTaskExecutorGateway()
.cancelTask(executionAttemptId, rpcTimeout);
}
}
}
Expand Down Expand Up @@ -306,14 +307,9 @@ public void onUnknownDeploymentsOf(
checkNotNull(partitionTrackerFactory)
.create(
resourceID -> {
Tuple2<TaskManagerLocation, TaskExecutorGateway>
taskManagerInfo =
registeredTaskManagers.get(resourceID);
if (taskManagerInfo == null) {
return Optional.empty();
}

return Optional.of(taskManagerInfo.f1);
return Optional.ofNullable(
registeredTaskManagers.get(resourceID))
.map(TaskManagerRegistration::getTaskExecutorGateway);
});

this.shuffleMaster = checkNotNull(shuffleMaster);
Expand Down Expand Up @@ -507,11 +503,12 @@ public CompletableFuture<Acknowledge> disconnectTaskManager(
slotPoolService.releaseTaskManager(resourceID, cause);
partitionTracker.stopTrackingPartitionsFor(resourceID);

Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection =
registeredTaskManagers.remove(resourceID);
TaskManagerRegistration taskManagerRegistration = registeredTaskManagers.remove(resourceID);

if (taskManagerConnection != null) {
taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
if (taskManagerRegistration != null) {
taskManagerRegistration
.getTaskExecutorGateway()
.disconnectJobManager(jobGraph.getJobID(), cause);
}

return CompletableFuture.completedFuture(Acknowledge.get());
Expand Down Expand Up @@ -618,22 +615,22 @@ public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
public CompletableFuture<Collection<SlotOffer>> offerSlots(
final ResourceID taskManagerId, final Collection<SlotOffer> slots, final Time timeout) {

Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager =
registeredTaskManagers.get(taskManagerId);
TaskManagerRegistration taskManagerRegistration = registeredTaskManagers.get(taskManagerId);

if (taskManager == null) {
if (taskManagerRegistration == null) {
return FutureUtils.completedExceptionally(
new Exception("Unknown TaskManager " + taskManagerId));
}

final TaskManagerLocation taskManagerLocation = taskManager.f0;
final TaskExecutorGateway taskExecutorGateway = taskManager.f1;

final RpcTaskManagerGateway rpcTaskManagerGateway =
new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
new RpcTaskManagerGateway(
taskManagerRegistration.getTaskExecutorGateway(), getFencingToken());

return CompletableFuture.completedFuture(
slotPoolService.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots));
slotPoolService.offerSlots(
taskManagerRegistration.getTaskManagerLocation(),
rpcTaskManagerGateway,
slots));
}

@Override
Expand Down Expand Up @@ -677,9 +674,8 @@ private void releaseEmptyTaskManager(ResourceID resourceId) {

@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
final JobID jobId,
final TaskManagerRegistrationInformation taskManagerRegistrationInformation,
final Time timeout) {

if (!jobGraph.getJobID().equals(jobId)) {
Expand All @@ -694,54 +690,80 @@ public CompletableFuture<RegistrationResponse> registerTaskManager(
}

final TaskManagerLocation taskManagerLocation;
try {
taskManagerLocation =
resolveTaskManagerLocation(
taskManagerRegistrationInformation.getUnresolvedTaskManagerLocation());
} catch (FlinkException exception) {
log.error("Could not accept TaskManager registration.", exception);
return CompletableFuture.completedFuture(new RegistrationResponse.Failure(exception));
}

final ResourceID taskManagerId = taskManagerLocation.getResourceID();
final UUID sessionId = taskManagerRegistrationInformation.getTaskManagerSession();
final TaskManagerRegistration taskManagerRegistration =
registeredTaskManagers.get(taskManagerId);

if (taskManagerRegistration != null) {
if (taskManagerRegistration.getSessionId().equals(sessionId)) {
log.debug(
"Ignoring registration attempt of TaskManager {} with the same session id {}.",
taskManagerId,
sessionId);
final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
return CompletableFuture.completedFuture(response);
} else {
disconnectTaskManager(
taskManagerId,
new FlinkException(
"A registered TaskManager re-registered with a new session id. This indicates a restart of the TaskManager. Closing the old connection."));
}
}

return getRpcService()
.connect(
taskManagerRegistrationInformation.getTaskManagerRpcAddress(),
TaskExecutorGateway.class)
.handleAsync(
(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
if (throwable != null) {
return new RegistrationResponse.Failure(throwable);
}

slotPoolService.registerTaskManager(taskManagerId);
registeredTaskManagers.put(
taskManagerId,
TaskManagerRegistration.create(
taskManagerLocation, taskExecutorGateway, sessionId));

// monitor the task manager as heartbeat target
taskManagerHeartbeatManager.monitorTarget(
taskManagerId,
new TaskExecutorHeartbeatSender(taskExecutorGateway));

return new JMTMRegistrationSuccess(resourceId);
},
getMainThreadExecutor());
}

@Nonnull
private TaskManagerLocation resolveTaskManagerLocation(
UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws FlinkException {
try {
if (retrieveTaskManagerHostName) {
taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(
unresolvedTaskManagerLocation, ResolutionMode.RETRIEVE_HOST_NAME);
return TaskManagerLocation.fromUnresolvedLocation(
unresolvedTaskManagerLocation, ResolutionMode.RETRIEVE_HOST_NAME);
} else {
taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(
unresolvedTaskManagerLocation, ResolutionMode.USE_IP_ONLY);
return TaskManagerLocation.fromUnresolvedLocation(
unresolvedTaskManagerLocation, ResolutionMode.USE_IP_ONLY);
}
} catch (Throwable throwable) {
final String errMsg =
String.format(
"Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s",
"TaskManager address %s cannot be resolved. %s",
unresolvedTaskManagerLocation.getExternalAddress(),
throwable.getMessage());
log.error(errMsg);
return CompletableFuture.completedFuture(
new RegistrationResponse.Failure(new FlinkException(errMsg, throwable)));
}

final ResourceID taskManagerId = taskManagerLocation.getResourceID();

if (registeredTaskManagers.containsKey(taskManagerId)) {
final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
return CompletableFuture.completedFuture(response);
} else {
return getRpcService()
.connect(taskManagerRpcAddress, TaskExecutorGateway.class)
.handleAsync(
(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
if (throwable != null) {
return new RegistrationResponse.Failure(throwable);
}

slotPoolService.registerTaskManager(taskManagerId);
registeredTaskManagers.put(
taskManagerId,
Tuple2.of(taskManagerLocation, taskExecutorGateway));

// monitor the task manager as heartbeat target
taskManagerHeartbeatManager.monitorTarget(
taskManagerId,
new TaskExecutorHeartbeatSender(taskExecutorGateway));

return new JMTMRegistrationSuccess(resourceId);
},
getMainThreadExecutor());
throw new FlinkException(errMsg, throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -166,17 +165,16 @@ void failSlot(
/**
* Registers the task manager at the job manager.
*
* @param taskManagerRpcAddress the rpc address of the task manager
* @param unresolvedTaskManagerLocation unresolved location of the task manager
* @param jobId jobId specifying the job for which the JobMaster should be responsible
* @param taskManagerRegistrationInformation the information for registering a task manager at
* the job manager
* @param timeout for the rpc call
* @return Future registration response indicating whether the registration was successful or
* not
*/
CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
final JobID jobId,
final TaskManagerRegistrationInformation taskManagerRegistrationInformation,
@RpcTimeout final Time timeout);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Objects;
import java.util.UUID;

/**
 * Immutable holder bundling what is stored for one registered TaskManager: its {@link
 * TaskManagerLocation}, the {@link TaskExecutorGateway} used to talk to it, and the session id it
 * registered with.
 *
 * <p>All three components are required; instances are obtained through {@link #create}.
 */
final class TaskManagerRegistration {
    private final TaskManagerLocation taskManagerLocation;
    private final TaskExecutorGateway taskExecutorGateway;
    private final UUID sessionId;

    /**
     * Validates and stores the three components.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    private TaskManagerRegistration(
            TaskManagerLocation taskManagerLocation,
            TaskExecutorGateway taskExecutorGateway,
            UUID sessionId) {
        // Fail fast here: callers dereference the getters' results directly, so a null would
        // otherwise surface as a NullPointerException far away from where it was introduced.
        this.taskManagerLocation =
                Objects.requireNonNull(taskManagerLocation, "taskManagerLocation");
        this.taskExecutorGateway =
                Objects.requireNonNull(taskExecutorGateway, "taskExecutorGateway");
        this.sessionId = Objects.requireNonNull(sessionId, "sessionId");
    }

    /**
     * Returns the location of the registered TaskManager.
     *
     * @return the location passed to {@link #create}; never {@code null}
     */
    TaskManagerLocation getTaskManagerLocation() {
        return taskManagerLocation;
    }

    /**
     * Returns the gateway of the registered TaskManager.
     *
     * @return the gateway passed to {@link #create}; never {@code null}
     */
    TaskExecutorGateway getTaskExecutorGateway() {
        return taskExecutorGateway;
    }

    /**
     * Returns the session id the TaskManager registered with.
     *
     * @return the session id passed to {@link #create}; never {@code null}
     */
    UUID getSessionId() {
        return sessionId;
    }

    /**
     * Creates a registration from its three components.
     *
     * @param taskManagerLocation location of the TaskManager
     * @param taskExecutorGateway gateway used to communicate with the TaskManager
     * @param sessionId session id the TaskManager registered with
     * @return a new registration holding exactly the given values
     * @throws NullPointerException if any argument is {@code null}
     */
    static TaskManagerRegistration create(
            TaskManagerLocation taskManagerLocation,
            TaskExecutorGateway taskExecutorGateway,
            UUID sessionId) {
        return new TaskManagerRegistration(taskManagerLocation, taskExecutorGateway, sessionId);
    }
}
Loading

0 comments on commit 77ec805

Please sign in to comment.