Skip to content

Commit

Permalink
[FLINK-32076][checkpoint] Introduce non-blocking file pool to reuse f…
Browse files Browse the repository at this point in the history
…iles
  • Loading branch information
masteryhx committed Mar 14, 2024
1 parent a021db6 commit 583722e
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
*/
protected boolean shouldSyncAfterClosingLogicalFile;

/**
 * Max size for a physical file. Assigned in the constructor and handed to the physical file
 * pool built by {@link #createPhysicalPool()}.
 */
protected long maxPhysicalFileSize;

/**
 * Type of physical file pool. Assigned in the constructor; it selects which {@link
 * PhysicalFilePool} implementation {@link #createPhysicalPool()} instantiates.
 */
protected PhysicalFilePool.Type filePoolType;

protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;

/**
Expand All @@ -105,8 +111,11 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
*/
protected Path managedExclusiveStateDir;

/**
 * Creates the base manager and records its configuration.
 *
 * @param id the id of this manager.
 * @param maxFileSize max size for a physical file; stored as {@code maxPhysicalFileSize}.
 * @param filePoolType type of the physical file pool that {@link #createPhysicalPool()} builds.
 * @param ioExecutor executor kept for the io operations of this manager.
 */
public FileMergingSnapshotManagerBase(
        String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
    this.id = id;
    this.maxPhysicalFileSize = maxFileSize;
    this.filePoolType = filePoolType;
    this.ioExecutor = ioExecutor;
}

Expand Down Expand Up @@ -332,6 +341,22 @@ protected final void deletePhysicalFile(Path filePath) {
});
}

/**
 * Instantiate the physical file pool that matches the configured {@code filePoolType}.
 *
 * @return a newly created physical file pool.
 * @throws UnsupportedOperationException if the configured type has no pool implementation here.
 */
protected final PhysicalFilePool createPhysicalPool() {
    final PhysicalFilePool pool;
    switch (filePoolType) {
        case NON_BLOCKING:
            // The pool creates missing files through this manager's createPhysicalFile.
            pool = new NonBlockingPhysicalFilePool(maxPhysicalFileSize, this::createPhysicalFile);
            break;
        default:
            throw new UnsupportedOperationException(
                    "Unsupported type of physical file pool: " + filePoolType);
    }
    return pool;
}

// ------------------------------------------------------------------------
// abstract methods
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.concurrent.Executor;
Expand All @@ -27,6 +29,12 @@ public class FileMergingSnapshotManagerBuilder {
/** The id for identifying a {@link FileMergingSnapshotManager}. */
private final String id;

/**
 * Max size for a file, passed to the manager created by {@link #build()}. The default is {@code
 * 32 * 1024 * 1024} (presumably bytes, i.e. 32 MiB; confirm the unit against {@code
 * PhysicalFile#getSize()}). TODO: Make it configurable.
 */
private long maxFileSize = 32 * 1024 * 1024;

/**
 * Type of physical file pool, passed to the manager created by {@link #build()}. Defaults to
 * {@link PhysicalFilePool.Type#NON_BLOCKING}. TODO: Make it configurable.
 */
private PhysicalFilePool.Type filePoolType = PhysicalFilePool.Type.NON_BLOCKING;

@Nullable private Executor ioExecutor = null;

/**
Expand All @@ -38,6 +46,19 @@ public FileMergingSnapshotManagerBuilder(String id) {
this.id = id;
}

/**
 * Set the max file size.
 *
 * @param maxFileSize the max size for a file, must be positive.
 * @return this builder, for chaining.
 * @throws IllegalArgumentException if {@code maxFileSize} is not positive.
 */
public FileMergingSnapshotManagerBuilder setMaxFileSize(long maxFileSize) {
    // Report the offending value so that a misconfiguration is easy to track down.
    Preconditions.checkArgument(
            maxFileSize > 0, "Max file size must be positive, but was %s.", maxFileSize);
    this.maxFileSize = maxFileSize;
    return this;
}

/**
 * Set the type of physical file pool.
 *
 * @param filePoolType the type of physical file pool, must not be null.
 * @return this builder, for chaining.
 * @throws NullPointerException if {@code filePoolType} is null.
 */
public FileMergingSnapshotManagerBuilder setFilePoolType(PhysicalFilePool.Type filePoolType) {
    // Fail here rather than later, when the built manager switches on the type.
    this.filePoolType =
            Preconditions.checkNotNull(filePoolType, "File pool type must not be null.");
    return this;
}

/**
* Set the executor for io operation in manager. If null(default), all io operation will be
* executed synchronously.
Expand All @@ -57,6 +78,6 @@ public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExec
*/
public FileMergingSnapshotManager build() {
return new WithinCheckpointFileMergingSnapshotManager(
id, ioExecutor == null ? Runnable::run : ioExecutor);
id, maxFileSize, filePoolType, ioExecutor == null ? Runnable::run : ioExecutor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.runtime.state.CheckpointedStateScope;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * A non-blocking {@link PhysicalFilePool} which always provides a usable physical file without
 * blocking: when no pooled file is available, a new one is created on the spot. It may therefore
 * create many physical files if {@link
 * NonBlockingPhysicalFilePool#pollFile(FileMergingSnapshotManager.SubtaskKey,
 * CheckpointedStateScope)} is called frequently.
 */
public class NonBlockingPhysicalFilePool extends PhysicalFilePool {

    public NonBlockingPhysicalFilePool(
            long maxFileSize, PhysicalFile.PhysicalFileCreator physicalFileCreator) {
        super(maxFileSize, physicalFileCreator);
    }

    /**
     * Offer the file back to the queue matching its scope, unless it already reached the max
     * file size.
     *
     * @return true if the file was accepted by the queue, false otherwise.
     */
    @Override
    public boolean tryPutFile(
            FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile physicalFile) {
        // A file that reached the size limit is not pooled for reuse.
        if (physicalFile.getSize() >= maxFileSize) {
            return false;
        }
        Queue<PhysicalFile> targetQueue = getFileQueue(subtaskKey, physicalFile.getScope());
        return targetQueue.offer(physicalFile);
    }

    /**
     * Take a pooled file for the given subtask and scope, or create a new one if the queue has
     * none.
     */
    @Override
    @Nonnull
    public PhysicalFile pollFile(
            FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
            throws IOException {
        PhysicalFile pooledFile = getFileQueue(subtaskKey, scope).poll();
        if (pooledFile != null) {
            return pooledFile;
        }
        return physicalFileCreator.perform(subtaskKey, scope);
    }

    /** Queues of this pool are {@link ConcurrentLinkedQueue} instances. */
    @Override
    protected Queue<PhysicalFile> createFileQueue() {
        return new ConcurrentLinkedQueue<>();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public interface PhysicalFileDeleter {
void perform(Path filePath) throws IOException;
}

/**
 * Functional interface to create the physical file. A {@link PhysicalFilePool} holds one of
 * these so that it can create a file when it has none to hand out.
 */
@FunctionalInterface
public interface PhysicalFileCreator {
    /**
     * Create the file.
     *
     * <p>NOTE(review): {@code NonBlockingPhysicalFilePool#pollFile} returns this result from a
     * method annotated {@code @Nonnull}, so implementations are presumably expected never to
     * return null; confirm against the implementations.
     *
     * @param subtaskKey the key of the subtask the file is created for.
     * @param scope the checkpointed state scope the file is created for.
     * @return the created physical file.
     * @throws IOException if the file cannot be created.
     */
    PhysicalFile perform(
            FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
            throws IOException;
}

/**
* Output stream to the file, which keeps open for writing. It can be null if the file is
* closed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.runtime.state.CheckpointedStateScope;

import javax.annotation.Nonnull;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A pool for reusing {@link PhysicalFile}. This implementation should be thread-safe.
 *
 * <p>Files of the {@link CheckpointedStateScope#SHARED} scope are pooled in one queue per
 * subtask; files of any other scope are pooled in a single exclusive queue.
 */
public abstract class PhysicalFilePool implements Closeable {

    /** Types of supported physical file pool. */
    public enum Type {
        BLOCKING,
        NON_BLOCKING
    }

    /** Creator to create a physical file. */
    protected final PhysicalFile.PhysicalFileCreator physicalFileCreator;

    /** Max size for a physical file. */
    protected final long maxFileSize;

    /** Map maintaining queues of different subtasks for reusing shared physical files. */
    protected final Map<FileMergingSnapshotManager.SubtaskKey, Queue<PhysicalFile>>
            sharedPhysicalFilePoolBySubtask;

    /** Queue maintaining exclusive physical files for reusing. */
    protected final Queue<PhysicalFile> exclusivePhysicalFilePool;

    /**
     * Create an empty pool.
     *
     * @param maxFileSize max size for a physical file.
     * @param physicalFileCreator creator used to create a physical file.
     */
    public PhysicalFilePool(
            long maxFileSize, PhysicalFile.PhysicalFileCreator physicalFileCreator) {
        this.physicalFileCreator = physicalFileCreator;
        this.maxFileSize = maxFileSize;
        this.sharedPhysicalFilePoolBySubtask = new ConcurrentHashMap<>();
        // createFileQueue() runs before any subclass constructor body, so implementations
        // must not depend on subclass state.
        this.exclusivePhysicalFilePool = createFileQueue();
    }

    /**
     * Try to put a physical file into file pool.
     *
     * @param subtaskKey the key of current subtask.
     * @param physicalFile target physical file.
     * @return true if file is in the pool, false otherwise.
     * @throws IOException if anything goes wrong when putting the file.
     */
    public abstract boolean tryPutFile(
            FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile physicalFile)
            throws IOException;

    /**
     * Poll a physical file from the pool.
     *
     * @param subtaskKey the key of current subtask.
     * @param scope the scope of the checkpoint.
     * @return a physical file.
     * @throws IOException if anything goes wrong when providing the file.
     */
    @Nonnull
    public abstract PhysicalFile pollFile(
            FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
            throws IOException;

    /**
     * Create and return a file queue.
     *
     * @return a created file queue.
     */
    protected abstract Queue<PhysicalFile> createFileQueue();

    /**
     * Get or create a file queue for specific subtaskKey and checkpoint scope.
     *
     * @param subtaskKey the key of current subtask.
     * @param scope the scope of the checkpoint.
     * @return an existing or created file queue.
     */
    protected Queue<PhysicalFile> getFileQueue(
            FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) {
        return CheckpointedStateScope.SHARED.equals(scope)
                ? sharedPhysicalFilePoolBySubtask.computeIfAbsent(
                        subtaskKey, key -> createFileQueue())
                : exclusivePhysicalFilePool;
    }

    /**
     * Return whether the pool is empty or not.
     *
     * <p>The pool is empty when no file is pooled at all. A per-subtask queue that exists but
     * holds no file (for example after its files have all been polled) does not make the pool
     * non-empty.
     *
     * @return whether the pool is empty or not.
     */
    public boolean isEmpty() {
        if (!exclusivePhysicalFilePool.isEmpty()) {
            return false;
        }
        for (Queue<PhysicalFile> sharedQueue : sharedPhysicalFilePoolBySubtask.values()) {
            if (!sharedQueue.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Close files in shared file pool by subtaskKey and close all files in exclusive file pool.
     *
     * <p>Every pooled file is attempted even if closing an earlier one fails; the first failure
     * is thrown afterwards with later ones attached as suppressed exceptions.
     *
     * @param subtaskKey the key of current subtask.
     * @throws IOException if anything goes wrong when closing files.
     */
    public void close(FileMergingSnapshotManager.SubtaskKey subtaskKey) throws IOException {
        IOException failure = null;
        // A single remove() avoids a check-then-act gap between containsKey() and remove().
        Queue<PhysicalFile> sharedQueue = sharedPhysicalFilePoolBySubtask.remove(subtaskKey);
        if (sharedQueue != null) {
            failure = closeFilesInQueue(sharedQueue, failure);
        }
        failure = closeFilesInQueue(exclusivePhysicalFilePool, failure);
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Close all files in the exclusive pool and in every shared per-subtask pool, then drop all
     * per-subtask queues.
     *
     * <p>Every pooled file is attempted even if closing an earlier one fails; the first failure
     * is thrown afterwards with later ones attached as suppressed exceptions.
     *
     * @throws IOException if anything goes wrong when closing files.
     */
    @Override
    public void close() throws IOException {
        IOException failure = closeFilesInQueue(exclusivePhysicalFilePool, null);
        for (Queue<PhysicalFile> queue : sharedPhysicalFilePoolBySubtask.values()) {
            failure = closeFilesInQueue(queue, failure);
        }
        sharedPhysicalFilePoolBySubtask.clear();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Drain the queue and close every file in it, continuing after an {@link IOException}.
     *
     * @param queue the queue to drain.
     * @param previousFailure a failure collected so far, or null if there is none.
     * @return the first failure seen (with later ones suppressed), or null if all went well.
     */
    private static IOException closeFilesInQueue(
            Queue<PhysicalFile> queue, IOException previousFailure) {
        IOException failure = previousFailure;
        PhysicalFile physicalFile;
        while ((physicalFile = queue.poll()) != null) {
            try {
                physicalFile.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        return failure;
    }
}
Loading

0 comments on commit 583722e

Please sign in to comment.