Skip to content

Commit

Permalink
[FLINK-35771][s3] Limit the amount of work per s5cmd call
Browse files Browse the repository at this point in the history
Motivation:
- limit CPU usage by S5cmd to prevent TM overload
- pick up potentially updated credentials for every s5cmd call
  • Loading branch information
rkhachatryan authored and pnowojski committed Aug 26, 2024
1 parent 576b308 commit 0daca7b
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ interface CopyRequest {
/** The path where to duplicate the source file. */
Path getDestination();

/** The size in bytes of the requested file to copy. */
long getSize();

/** A factory method for creating a simple pair of source/destination. */
static CopyRequest of(Path source, Path destination) {
static CopyRequest of(Path source, Path destination, long size) {
return new CopyRequest() {
@Override
public Path getSource() {
Expand All @@ -63,6 +66,11 @@ public Path getDestination() {
return destination;
}

@Override
public long getSize() {
return size;
}

@Override
public String toString() {
return "CopyRequest{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem.S5CmdConfiguration;
Expand Down Expand Up @@ -77,6 +78,18 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
.withDescription(
"Extra arguments to be passed to s5cmd. For example, --no-sign-request for public buckets and -r 10 for 10 retries");

public static final ConfigOption<MemorySize> S5CMD_BATCH_MAX_SIZE =
ConfigOptions.key("s3.s5cmd.batch.max-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(1024))
.withDescription("Maximum size of files to download per one call to s5cmd.");

public static final ConfigOption<Integer> S5CMD_BATCH_MAX_FILES =
ConfigOptions.key("s3.s5cmd.batch.max-files")
.intType()
.defaultValue(100)
.withDescription("Maximum number of files to download per one call to s5cmd");

public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
ConfigOptions.key("s3.upload.min.part.size")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@

import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
Expand Down Expand Up @@ -112,14 +114,18 @@ public static class S5CmdConfiguration {
@Nullable private final String accessArtifact;
@Nullable private final String secretArtifact;
@Nullable private final String endpoint;
private long maxBatchSizeFiles;
private long maxBatchSizeBytes;

/** All parameters can be empty. */
public S5CmdConfiguration(
String path,
String args,
@Nullable String accessArtifact,
@Nullable String secretArtifact,
@Nullable String endpoint) {
@Nullable String endpoint,
int maxBatchSizeFiles,
long maxBatchSizeBytes) {
if (!path.isEmpty()) {
File s5CmdFile = new File(path);
checkArgument(s5CmdFile.isFile(), "Unable to find s5cmd binary under [%s]", path);
Expand All @@ -131,6 +137,8 @@ public S5CmdConfiguration(
this.accessArtifact = accessArtifact;
this.secretArtifact = secretArtifact;
this.endpoint = endpoint;
this.maxBatchSizeFiles = maxBatchSizeFiles;
this.maxBatchSizeBytes = maxBatchSizeBytes;
}

public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) {
Expand All @@ -143,7 +151,9 @@ public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) {
flinkConfig.getString(S5CMD_EXTRA_ARGS),
flinkConfig.get(ACCESS_KEY),
flinkConfig.get(SECRET_KEY),
flinkConfig.get(ENDPOINT)));
flinkConfig.get(ENDPOINT),
flinkConfig.get(S5CMD_BATCH_MAX_FILES),
flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes()));
}

private void configureEnvironment(Map<String, String> environment) {
Expand Down Expand Up @@ -263,7 +273,32 @@ public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRe
artefacts.add(s5CmdConfiguration.path);
artefacts.addAll(s5CmdConfiguration.args);
artefacts.add("run");
castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));

ArrayList<CopyRequest> batch = new ArrayList<>();
long runningSizeBytes = 0L;
long runningSizeFiles = 0L;
for (int i = 0; i < requests.size(); i++) {
CopyRequest request = requests.get(i);
batch.add(request);
runningSizeBytes += request.getSize();
runningSizeFiles++;
if (runningSizeBytes >= s5CmdConfiguration.maxBatchSizeBytes
|| runningSizeFiles >= s5CmdConfiguration.maxBatchSizeFiles
|| i == requests.size() - 1) {
LOG.info(
"Copy {} files using s5cmd, total size: {}, args: {}",
requests.size(),
runningSizeBytes,
artefacts);
castSpell(
convertToSpells(batch),
closeableRegistry,
artefacts.toArray(new String[0]));
runningSizeFiles = 0;
runningSizeBytes = 0;
batch.clear();
}
}
}

private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
Expand All @@ -282,7 +317,6 @@ private List<String> convertToSpells(List<CopyRequest> requests) throws IOExcept
private void castSpell(
List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
throws IOException {
LOG.info("Casting spell: {}", Arrays.toString(artefacts));
int exitCode = 0;
final AtomicReference<IOException> maybeCloseableRegistryException =
new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
import org.apache.flink.util.Preconditions;

import org.apache.commons.io.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import javax.annotation.Nonnull;

Expand All @@ -39,24 +42,28 @@
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;

/** Unit tests for FlinkS3FileSystem. */
class FlinkS3FileSystemTest {
@TempDir public static File temporaryDirectory;

@Test
@DisabledOnOs({OS.WINDOWS, OS.OTHER}) // OS must support SLEEP command
public void testCopyCommandInterruptible() throws Exception {
public void testCopyCommandInterruptible(@TempDir File temporaryDirectory) throws Exception {

File cmdFile = new File(temporaryDirectory, "cmd");

Expand Down Expand Up @@ -95,9 +102,10 @@ protected void doRegister(
try {
fs.copyFiles(
Collections.singletonList(
new PathsCopyingFileSystem.CopyTask(
CopyRequest.of(
Path.fromLocalFile(new File("")),
Path.fromLocalFile(new File("")))),
Path.fromLocalFile(new File("")),
100L)),
closeableRegistry);
} catch (IOException ex) {
actualException.set(ex);
Expand All @@ -112,4 +120,69 @@ protected void doRegister(
Assertions.assertThat(actualException.get())
.hasStackTraceContaining("Copy process destroyed by CloseableRegistry.");
}

@ParameterizedTest
@ValueSource(ints = {1, 7, 10, 14, Integer.MAX_VALUE})
@EnabledOnOs({OS.LINUX, OS.MAC}) // POSIX OS only to run shell script
public void testCopyCommandBatching(int batchSize, @TempDir File temporaryDirectory)
throws Exception {
final int numFiles = 10;

File cmdFile = new File(temporaryDirectory, "cmd");
File inputToCmd = new File(temporaryDirectory, "input");
Preconditions.checkState(inputToCmd.mkdir());

String cmd =
String.format(
"file=$(mktemp %s/s5cmd-input-XXX)\n"
+ "while read line; do echo $line >> $file; done < /dev/stdin",
inputToCmd.getAbsolutePath());

FileUtils.writeStringToFile(cmdFile, cmd);
Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable.");

final Configuration conf = new Configuration();
conf.set(S5CMD_PATH, cmdFile.getAbsolutePath());
conf.set(S5CMD_EXTRA_ARGS, "");
conf.set(S5CMD_BATCH_MAX_FILES, batchSize);
conf.set(ACCESS_KEY, "test-access-key");
conf.set(SECRET_KEY, "test-secret-key");
conf.set(ENDPOINT, "test-endpoint");

TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
factory.configure(conf);

FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
List<CopyRequest> tasks =
IntStream.range(0, numFiles)
.mapToObj(
i ->
CopyRequest.of(
new Path("file:///src-" + i),
new Path("file:///dst-" + i),
123L))
.collect(Collectors.toList());
fs.copyFiles(tasks, ICloseableRegistry.NO_OP);

File[] files = inputToCmd.listFiles();
Assertions.assertThat(files).isNotNull();
Assertions.assertThat(files.length)
.describedAs("Wrong number of s5cmd subcommand batches for input tasks: %s", tasks)
.isEqualTo(numFiles / batchSize + (numFiles % batchSize > 0 ? 1 : 0));
int totalSubcommands = 0;
for (File file : files) {
List<String> subcommands = FileUtils.readLines(file, StandardCharsets.UTF_8);
Assertions.assertThat(subcommands.size())
.describedAs(
"Too many s5cmd subcommands issued per batch: %s\n(input files: %s)",
subcommands, tasks)
.isLessThanOrEqualTo(batchSize);
totalSubcommands += subcommands.size();
}
Assertions.assertThat(totalSubcommands)
.describedAs(
"The total number of issued s5cmd subcommands did not match the number of copy tasks:\n%s,\n%s",
totalSubcommands, tasks)
.isEqualTo(numFiles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles)
throw new IllegalArgumentException("We can duplicate only FileStateHandles.");
}
final Path srcPath = ((FileStateHandle) handle).getFilePath();
requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
requests.add(
CopyRequest.of(
srcPath, getNewDstPath(srcPath.getName()), handle.getStateSize()));
}
fs.copyFiles(requests, new CloseableRegistry());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,16 @@ private Collection<Runnable> createDownloadRunnables(
if (canCopyPaths(handleAndLocalPath)) {
org.apache.flink.core.fs.Path remotePath =
handleAndLocalPath.getHandle().maybeGetPath().get();
long size = handleAndLocalPath.getHandle().getStateSize();
FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
filesSystemsFilesToDownload
.computeIfAbsent(newFSKey, fsKey -> new ArrayList<>())
.add(
CopyRequest.of(
remotePath,
new org.apache.flink.core.fs.Path(
downloadDestination.toUri())));
downloadDestination.toUri()),
size));
} else {
runnables.add(
createDownloadRunnableUsingStreams(
Expand Down

0 comments on commit 0daca7b

Please sign in to comment.