Skip to content

Commit

Permalink
[FLINK-32084][checkpoint] Migrate current file merging of channel sta…
Browse files Browse the repository at this point in the history
…te snapshot into the unify file merging framework
  • Loading branch information
fredia committed May 9, 2024
1 parent 547e4b5 commit 4fe66e0
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,16 +43,14 @@ final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteR
private static final Logger LOG =
LoggerFactory.getLogger(ChannelStateWriteRequestDispatcherImpl.class);

private final CheckpointStorage checkpointStorage;

private final JobID jobID;
private final SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
checkpointStorageWorkerViewSupplier;
private CheckpointStorageWorkerView streamFactoryResolver;

private final ChannelStateSerializer serializer;

private final Set<SubtaskID> registeredSubtasks;

private CheckpointStorageWorkerView streamFactoryResolver;

/**
* It is the checkpointId corresponding to writer. And It should be always update with {@link
* #writer}.
Expand All @@ -78,9 +75,10 @@ final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteR
private ChannelStateCheckpointWriter writer;

ChannelStateWriteRequestDispatcherImpl(
CheckpointStorage checkpointStorage, JobID jobID, ChannelStateSerializer serializer) {
this.checkpointStorage = checkNotNull(checkpointStorage);
this.jobID = jobID;
SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
checkpointStorageWorkerViewSupplier,
ChannelStateSerializer serializer) {
this.checkpointStorageWorkerViewSupplier = checkpointStorageWorkerViewSupplier;
this.serializer = checkNotNull(serializer);
this.registeredSubtasks = new HashSet<>();
this.ongoingCheckpointId = -1;
Expand Down Expand Up @@ -247,7 +245,7 @@ public void fail(Throwable cause) {

CheckpointStorageWorkerView getStreamFactoryResolver() throws IOException {
if (streamFactoryResolver == null) {
streamFactoryResolver = checkpointStorage.createCheckpointStorage(jobID);
streamFactoryResolver = checkpointStorageWorkerViewSupplier.get();
}
return streamFactoryResolver;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.function.SupplierWithException;

import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkState;

/** The factory of {@link ChannelStateWriteRequestExecutor}. */
Expand All @@ -42,10 +45,15 @@ public ChannelStateWriteRequestExecutorFactory(JobID jobID) {
public ChannelStateWriteRequestExecutor getOrCreateExecutor(
JobVertexID jobVertexID,
int subtaskIndex,
CheckpointStorage checkpointStorage,
SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
checkpointStorageWorkerViewSupplier,
int maxSubtasksPerChannelStateFile) {
return getOrCreateExecutor(
jobVertexID, subtaskIndex, checkpointStorage, maxSubtasksPerChannelStateFile, true);
jobVertexID,
subtaskIndex,
checkpointStorageWorkerViewSupplier,
maxSubtasksPerChannelStateFile,
true);
}

/**
Expand All @@ -55,15 +63,19 @@ public ChannelStateWriteRequestExecutor getOrCreateExecutor(
ChannelStateWriteRequestExecutor getOrCreateExecutor(
JobVertexID jobVertexID,
int subtaskIndex,
CheckpointStorage checkpointStorage,
SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
checkpointStorageWorkerViewSupplier,
int maxSubtasksPerChannelStateFile,
boolean startExecutor) {
synchronized (lock) {
if (executor == null) {
ChannelStateWriteRequestDispatcher dispatcher =
new ChannelStateWriteRequestDispatcherImpl(
checkpointStorageWorkerViewSupplier,
new ChannelStateSerializerImpl());
executor =
new ChannelStateWriteRequestExecutorImpl(
new ChannelStateWriteRequestDispatcherImpl(
checkpointStorage, jobID, new ChannelStateSerializerImpl()),
dispatcher,
maxSubtasksPerChannelStateFile,
executor -> {
assert Thread.holdsLock(lock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,14 +88,15 @@ public ChannelStateWriterImpl(
JobVertexID jobVertexID,
String taskName,
int subtaskIndex,
CheckpointStorage checkpointStorage,
SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
checkpointStorageWorkerViewSupplier,
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory,
int maxSubtasksPerChannelStateFile) {
this(
jobVertexID,
taskName,
subtaskIndex,
checkpointStorage,
checkpointStorageWorkerViewSupplier,
DEFAULT_MAX_CHECKPOINTS,
channelStateExecutorFactory,
maxSubtasksPerChannelStateFile);
Expand All @@ -113,7 +115,8 @@ public ChannelStateWriterImpl(
JobVertexID jobVertexID,
String taskName,
int subtaskIndex,
CheckpointStorage checkpointStorage,
SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>
checkpointStorageWorkerViewSupplier,
int maxCheckpoints,
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory,
int maxSubtasksPerChannelStateFile) {
Expand All @@ -125,7 +128,7 @@ public ChannelStateWriterImpl(
channelStateExecutorFactory.getOrCreateExecutor(
jobVertexID,
subtaskIndex,
checkpointStorage,
checkpointStorageWorkerViewSupplier,
maxSubtasksPerChannelStateFile),
maxCheckpoints);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public FsMergingCheckpointStorageAccess toFileMergingStorage(
FileMergingSnapshotManager mergingSnapshotManager, Environment environment)
throws IOException {
return new FsMergingCheckpointStorageAccess(
fileSystem,
checkpointsDirectory,
getDefaultSavepointDirectory(),
environment.getJobID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess
private final FileMergingSnapshotManager.SubtaskKey subtaskKey;

public FsMergingCheckpointStorageAccess(
FileSystem fs,
Path checkpointBaseDirectory,
@Nullable Path defaultSavepointDirectory,
JobID jobId,
Expand All @@ -51,7 +50,10 @@ public FsMergingCheckpointStorageAccess(
Environment environment)
throws IOException {
super(
fs,
// Multiple subtask/threads would share one output stream,
// SafetyNetWrapperFileSystem cannot be used to prevent different threads from
// interfering with each other when exiting.
FileSystem.getUnguardedFileSystem(checkpointBaseDirectory.toUri()),
checkpointBaseDirectory,
defaultSavepointDirectory,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ private void testBuffersRecycled(
Function<NetworkBuffer[], ChannelStateWriteRequest> requestBuilder) throws Exception {
ChannelStateWriteRequestDispatcher dispatcher =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(),
JOB_ID,
() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID),
new ChannelStateSerializerImpl());
ChannelStateWriteResult result = new ChannelStateWriteResult();
dispatcher.dispatch(ChannelStateWriteRequest.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX));
Expand Down Expand Up @@ -113,8 +112,7 @@ private void testStartNewCheckpointAndCheckOldCheckpointResult(boolean isDiffere
throws Exception {
ChannelStateWriteRequestDispatcher processor =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(),
JOB_ID,
() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID),
new ChannelStateSerializerImpl());
ChannelStateWriteResult result = new ChannelStateWriteResult();
processor.dispatch(ChannelStateWriteRequest.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX));
Expand Down Expand Up @@ -157,8 +155,7 @@ private void testStartOldCheckpointAfterNewCheckpointAborted(boolean isDifferent
throws Exception {
ChannelStateWriteRequestDispatcher processor =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(),
JOB_ID,
() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID),
new ChannelStateSerializerImpl());
processor.dispatch(ChannelStateWriteRequest.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX));
JobVertexID newJobVertex = JOB_VERTEX_ID;
Expand Down Expand Up @@ -194,8 +191,7 @@ void testAbortCheckpointAndCheckAllException() throws Exception {
private void testAbortCheckpointAndCheckAllException(int numberOfSubtask) throws Exception {
ChannelStateWriteRequestDispatcher processor =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(),
JOB_ID,
() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID),
new ChannelStateSerializerImpl());
List<ChannelStateWriteResult> results = new ArrayList<>(numberOfSubtask);
for (int i = 0; i < numberOfSubtask; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ private static CheckpointStartRequest start() {
void doRun() {
ChannelStateWriteRequestDispatcher processor =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(),
JOB_ID,
() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID),
new ChannelStateSerializerImpl());
try {
processor.dispatch(register());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,15 +38,15 @@ public class ChannelStateWriteRequestExecutorFactoryTest {
private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage();

@Test
void testReuseExecutorForSameJobId() {
void testReuseExecutorForSameJobId() throws IOException {
assertReuseExecutor(1);
assertReuseExecutor(2);
assertReuseExecutor(3);
assertReuseExecutor(5);
assertReuseExecutor(10);
}

private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) {
private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) throws IOException {
JobID JOB_ID = new JobID();
Random RANDOM = new Random();
ChannelStateWriteRequestExecutorFactory executorFactory =
Expand All @@ -58,7 +59,7 @@ private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) {
executorFactory.getOrCreateExecutor(
new JobVertexID(),
RANDOM.nextInt(numberOfTasks),
CHECKPOINT_STORAGE,
() -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID),
maxSubtasksPerChannelStateFile);
if (i % maxSubtasksPerChannelStateFile == 0) {
assertThat(newExecutor)
Expand Down Expand Up @@ -94,7 +95,9 @@ void testSomeSubtasksCloseDuringOtherSubtasksStarting() throws Exception {
executorFactory.getOrCreateExecutor(
jobVertexID,
i,
CHECKPOINT_STORAGE,
() ->
CHECKPOINT_STORAGE
.createCheckpointStorage(jobID),
maxSubtasksPerChannelStateFile,
false);
assertThat(executor).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ void testCanBeClosed() throws Exception {
long checkpointId = 1L;
ChannelStateWriteRequestDispatcher processor =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(),
JOB_ID,
() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID),
new ChannelStateSerializerImpl());
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.function.BiConsumerWithException;

Expand All @@ -48,6 +49,8 @@ class ChannelStateWriterImplTest {
private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
private static final int SUBTASK_INDEX = 0;

private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage();

@Test
void testAddEventBuffer() throws Exception {

Expand Down Expand Up @@ -241,7 +244,7 @@ void testLimit() throws IOException {
JOB_VERTEX_ID,
TASK_NAME,
SUBTASK_INDEX,
new JobManagerCheckpointStorage(),
() -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID),
maxCheckpoints,
new ChannelStateWriteRequestExecutorFactory(JOB_ID),
5)) {
Expand Down Expand Up @@ -338,12 +341,12 @@ private void executeCallbackWithSyncWorker(
}
}

private ChannelStateWriterImpl openWriter() {
private ChannelStateWriterImpl openWriter() throws IOException {
return new ChannelStateWriterImpl(
JOB_VERTEX_ID,
TASK_NAME,
SUBTASK_INDEX,
new JobManagerCheckpointStorage(),
() -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID),
new ChannelStateWriteRequestExecutorFactory(JOB_ID),
5);
}
Expand Down Expand Up @@ -381,7 +384,8 @@ class SyncChannelStateWriteRequestExecutor implements ChannelStateWriteRequestEx
deque = new ArrayDeque<>();
requestProcessor =
new ChannelStateWriteRequestDispatcherImpl(
new JobManagerCheckpointStorage(), jobID, new ChannelStateSerializerImpl());
() -> new JobManagerCheckpointStorage().createCheckpointStorage(jobID),
new ChannelStateSerializerImpl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
Expand All @@ -64,6 +65,7 @@
import java.util.concurrent.Future;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** The {@link DummyEnvironment} is used for test purpose. */
public class DummyEnvironment implements Environment {
Expand All @@ -82,6 +84,8 @@ public class DummyEnvironment implements Environment {
private final ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory =
new ChannelStateWriteRequestExecutorFactory(jobInfo.getJobId());

private CheckpointStorageAccess checkpointStorageAccess;

public DummyEnvironment() {
this("Test Job", 1, 0, 1);
}
Expand Down Expand Up @@ -291,4 +295,14 @@ public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory()
public JobInfo getJobInfo() {
return jobInfo;
}

@Override
public void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess) {
this.checkpointStorageAccess = checkpointStorageAccess;
}

@Override
public CheckpointStorageAccess getCheckpointStorageAccess() {
return checkNotNull(checkpointStorageAccess);
}
}
Loading

0 comments on commit 4fe66e0

Please sign in to comment.