From 0daca7b6db87ebcbc2e6952d16a3ec4bc1943135 Mon Sep 17 00:00:00 2001
From: Roman Khachatryan
Date: Thu, 2 May 2024 10:22:08 +0200
Subject: [PATCH] [FLINK-35771][s3] Limit the amount of work per s5cmd call

Motivation:
- limit CPU usage by s5cmd to prevent TaskManager overload
- pick up potentially updated credentials for every s5cmd call
---
 .../flink/core/fs/PathsCopyingFileSystem.java | 10 ++-
 .../common/AbstractS3FileSystemFactory.java   | 13 +++
 .../flink/fs/s3/common/FlinkS3FileSystem.java | 42 +++++++++-
 .../fs/s3/common/FlinkS3FileSystemTest.java   | 83 +++++++++++++++++--
 .../filesystem/FsCheckpointStateToolset.java  |  4 +-
 .../state/RocksDBStateDownloader.java         |  4 +-
 6 files changed, 144 insertions(+), 12 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
index a20f6822b3dfd..46e301152afc6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
@@ -50,8 +50,11 @@ interface CopyRequest {
         /** The path where to duplicate the source file. */
         Path getDestination();
 
+        /** The size in bytes of the requested file to copy. */
+        long getSize();
+
         /** A factory method for creating a simple pair of source/destination. */
-        static CopyRequest of(Path source, Path destination) {
+        static CopyRequest of(Path source, Path destination, long size) {
             return new CopyRequest() {
                 @Override
                 public Path getSource() {
@@ -63,6 +66,11 @@ public Path getDestination() {
                     return destination;
                 }
 
+                @Override
+                public long getSize() {
+                    return size;
+                }
+
                 @Override
                 public String toString() {
                     return "CopyRequest{"
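
Not part of the patch, just an illustration of the API change above: callers of CopyRequest.of now pass the file size in bytes along with the source and destination. The class name, paths, and the 1_024L size below are made-up values.

    import java.io.File;

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;

    class CopyRequestUsageSketch {
        static CopyRequest example() {
            // The size travels with the request, so the file system can cap how much
            // data a single s5cmd invocation is asked to copy.
            return CopyRequest.of(
                    new Path("s3://bucket/chk-1/some-file"), // made-up source
                    Path.fromLocalFile(new File("/tmp/some-file")), // made-up destination
                    1_024L); // made-up size in bytes
        }
    }
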
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 16f391609669f..4a6846a4df208 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem.S5CmdConfiguration;
@@ -77,6 +78,18 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
                     .withDescription(
                             "Extra arguments to be passed to s5cmd. For example, --no-sign-request for public buckets and -r 10 for 10 retries");
 
+    public static final ConfigOption<MemorySize> S5CMD_BATCH_MAX_SIZE =
+            ConfigOptions.key("s3.s5cmd.batch.max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(1024))
+                    .withDescription("Maximum size of files to download per single call to s5cmd.");
+
+    public static final ConfigOption<Integer> S5CMD_BATCH_MAX_FILES =
+            ConfigOptions.key("s3.s5cmd.batch.max-files")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Maximum number of files to download per single call to s5cmd.");
+
     public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
             ConfigOptions.key("s3.upload.min.part.size")
                     .longType()
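
Not part of the patch, for illustration only: a sketch of how the two new batching options could be set next to the existing s5cmd options, using the option constants added above. The binary path and the chosen limits are made-up values.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;

    import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
    import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE;
    import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;

    class S5CmdBatchingConfigSketch {
        static Configuration example() {
            Configuration conf = new Configuration();
            conf.set(S5CMD_PATH, "/usr/local/bin/s5cmd"); // made-up binary location
            // Flush a batch to s5cmd after at most 50 files or 256 MiB, whichever comes first.
            conf.set(S5CMD_BATCH_MAX_FILES, 50);
            conf.set(S5CMD_BATCH_MAX_SIZE, MemorySize.ofMebiBytes(256));
            return conf;
        }
    }

The same limits should also be expressible in the Flink configuration file under the keys s3.s5cmd.batch.max-files and s3.s5cmd.batch.max-size introduced above.
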
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index f888568b07e51..49f4b2c516256 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -62,6 +62,8 @@
 
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
@@ -112,6 +114,8 @@ public static class S5CmdConfiguration {
         @Nullable private final String accessArtifact;
         @Nullable private final String secretArtifact;
         @Nullable private final String endpoint;
+        private final long maxBatchSizeFiles;
+        private final long maxBatchSizeBytes;
 
         /** All parameters can be empty. */
         public S5CmdConfiguration(
@@ -119,7 +123,9 @@ public S5CmdConfiguration(
                 String args,
                 @Nullable String accessArtifact,
                 @Nullable String secretArtifact,
-                @Nullable String endpoint) {
+                @Nullable String endpoint,
+                int maxBatchSizeFiles,
+                long maxBatchSizeBytes) {
             if (!path.isEmpty()) {
                 File s5CmdFile = new File(path);
                 checkArgument(s5CmdFile.isFile(), "Unable to find s5cmd binary under [%s]", path);
@@ -131,6 +137,8 @@ public S5CmdConfiguration(
             this.accessArtifact = accessArtifact;
             this.secretArtifact = secretArtifact;
             this.endpoint = endpoint;
+            this.maxBatchSizeFiles = maxBatchSizeFiles;
+            this.maxBatchSizeBytes = maxBatchSizeBytes;
         }
 
         public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) {
@@ -143,7 +151,9 @@ public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) {
                             flinkConfig.getString(S5CMD_EXTRA_ARGS),
                             flinkConfig.get(ACCESS_KEY),
                             flinkConfig.get(SECRET_KEY),
-                            flinkConfig.get(ENDPOINT)));
+                            flinkConfig.get(ENDPOINT),
+                            flinkConfig.get(S5CMD_BATCH_MAX_FILES),
+                            flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes()));
         }
 
         private void configureEnvironment(Map<String, String> environment) {
@@ -263,7 +273,32 @@ public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRe
         artefacts.add(s5CmdConfiguration.path);
         artefacts.addAll(s5CmdConfiguration.args);
         artefacts.add("run");
-        castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));
+
+        ArrayList<CopyRequest> batch = new ArrayList<>();
+        long runningSizeBytes = 0L;
+        long runningSizeFiles = 0L;
+        for (int i = 0; i < requests.size(); i++) {
+            CopyRequest request = requests.get(i);
+            batch.add(request);
+            runningSizeBytes += request.getSize();
+            runningSizeFiles++;
+            if (runningSizeBytes >= s5CmdConfiguration.maxBatchSizeBytes
+                    || runningSizeFiles >= s5CmdConfiguration.maxBatchSizeFiles
+                    || i == requests.size() - 1) {
+                LOG.info(
+                        "Copying {} files using s5cmd, total size: {} bytes, args: {}",
+                        batch.size(),
+                        runningSizeBytes,
+                        artefacts);
+                castSpell(
+                        convertToSpells(batch),
+                        closeableRegistry,
+                        artefacts.toArray(new String[0]));
+                runningSizeFiles = 0;
+                runningSizeBytes = 0;
+                batch.clear();
+            }
+        }
     }
 
     private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
@@ -282,7 +317,6 @@ private List<String> convertToSpells(List<CopyRequest> requests) throws IOExcept
     private void castSpell(
             List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
             throws IOException {
-        LOG.info("Casting spell: {}", Arrays.toString(artefacts));
         int exitCode = 0;
         final AtomicReference<IOException> maybeCloseableRegistryException =
                 new AtomicReference<>();
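
Not part of the patch: a self-contained sketch of the batching rule that copyFiles now applies, shown on plain sizes so it can be read in isolation. A batch is flushed to s5cmd as soon as the accumulated size or the file count reaches its limit, or once the last request has been added. The class and method names below are made up.

    import java.util.ArrayList;
    import java.util.List;

    /** Illustration only: the greedy batching rule used by copyFiles above. */
    final class S5CmdBatchingSketch {

        /** Splits a list of file sizes into batches, flushing once maxBytes or maxFiles is hit. */
        static List<List<Long>> partition(List<Long> sizesBytes, long maxBytes, long maxFiles) {
            List<List<Long>> batches = new ArrayList<>();
            List<Long> batch = new ArrayList<>();
            long runningBytes = 0L;
            for (int i = 0; i < sizesBytes.size(); i++) {
                long size = sizesBytes.get(i);
                batch.add(size);
                runningBytes += size;
                // Flush when either budget is reached, or after the last element,
                // so a trailing partial batch is never dropped.
                if (runningBytes >= maxBytes || batch.size() >= maxFiles || i == sizesBytes.size() - 1) {
                    batches.add(batch);
                    batch = new ArrayList<>();
                    runningBytes = 0L;
                }
            }
            return batches;
        }
    }

With the defaults from AbstractS3FileSystemFactory (1024 MiB and 100 files), ten downloads of 200 MiB each would be split into two s5cmd calls, with six and four files respectively. A single file larger than the byte limit is still copied in one call: the limits only decide when a batch is flushed, they never split a file.
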
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
index f7b52c17d29af..05c43f0e78b26 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
@@ -22,15 +22,18 @@
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.io.FileUtils;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.EnabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nonnull;
 
@@ -39,24 +42,28 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
 
 /** Unit tests for FlinkS3FileSystem. */
 class FlinkS3FileSystemTest {
-    @TempDir public static File temporaryDirectory;
 
     @Test
     @DisabledOnOs({OS.WINDOWS, OS.OTHER}) // OS must support SLEEP command
-    public void testCopyCommandInterruptible() throws Exception {
+    public void testCopyCommandInterruptible(@TempDir File temporaryDirectory) throws Exception {
 
         File cmdFile = new File(temporaryDirectory, "cmd");
@@ -95,9 +102,10 @@ protected void doRegister(
         try {
             fs.copyFiles(
                     Collections.singletonList(
-                            new PathsCopyingFileSystem.CopyTask(
+                            CopyRequest.of(
                                     Path.fromLocalFile(new File("")),
-                                    Path.fromLocalFile(new File("")))),
+                                    Path.fromLocalFile(new File("")),
+                                    100L)),
                     closeableRegistry);
         } catch (IOException ex) {
             actualException.set(ex);
@@ -112,4 +120,69 @@ protected void doRegister(
         Assertions.assertThat(actualException.get())
                 .hasStackTraceContaining("Copy process destroyed by CloseableRegistry.");
     }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 7, 10, 14, Integer.MAX_VALUE})
+    @EnabledOnOs({OS.LINUX, OS.MAC}) // POSIX OS only to run shell script
+    public void testCopyCommandBatching(int batchSize, @TempDir File temporaryDirectory)
+            throws Exception {
+        final int numFiles = 10;
+
+        File cmdFile = new File(temporaryDirectory, "cmd");
+        File inputToCmd = new File(temporaryDirectory, "input");
+        Preconditions.checkState(inputToCmd.mkdir());
+
+        String cmd =
+                String.format(
+                        "file=$(mktemp %s/s5cmd-input-XXX)\n"
+                                + "while read line; do echo $line >> $file; done < /dev/stdin",
+                        inputToCmd.getAbsolutePath());
+
+        FileUtils.writeStringToFile(cmdFile, cmd);
+        Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable.");
+
+        final Configuration conf = new Configuration();
+        conf.set(S5CMD_PATH, cmdFile.getAbsolutePath());
+        conf.set(S5CMD_EXTRA_ARGS, "");
+        conf.set(S5CMD_BATCH_MAX_FILES, batchSize);
+        conf.set(ACCESS_KEY, "test-access-key");
+        conf.set(SECRET_KEY, "test-secret-key");
+        conf.set(ENDPOINT, "test-endpoint");
+
+        TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
+        factory.configure(conf);
+
+        FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+        List<CopyRequest> tasks =
+                IntStream.range(0, numFiles)
+                        .mapToObj(
+                                i ->
+                                        CopyRequest.of(
+                                                new Path("file:///src-" + i),
+                                                new Path("file:///dst-" + i),
+                                                123L))
+                        .collect(Collectors.toList());
+        fs.copyFiles(tasks, ICloseableRegistry.NO_OP);
+
+        File[] files = inputToCmd.listFiles();
+        Assertions.assertThat(files).isNotNull();
+        Assertions.assertThat(files.length)
+                .describedAs("Wrong number of s5cmd subcommand batches for input tasks: %s", tasks)
+                .isEqualTo(numFiles / batchSize + (numFiles % batchSize > 0 ? 1 : 0));
+        int totalSubcommands = 0;
+        for (File file : files) {
+            List<String> subcommands = FileUtils.readLines(file, StandardCharsets.UTF_8);
+            Assertions.assertThat(subcommands.size())
+                    .describedAs(
+                            "Too many s5cmd subcommands issued per batch: %s\n(input files: %s)",
+                            subcommands, tasks)
+                    .isLessThanOrEqualTo(batchSize);
+            totalSubcommands += subcommands.size();
+        }
+        Assertions.assertThat(totalSubcommands)
+                .describedAs(
+                        "The total number of issued s5cmd subcommands did not match the number of copy tasks:\n%s,\n%s",
+                        totalSubcommands, tasks)
+                .isEqualTo(numFiles);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
index eee0d7ba60d9b..d8f45e4bdc1f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -66,7 +66,9 @@ public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles)
                 throw new IllegalArgumentException("We can duplicate only FileStateHandles.");
             }
             final Path srcPath = ((FileStateHandle) handle).getFilePath();
-            requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
+            requests.add(
+                    CopyRequest.of(
+                            srcPath, getNewDstPath(srcPath.getName()), handle.getStateSize()));
         }
 
         fs.copyFiles(requests, new CloseableRegistry());
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index 0a5656ccb02f8..4241946f69f23 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -154,6 +154,7 @@ private Collection<Runnable> createDownloadRunnables(
             if (canCopyPaths(handleAndLocalPath)) {
                 org.apache.flink.core.fs.Path remotePath =
                         handleAndLocalPath.getHandle().maybeGetPath().get();
+                long size = handleAndLocalPath.getHandle().getStateSize();
                 FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
                 filesSystemsFilesToDownload
                         .computeIfAbsent(newFSKey, fsKey -> new ArrayList<>())
@@ -161,7 +162,8 @@ private Collection<Runnable> createDownloadRunnables(
                                 CopyRequest.of(
                                         remotePath,
                                         new org.apache.flink.core.fs.Path(
-                                                downloadDestination.toUri())));
+                                                downloadDestination.toUri()),
+                                        size));
             } else {
                 runnables.add(
                         createDownloadRunnableUsingStreams(