Skip to content

Commit

Permalink
Core: Support rewrite-all flag in BinPackStrategy (apache#4433)
Browse files Browse the repository at this point in the history
  • Loading branch information
rajarshisarkar authored Apr 4, 2022
1 parent 7147282 commit e779a37
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 61 deletions.
51 changes: 36 additions & 15 deletions core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.BinPacking.ListPacker;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A rewrite strategy for data files which determines which files to rewrite
Expand All @@ -46,6 +48,8 @@
*/
public abstract class BinPackStrategy implements RewriteStrategy {

private static final Logger LOG = LoggerFactory.getLogger(BinPackStrategy.class);

/**
* The minimum number of files that need to be in a file group for it to be considered for
* compaction if the total size of that group is less than the {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
Expand Down Expand Up @@ -89,12 +93,20 @@ public abstract class BinPackStrategy implements RewriteStrategy {

static final long SPLIT_OVERHEAD = 1024 * 5;

/**
* Rewrites all files, regardless of their size. Defaults to false, rewriting only mis-sized
* files;
*/
public static final String REWRITE_ALL = "rewrite-all";
public static final boolean REWRITE_ALL_DEFAULT = false;

private int minInputFiles;
private int deleteFileThreshold;
private long minFileSize;
private long maxFileSize;
private long targetFileSize;
private long maxGroupSize;
private boolean rewriteAll;

@Override
public String name() {
Expand All @@ -107,7 +119,8 @@ public Set<String> validOptions() {
MIN_INPUT_FILES,
DELETE_FILE_THRESHOLD,
MIN_FILE_SIZE_BYTES,
MAX_FILE_SIZE_BYTES
MAX_FILE_SIZE_BYTES,
REWRITE_ALL
);
}

Expand Down Expand Up @@ -140,28 +153,40 @@ public RewriteStrategy options(Map<String, String> options) {
DELETE_FILE_THRESHOLD,
DELETE_FILE_THRESHOLD_DEFAULT);

rewriteAll = PropertyUtil.propertyAsBoolean(options,
REWRITE_ALL,
REWRITE_ALL_DEFAULT);

validateOptions();
return this;
}

@Override
public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFiles) {
return FluentIterable.from(dataFiles)
.filter(scanTask -> scanTask.length() < minFileSize || scanTask.length() > maxFileSize ||
taskHasTooManyDeletes(scanTask));
if (rewriteAll) {
LOG.info("Table {} set to rewrite all data files", table().name());
return dataFiles;
} else {
return FluentIterable.from(dataFiles)
.filter(scanTask -> scanTask.length() < minFileSize || scanTask.length() > maxFileSize ||
taskHasTooManyDeletes(scanTask));
}
}

@Override
public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> dataFiles) {
ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
List<List<FileScanTask>> potentialGroups = packer.pack(dataFiles, FileScanTask::length);

return potentialGroups.stream().filter(group ->
(group.size() >= minInputFiles && group.size() > 1) ||
(sizeOfInputFiles(group) > targetFileSize && group.size() > 1) ||
sizeOfInputFiles(group) > maxFileSize ||
group.stream().anyMatch(this::taskHasTooManyDeletes)
).collect(Collectors.toList());
if (rewriteAll) {
return potentialGroups;
} else {
return potentialGroups.stream().filter(group ->
(group.size() >= minInputFiles && group.size() > 1) ||
(sizeOfInputFiles(group) > targetFileSize && group.size() > 1) ||
sizeOfInputFiles(group) > maxFileSize ||
group.stream().anyMatch(this::taskHasTooManyDeletes)
).collect(Collectors.toList());
}
}

protected long targetFileSize() {
Expand Down Expand Up @@ -217,10 +242,6 @@ protected long inputFileSize(List<FileScanTask> fileToRewrite) {
return fileToRewrite.stream().mapToLong(FileScanTask::length).sum();
}

protected long maxGroupSize() {
return maxGroupSize;
}

/**
* Estimates a larger max target file size than our target size used in task creation to avoid
* tasks which are predicted to have a certain size, but exceed that target size when serde is complete creating
Expand Down
46 changes: 0 additions & 46 deletions core/src/main/java/org/apache/iceberg/actions/SortStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,11 @@

package org.apache.iceberg.actions;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.BinPacking.ListPacker;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A rewrite strategy for data files which aims to reorder data with data files to optimally lay them out
Expand All @@ -46,21 +39,7 @@
* In the future other algorithms for determining files to rewrite will be provided.
*/
public abstract class SortStrategy extends BinPackStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SortStrategy.class);

/**
* Rewrites all files, regardless of their size. Defaults to false, rewriting only mis-sized
* files;
*/
public static final String REWRITE_ALL = "rewrite-all";
public static final boolean REWRITE_ALL_DEFAULT = false;


private static final Set<String> validOptions = ImmutableSet.of(
REWRITE_ALL
);

private boolean rewriteAll;
private SortOrder sortOrder;

/**
Expand All @@ -86,18 +65,13 @@ public String name() {
public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.addAll(validOptions)
.build();
}

@Override
public RewriteStrategy options(Map<String, String> options) {
super.options(options); // Also checks validity of BinPack options

rewriteAll = PropertyUtil.propertyAsBoolean(options,
REWRITE_ALL,
REWRITE_ALL_DEFAULT);

if (sortOrder == null) {
sortOrder = table().sortOrder();
}
Expand All @@ -106,26 +80,6 @@ public RewriteStrategy options(Map<String, String> options) {
return this;
}

@Override
public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFiles) {
if (rewriteAll) {
LOG.info("Sort Strategy for table {} set to rewrite all data files", table().name());
return dataFiles;
} else {
return super.selectFilesToRewrite(dataFiles);
}
}

@Override
public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> dataFiles) {
if (rewriteAll) {
ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxGroupSize(), 1, false);
return packer.pack(dataFiles, FileScanTask::length);
} else {
return super.planFileGroups(dataFiles);
}
}

protected void validateOptions() {
Preconditions.checkArgument(!sortOrder.isUnsorted(),
"Can't use %s when there is no sort order, either define table %s's sort order or set sort" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,29 @@ public void testInvalidOptions() {
RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(-5)));
});
}

@Test
public void testRewriteAllSelectFilesToRewrite() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.REWRITE_ALL, "true"
));

Iterable<FileScanTask> testFiles = filesOfSize(500, 500, 480, 480, 560, 520);
Iterable<FileScanTask> expectedFiles = filesOfSize(500, 500, 480, 480, 560, 520);
Iterable<FileScanTask> filtered = ImmutableList.copyOf(strategy.selectFilesToRewrite(testFiles));
Assert.assertEquals("Should rewrite all files", expectedFiles, filtered);
}

@Test
public void testRewriteAllPlanFileGroups() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5),
BinPackStrategy.REWRITE_ALL, "true"
));

Iterable<FileScanTask> testFiles = filesOfSize(1, 1, 1, 1);
Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);

Assert.assertEquals("Should plan 1 group to rewrite all files", 1, Iterables.size(grouped));
}
}

0 comments on commit e779a37

Please sign in to comment.