Skip to content

Commit

Permalink
[FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all t…
Browse files Browse the repository at this point in the history
…ask attempts finished

This closes apache#24817
  • Loading branch information
Zakelly committed May 24, 2024
1 parent 71e6746 commit 54f037f
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state;

import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;

import javax.annotation.Nonnull;

import java.io.Closeable;
import java.io.IOException;

/** A wrapper that wraps {@link FileMergingSnapshotManager} and a {@link Closeable}. */
public class FileMergingSnapshotManagerClosableWrapper implements Closeable {

private final FileMergingSnapshotManager snapshotManager;

private final Closeable closeable;

private boolean closed = false;

private FileMergingSnapshotManagerClosableWrapper(
@Nonnull FileMergingSnapshotManager snapshotManager, @Nonnull Closeable closeable) {
this.snapshotManager = snapshotManager;
this.closeable = closeable;
}

public static FileMergingSnapshotManagerClosableWrapper of(
@Nonnull FileMergingSnapshotManager snapshotManager, @Nonnull Closeable closeable) {
return new FileMergingSnapshotManagerClosableWrapper(snapshotManager, closeable);
}

public FileMergingSnapshotManager get() {
return snapshotManager;
}

@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
closeable.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand All @@ -33,7 +35,9 @@
import javax.annotation.concurrent.GuardedBy;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY;
import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
Expand All @@ -53,7 +57,8 @@ public class TaskExecutorFileMergingManager {
* manager(executor).
*/
@GuardedBy("lock")
private final Map<JobID, FileMergingSnapshotManager> fileMergingSnapshotManagerByJobId;
private final Map<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>>
fileMergingSnapshotManagerByJobId;

@GuardedBy("lock")
private boolean closed;
Expand All @@ -74,8 +79,9 @@ public TaskExecutorFileMergingManager() {
* Initialize file merging snapshot manager for each job according configurations when {@link
* org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
*/
public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForTask(
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
Configuration clusterConfiguration,
Configuration jobConfiguration) {
boolean mergingEnabled =
Expand All @@ -91,9 +97,10 @@ public TaskExecutorFileMergingManager() {
if (!mergingEnabled) {
return null;
}
FileMergingSnapshotManager fileMergingSnapshotManager =
fileMergingSnapshotManagerByJobId.get(jobId);
if (fileMergingSnapshotManager == null) {
Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>
fileMergingSnapshotManagerAndRetainedExecutions =
fileMergingSnapshotManagerByJobId.get(jobId);
if (fileMergingSnapshotManagerAndRetainedExecutions == null) {
FileMergingType fileMergingType =
jobConfiguration
.getOptional(FILE_MERGING_ACROSS_BOUNDARY)
Expand All @@ -111,28 +118,52 @@ public TaskExecutorFileMergingManager() {
.getOptional(FILE_MERGING_POOL_BLOCKING)
.orElse(clusterConfiguration.get(FILE_MERGING_POOL_BLOCKING));

fileMergingSnapshotManager =
new FileMergingSnapshotManagerBuilder(jobId.toString(), fileMergingType)
.setMaxFileSize(maxFileSize.getBytes())
.setFilePoolType(
usingBlockingPool
? PhysicalFilePool.Type.BLOCKING
: PhysicalFilePool.Type.NON_BLOCKING)
.build();
fileMergingSnapshotManagerByJobId.put(jobId, fileMergingSnapshotManager);
fileMergingSnapshotManagerAndRetainedExecutions =
Tuple2.of(
new FileMergingSnapshotManagerBuilder(
jobId.toString(), fileMergingType)
.setMaxFileSize(maxFileSize.getBytes())
.setFilePoolType(
usingBlockingPool
? PhysicalFilePool.Type.BLOCKING
: PhysicalFilePool.Type.NON_BLOCKING)
.build(),
new HashSet<>());
fileMergingSnapshotManagerByJobId.put(
jobId, fileMergingSnapshotManagerAndRetainedExecutions);
LOG.info("Registered new file merging snapshot manager for job {}.", jobId);
}
return fileMergingSnapshotManager;
fileMergingSnapshotManagerAndRetainedExecutions.f1.add(executionAttemptID);
return fileMergingSnapshotManagerAndRetainedExecutions.f0;
}
}

public void releaseMergingSnapshotManagerForTask(
@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID) {
synchronized (lock) {
Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>
fileMergingSnapshotManagerAndRetainedExecutions =
fileMergingSnapshotManagerByJobId.get(jobId);
if (fileMergingSnapshotManagerAndRetainedExecutions != null) {
LOG.debug(
"Releasing file merging snapshot manager under job id {} and attempt {}.",
jobId,
executionAttemptID);
fileMergingSnapshotManagerAndRetainedExecutions.f1.remove(executionAttemptID);
if (fileMergingSnapshotManagerAndRetainedExecutions.f1.isEmpty()) {
releaseMergingSnapshotManagerForJob(jobId);
}
}
}
}

/**
* Release file merging snapshot manager of one job when {@link
* Release file merging snapshot manager of one job when {@code
* org.apache.flink.runtime.taskexecutor.TaskExecutor#releaseJobResources} called.
*/
public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobId) {
LOG.debug("Releasing file merging snapshot manager under job id {}.", jobId);
FileMergingSnapshotManager toRelease = null;
Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> toRelease = null;
synchronized (lock) {
if (closed) {
return;
Expand All @@ -141,8 +172,13 @@ public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobId) {
}

if (toRelease != null) {
if (!toRelease.f1.isEmpty()) {
LOG.warn(
"The file merging snapshot manager for job {} is released before all tasks are released.",
jobId);
}
try {
toRelease.close();
toRelease.f0.close();
} catch (Exception e) {
LOG.warn(
"Exception while closing TaskExecutorFileMergingManager for job {}.",
Expand All @@ -153,9 +189,9 @@ public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobId) {
}

public void shutdown() {
HashMap<JobID, FileMergingSnapshotManager> toRelease =
new HashMap<>(fileMergingSnapshotManagerByJobId);
Map<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>> toRelease = null;
synchronized (lock) {
toRelease = new HashMap<>(fileMergingSnapshotManagerByJobId);
if (closed) {
return;
}
Expand All @@ -167,10 +203,11 @@ public void shutdown() {

ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

for (Map.Entry<JobID, FileMergingSnapshotManager> entry : toRelease.entrySet()) {
for (Map.Entry<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>> entry :
toRelease.entrySet()) {
if (entry.getValue() != null) {
try {
entry.getValue().close();
entry.getValue().f0.close();
} catch (Exception e) {
LOG.warn(
"Exception while closing TaskExecutorFileMergingManager for job {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public class TaskStateManagerImpl implements TaskStateManager {
/** The local state store to which this manager reports local state snapshots. */
private final TaskLocalStateStore localStateStore;

/** The file merging snapshot manager */
@Nullable private final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager;

/** The changelog storage where the manager reads and writes the changelog */
@Nullable private final StateChangelogStorage<?> stateChangelogStorage;

/** The file merging snapshot */
@Nullable private final FileMergingSnapshotManager fileMergingSnapshotManager;

private final TaskExecutorStateChangelogStoragesManager changelogStoragesManager;

/** The checkpoint responder through which this manager can report to the job manager. */
Expand All @@ -96,7 +96,7 @@ public TaskStateManagerImpl(
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
@Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
@Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager,
@Nullable StateChangelogStorage<?> stateChangelogStorage,
@Nonnull TaskExecutorStateChangelogStoragesManager changelogStoragesManager,
@Nullable JobManagerTaskRestore jobManagerTaskRestore,
Expand All @@ -120,7 +120,7 @@ public TaskStateManagerImpl(
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
@Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
@Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager,
@Nullable StateChangelogStorage<?> stateChangelogStorage,
@Nonnull TaskExecutorStateChangelogStoragesManager changelogStoragesManager,
@Nullable JobManagerTaskRestore jobManagerTaskRestore,
Expand Down Expand Up @@ -299,7 +299,7 @@ public StateChangelogStorageView<?> getStateChangelogStorageView(
@Nullable
@Override
public FileMergingSnapshotManager getFileMergingSnapshotManager() {
return fileMergingSnapshotManager;
return fileMergingSnapshotManager == null ? null : fileMergingSnapshotManager.get();
}

/** Tracking when local state can be confirmed and disposed. */
Expand All @@ -317,5 +317,8 @@ public void notifyCheckpointAborted(long checkpointId) {
@Override
public void close() throws Exception {
sequentialChannelStateReader.close();
if (fileMergingSnapshotManager != null) {
fileMergingSnapshotManager.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.FileMergingSnapshotManagerClosableWrapper;
import org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
Expand Down Expand Up @@ -776,11 +777,21 @@ public CompletableFuture<Acknowledge> submitTask(
jobInformation.getJobConfiguration());

final FileMergingSnapshotManager fileMergingSnapshotManager =
fileMergingManager.fileMergingSnapshotManagerForJob(
fileMergingManager.fileMergingSnapshotManagerForTask(
jobId,
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getConfiguration(),
jobInformation.getJobConfiguration());

final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManagerClosable =
fileMergingSnapshotManager == null
? null
: FileMergingSnapshotManagerClosableWrapper.of(
fileMergingSnapshotManager,
() ->
fileMergingManager.releaseMergingSnapshotManagerForTask(
jobId, tdd.getExecutionAttemptId()));

// TODO: Pass config value from user program and do overriding here.
final StateChangelogStorage<?> changelogStorage;
try {
Expand All @@ -801,7 +812,7 @@ public CompletableFuture<Acknowledge> submitTask(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
fileMergingSnapshotManager,
fileMergingSnapshotManagerClosable,
changelogStorage,
changelogStoragesManager,
taskRestore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -47,27 +48,32 @@ public void testCheckpointScope(@TempDir java.nio.file.Path testBaseDir) throws
Configuration jobConfig = new Configuration();
jobConfig.setBoolean(FILE_MERGING_ENABLED, true);
Configuration clusterConfig = new Configuration();
ExecutionAttemptID executionID1 = ExecutionAttemptID.randomId();
FileMergingSnapshotManager manager1 =
taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(
job1, clusterConfig, jobConfig);
taskExecutorFileMergingManager.fileMergingSnapshotManagerForTask(
job1, executionID1, clusterConfig, jobConfig);
manager1.initFileSystem(
checkpointDir1.getFileSystem(),
checkpointDir1,
new Path(checkpointDir1, "shared"),
new Path(checkpointDir1, "taskowned"),
writeBufferSize);

ExecutionAttemptID executionID2 = ExecutionAttemptID.randomId();
FileMergingSnapshotManager manager2 =
taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(
job1, clusterConfig, jobConfig);
taskExecutorFileMergingManager.fileMergingSnapshotManagerForTask(
job1, executionID2, clusterConfig, jobConfig);
manager2.initFileSystem(
checkpointDir1.getFileSystem(),
checkpointDir1,
new Path(checkpointDir1, "shared"),
new Path(checkpointDir1, "taskowned"),
writeBufferSize);

ExecutionAttemptID executionID3 = ExecutionAttemptID.randomId();
FileMergingSnapshotManager manager3 =
taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(
job2, clusterConfig, jobConfig);
taskExecutorFileMergingManager.fileMergingSnapshotManagerForTask(
job2, executionID3, clusterConfig, jobConfig);
manager3.initFileSystem(
checkpointDir2.getFileSystem(),
checkpointDir2,
Expand Down Expand Up @@ -95,5 +101,15 @@ public void testCheckpointScope(@TempDir java.nio.file.Path testBaseDir) throws
// tasks with same SubtaskKey of different jobs should have different shared dirs.
assertThat(manager1.getManagedDir(key1, CheckpointedStateScope.SHARED))
.isNotEqualTo(manager3.getManagedDir(key1, CheckpointedStateScope.SHARED));

taskExecutorFileMergingManager.releaseMergingSnapshotManagerForTask(job1, executionID1);
taskExecutorFileMergingManager.releaseMergingSnapshotManagerForTask(job1, executionID2);
taskExecutorFileMergingManager.releaseMergingSnapshotManagerForTask(job2, executionID3);

ExecutionAttemptID executionID4 = ExecutionAttemptID.randomId();
FileMergingSnapshotManager manager4 =
taskExecutorFileMergingManager.fileMergingSnapshotManagerForTask(
job1, executionID4, clusterConfig, jobConfig);
assertThat(manager4).isNotEqualTo(manager1);
}
}

0 comments on commit 54f037f

Please sign in to comment.