Skip to content

Commit

Permalink
[FLINK-16986][coordination][refactor] Reduce dependencies of Operator…
Browse files Browse the repository at this point in the history
…CoordinatorHolder and OperatorCoordinatorCheckpointContext

This simplifies both testing and future refactoring.
  • Loading branch information
StephanEwen committed May 30, 2020
1 parent 1a721d8 commit b233aa8
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
Expand Down Expand Up @@ -626,7 +627,7 @@ private PendingCheckpoint createPendingCheckpoint(
checkpointID,
timestamp,
ackTasks,
OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
OperatorInfo.getIds(coordinatorsToCheckpoint),
masterHooks.keySet(),
props,
checkpointStorageLocation,
Expand Down Expand Up @@ -1074,7 +1075,7 @@ private void sendAcknowledgeMessages(long checkpointId, long timestamp) {

// commit coordinators
for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
coordinatorContext.coordinator().checkpointComplete(checkpointId);
coordinatorContext.checkpointComplete(checkpointId);
}
}

Expand Down Expand Up @@ -1496,7 +1497,7 @@ private void restoreStateToCoordinators(final Map<OperatorID, OperatorState> ope

final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
if (coordinatorState != null) {
coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
coordContext.resetToCheckpoint(coordinatorState.getData());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,24 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;

import java.util.Collection;
import java.util.stream.Collectors;
import java.util.concurrent.CompletableFuture;

/**
* An {@link OperatorCoordinator} and its contextual information needed to trigger and
* acknowledge a checkpoint.
* This context is the interface through which the {@link CheckpointCoordinator} interacts with an
* {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
*/
public interface OperatorCoordinatorCheckpointContext {
public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {

// ------------------------------------------------------------------------
// properties
// ------------------------------------------------------------------------

OperatorCoordinator coordinator();

OperatorID operatorId();

int maxParallelism();

int currentParallelism();

// ------------------------------------------------------------------------
// checkpoint triggering callbacks
// ------------------------------------------------------------------------

void onCallTriggerCheckpoint(long checkpointId);

void onCheckpointStateFutureComplete(long checkpointId);
CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;

void afterSourceBarrierInjection(long checkpointId);

void abortCurrentTriggering();

// ------------------------------------------------------------------------
// utils
// ------------------------------------------------------------------------
void checkpointComplete(long checkpointId);

static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
return infos.stream()
.map(OperatorCoordinatorCheckpointContext::operatorId)
.collect(Collectors.toList());
}
void resetToCheckpoint(byte[] checkpointData) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

import java.util.ArrayList;
Expand All @@ -39,15 +40,15 @@
final class OperatorCoordinatorCheckpoints {

public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
final OperatorCoordinatorCheckpointContext coordinatorInfo,
final OperatorCoordinatorCheckpointContext coordinatorContext,
final long checkpointId) throws Exception {

final CompletableFuture<byte[]> checkpointFuture =
coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
coordinatorContext.checkpointCoordinator(checkpointId);

return checkpointFuture.thenApply(
(state) -> new CoordinatorSnapshot(
coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state))
);
}

Expand All @@ -59,16 +60,7 @@ public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCh

for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
coordinator.onCallTriggerCheckpoint(checkpointId);

individualSnapshots.add(checkpointFuture);
checkpointFuture.whenComplete((ignored, failure) -> {
if (failure != null) {
coordinator.abortCurrentTriggering();
} else {
coordinator.onCheckpointStateFutureComplete(checkpointId);
}
});
}

return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
Expand Down Expand Up @@ -144,10 +136,10 @@ public Iterable<CoordinatorSnapshot> snapshots() {

static final class CoordinatorSnapshot {

final OperatorCoordinatorCheckpointContext coordinator;
final OperatorInfo coordinator;
final ByteStreamStateHandle state;

CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, ByteStreamStateHandle state) {
CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
this.coordinator = coordinator;
this.state = state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
Expand Down Expand Up @@ -431,7 +432,7 @@ public TaskAcknowledgeResult acknowledgeTask(
}

public TaskAcknowledgeResult acknowledgeCoordinatorState(
OperatorCoordinatorCheckpointContext coordinatorInfo,
OperatorInfo coordinatorInfo,
@Nullable ByteStreamStateHandle stateHandle) {

synchronized (lock) {
Expand Down
Loading

0 comments on commit b233aa8

Please sign in to comment.