Skip to content

Commit

Permalink
[FLINK-21996][coordination] Ensure exactly-once guarantees for Operat…
Browse files Browse the repository at this point in the history
…orEvent RPCs

This consists of two changes that work together:
  - Delay checkpoints until we have clarity about all in-flight OperatorEvents
  - Fail target subtask if the result future for an OperatorEvent send fails
  • Loading branch information
StephanEwen committed Apr 16, 2021
1 parent fe41a87 commit 9045362
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.operators.coordination;

import org.apache.flink.runtime.messages.Acknowledge;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

/** Simple interface for a component that takes and sends events. */
@FunctionalInterface
interface EventSender {

/**
* Takes the given Callable and calls it at a certain point to send the event. The result of
* that Callable is bridged to the given result future.
*
* @param sendAction the action that performs the actual send and returns the future
*     acknowledgement of that send
* @param result the future to which the outcome of the send action is bridged
*/
void sendEvent(
Callable<CompletableFuture<Acknowledge>> sendAction,
CompletableFuture<Acknowledge> result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public ExecutionAttemptID currentAttempt() {
return taskExecution.getAttemptId();
}

/** Returns the name obtained from the task execution's {@code getVertexWithAttempt()}. */
@Override
public String subtaskName() {
return taskExecution.getVertexWithAttempt();
}

@Override
public CompletableFuture<?> hasSwitchedToRunning() {
return taskExecution.getInitializingOrRunningFuture();
Expand All @@ -91,6 +96,11 @@ public boolean isStillRunning() {
|| taskExecution.getState() == ExecutionState.INITIALIZING;
}

/** Fails the underlying task execution with the given cause. */
@Override
public void triggerTaskFailover(Throwable cause) {
taskExecution.fail(cause);
}

// ------------------------------------------------------------------------

static final class ExecutionJobVertexSubtaskAccess implements SubtaskAccessFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
Expand All @@ -34,6 +36,8 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
Expand Down Expand Up @@ -112,11 +116,15 @@
public class OperatorCoordinatorHolder
implements OperatorCoordinatorCheckpointContext, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinatorHolder.class);

private final OperatorCoordinator coordinator;
private final OperatorID operatorId;
private final LazyInitializedCoordinatorContext context;
private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
private final OperatorEventValve eventValve;
private final IncompleteFuturesTracker unconfirmedEvents;
private final EventSender eventSender;

private final int operatorParallelism;
private final int operatorMaxParallelism;
Expand All @@ -139,7 +147,9 @@ private OperatorCoordinatorHolder(
this.operatorParallelism = operatorParallelism;
this.operatorMaxParallelism = operatorMaxParallelism;

this.unconfirmedEvents = new IncompleteFuturesTracker();
this.eventValve = new OperatorEventValve();
this.eventSender = new ValveAndTrackerSender(eventValve, unconfirmedEvents);
}

public void lazyInitialize(
Expand Down Expand Up @@ -278,7 +288,7 @@ private void checkpointCoordinatorInternal(
if (failure != null) {
result.completeExceptionally(failure);
} else if (eventValve.tryShutValve(checkpointId)) {
result.complete(success);
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
} else {
// if we cannot shut the valve, this means the checkpoint
// has been aborted before, so the future is already
Expand All @@ -299,6 +309,43 @@ private void checkpointCoordinatorInternal(
}
}

/**
 * Completes the coordinator checkpoint future once all currently unconfirmed operator events
 * have been acknowledged.
 *
 * <p>The set of unconfirmed event futures is taken from the tracker (which is reset by that
 * call). If there are none, the checkpoint future is completed immediately with the given
 * result. Otherwise the checkpoint future is completed only after all of those event futures
 * are done: normally if all of them succeeded, exceptionally if any of them failed.
 *
 * @param checkpointId the ID of the checkpoint, used for logging
 * @param checkpointFuture the future to complete with the checkpoint result or with a failure
 * @param checkpointResult the serialized coordinator checkpoint to complete the future with
 */
private void completeCheckpointOnceEventsAreDone(
        final long checkpointId,
        final CompletableFuture<byte[]> checkpointFuture,
        final byte[] checkpointResult) {

    final Collection<CompletableFuture<?>> pendingEvents =
            unconfirmedEvents.getCurrentIncompleteAndReset();
    if (pendingEvents.isEmpty()) {
        // nothing in flight, the checkpoint can complete right away
        checkpointFuture.complete(checkpointResult);
        return;
    }

    LOG.info(
            "Coordinator checkpoint {} for coordinator {} is awaiting {} pending events",
            checkpointId,
            operatorId,
            pendingEvents.size());

    final CompletableFuture<?> conjunct = FutureUtils.waitForAll(pendingEvents);
    conjunct.whenComplete(
            (ignored, failure) -> {
                if (failure == null) {
                    checkpointFuture.complete(checkpointResult);
                } else {
                    // if we reach this situation, the checkpoint cannot complete anyway,
                    // because
                    //   (a) the target task really is down
                    //   (b) we have a potentially lost RPC message and need to
                    //       do a task failover for the receiver to restore consistency
                    //
                    // The original failure is attached as the cause so that the reason
                    // for the lost event is not dropped from the checkpoint failure.
                    checkpointFuture.completeExceptionally(
                            new FlinkException(
                                    "Failing OperatorCoordinator checkpoint because some OperatorEvents "
                                            + "before this checkpoint barrier were not received by the target tasks.",
                                    failure));
                }
            });
}

// ------------------------------------------------------------------------
// Checkpointing Callbacks
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -336,7 +383,7 @@ private void setupSubtaskGateway(int subtask) {
final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);

final OperatorCoordinator.SubtaskGateway gateway =
new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
new SubtaskGatewayImpl(sta, eventSender, mainThreadExecutor);

// We need to do this synchronously here, otherwise we violate the contract that
// 'subtaskFailed()' will never overtake 'subtaskReady()'.
Expand Down Expand Up @@ -522,4 +569,25 @@ public ClassLoader getUserCodeClassloader() {
return userCodeClassLoader;
}
}

// ------------------------------------------------------------------------

/**
 * An {@link EventSender} that forwards every send request to an {@link OperatorEventValve} and
 * afterwards registers the request's result future with an {@link IncompleteFuturesTracker}.
 */
private static final class ValveAndTrackerSender implements EventSender {

    /** The valve that the send requests are handed to. */
    private final OperatorEventValve eventValve;

    /** The tracker with which the result futures are registered. */
    private final IncompleteFuturesTracker pendingResults;

    ValveAndTrackerSender(OperatorEventValve valve, IncompleteFuturesTracker tracker) {
        eventValve = valve;
        pendingResults = tracker;
    }

    /**
     * Passes the send action and its result future to the valve first, and then hands the same
     * result future to the tracker.
     */
    @Override
    public void sendEvent(
            Callable<CompletableFuture<Acknowledge>> sendAction,
            CompletableFuture<Acknowledge> result) {
        eventValve.sendEvent(sendAction, result);
        pendingResults.trackFutureWhileIncomplete(result);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* that, one can register a "main thread executor" (as used by the mailbox components like RPC
* components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
*/
final class OperatorEventValve {
final class OperatorEventValve implements EventSender {

private static final long NO_CHECKPOINT = Long.MIN_VALUE;

Expand All @@ -55,7 +55,7 @@ final class OperatorEventValve {
@Nullable private ComponentMainThreadExecutor mainThreadExecutor;

/** Constructs a new OperatorEventValve. */
public OperatorEventValve() {
OperatorEventValve() {
this.currentCheckpointId = NO_CHECKPOINT;
this.lastCheckpointId = Long.MIN_VALUE;
}
Expand All @@ -82,6 +82,7 @@ public boolean isShut() {
* <p>This method makes no assumptions and gives no guarantees from which thread the result
* future gets completed.
*/
@Override
public void sendEvent(
Callable<CompletableFuture<Acknowledge>> sendAction,
CompletableFuture<Acknowledge> result) {
Expand Down Expand Up @@ -172,7 +173,7 @@ private void checkRunsInMainThread() {
}
}

private static void callSendAction(
private void callSendAction(
Callable<CompletableFuture<Acknowledge>> sendAction,
CompletableFuture<Acknowledge> result) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ Callable<CompletableFuture<Acknowledge>> createEventSendAction(
/** Gets the execution attempt ID of the attempt that this instance is bound to. */
ExecutionAttemptID currentAttempt();

/**
* Gets a descriptive name of the operator's subtask, including name, subtask-id, parallelism,
* and execution attempt.
*/
String subtaskName();

/**
* The future returned here completes once the target subtask is in a running state. As running
* state classify the states {@link ExecutionState#RUNNING} and {@link
Expand All @@ -68,6 +74,11 @@ Callable<CompletableFuture<Acknowledge>> createEventSendAction(
*/
boolean isStillRunning();

/**
* Triggers a failover for the subtask execution attempt that this access instance is bound to.
*
* @param cause the failure cause that is passed along with the failover
*/
void triggerTaskFailover(Throwable cause);

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.runtime.operators.coordination;

import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;

Expand All @@ -34,14 +36,17 @@
*/
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {

private static final String EVENT_LOSS_ERROR_MESSAGE =
"An OperatorEvent from an OperatorCoordinator to a task was lost. "
+ "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";

private final SubtaskAccess subtaskAccess;
private final OperatorEventValve valve;
private final EventSender sender;
private final Executor sendingExecutor;

SubtaskGatewayImpl(
SubtaskAccess subtaskAccess, OperatorEventValve valve, Executor sendingExecutor) {
SubtaskGatewayImpl(SubtaskAccess subtaskAccess, EventSender sender, Executor sendingExecutor) {
this.subtaskAccess = subtaskAccess;
this.valve = valve;
this.sender = sender;
this.sendingExecutor = sendingExecutor;
}

Expand All @@ -64,7 +69,22 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
subtaskAccess.createEventSendAction(serializedEvent);

final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
sendingExecutor.execute(() -> valve.sendEvent(sendAction, result));
FutureUtils.assertNoException(
result.handleAsync(
(success, failure) -> {
if (failure != null && subtaskAccess.isStillRunning()) {
String msg =
String.format(
EVENT_LOSS_ERROR_MESSAGE,
evt,
subtaskAccess.subtaskName());
subtaskAccess.triggerTaskFailover(new FlinkException(msg));
}
return null;
},
sendingExecutor));

sendingExecutor.execute(() -> sender.sendEvent(sendAction, result));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

package org.apache.flink.runtime.operators.coordination.util;

import org.apache.flink.annotation.VisibleForTesting;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -64,6 +61,22 @@ public void trackFutureWhileIncomplete(CompletableFuture<?> future) {
future.whenComplete((success, failure) -> removeFromSet(future));
}

/**
 * Returns the futures that are currently held in the set of incomplete futures and clears that
 * set. Both steps happen while holding the lock.
 *
 * @return a copy of the tracked futures, or an empty collection if none are tracked
 */
public Collection<CompletableFuture<?>> getCurrentIncompleteAndReset() {
    lock.lock();
    try {
        if (!incompleteFutures.isEmpty()) {
            // copy first, because clearing the set would otherwise drop the contents
            final ArrayList<CompletableFuture<?>> snapshot =
                    new ArrayList<>(incompleteFutures);
            incompleteFutures.clear();
            return snapshot;
        }
        return Collections.emptySet();
    } finally {
        lock.unlock();
    }
}

public void failAllFutures(Throwable cause) {
final Collection<CompletableFuture<?>> futuresToFail;

Expand Down Expand Up @@ -95,9 +108,4 @@ void removeFromSet(CompletableFuture<?> future) {
lock.unlock();
}
}

@VisibleForTesting
Set<CompletableFuture<?>> getTrackedFutures() {
return Collections.unmodifiableSet(incompleteFutures);
}
}
Loading

0 comments on commit 9045362

Please sign in to comment.