Spark: Spark3 Sort Compaction Implementation (apache#2829)
* Spark: Adds Spark3 Sort Based Compaction

Implements Spark3 sort-based compaction. It uses logic similar to the
Spark3BinPack strategy, but instead of issuing a direct read then write,
it issues a read, a sort, and then a write.
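
A minimal usage sketch of the new action (not part of this commit's diff; the SparkActions entry point is assumed and the table handle is left abstract), showing the read, sort, write flow driven by the table's own sort order:

    Table table = ...;  // obtain the Iceberg table, e.g. from a catalog

    SparkActions.get()
        .rewriteDataFiles(table)
        .sort()          // SORT strategy using the table's sort order
        .execute();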
RussellSpitzer authored Oct 18, 2021
1 parent 711c1eb commit 61fe9b1
Showing 10 changed files with 534 additions and 53 deletions.
18 changes: 18 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;

@@ -84,6 +85,23 @@ default RewriteDataFiles binPack() {
return this;
}

/**
* Choose SORT as a strategy for this rewrite operation using the table's sortOrder
* @return this for method chaining
*/
default RewriteDataFiles sort() {
throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework");
}

/**
* Choose SORT as a strategy for this rewrite operation and manually specify the sortOrder to use
* @param sortOrder user defined sortOrder
* @return this for method chaining
*/
default RewriteDataFiles sort(SortOrder sortOrder) {
throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework");
}

/**
* A user provided filter for determining which files will be considered by the rewrite strategy. This will be used
* in addition to whatever rules the rewrite strategy generates. For example this would be used for providing a
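Where a caller wants a specific order instead of the table's, the second overload takes a user-built SortOrder. A short sketch (the column names are illustrative; SortOrder.builderFor is the standard Iceberg builder):

    SortOrder byEventTime = SortOrder.builderFor(table.schema())
        .asc("event_time")      // hypothetical columns
        .desc("severity")
        .build();

    SparkActions.get()
        .rewriteDataFiles(table)
        .sort(byEventTime)
        .execute();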
@@ -191,6 +191,10 @@ protected long inputFileSize(List<FileScanTask> fileToRewrite) {
return fileToRewrite.stream().mapToLong(FileScanTask::length).sum();
}

protected long maxGroupSize() {
return maxGroupSize;
}

/**
* Estimates a larger max target file size than our target size used in task creation to avoid
* tasks which are predicted to have a certain size, but exceed that target size when serde is complete creating
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/actions/SortStrategy.java
@@ -19,12 +19,15 @@

package org.apache.iceberg.actions;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.BinPacking.ListPacker;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +116,16 @@ public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> dataFi
}
}

@Override
public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> dataFiles) {
if (rewriteAll) {
ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxGroupSize(), 1, false);
return packer.pack(dataFiles, FileScanTask::length);
} else {
return super.planFileGroups(dataFiles);
}
}

protected void validateOptions() {
Preconditions.checkArgument(!sortOrder.isUnsorted(),
"Can't use %s when there is no sort order, either define table %s's sort order or set sort" +
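For context, a self-contained sketch of how BinPacking.ListPacker (used in the rewrite-all branch above) splits a sequence of weights into groups near a target size. The sizes and the 100-unit target are made up; in SortStrategy the weights come from FileScanTask::length:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.util.BinPacking.ListPacker;

    public class ListPackerSketch {
      public static void main(String[] args) {
        List<Long> fileSizes = Arrays.asList(60L, 40L, 80L, 30L, 50L);  // hypothetical file lengths
        // Same arguments as the code above: target group size, lookback of 1, no largest-bin-first.
        ListPacker<Long> packer = new ListPacker<>(100L, 1, false);
        List<List<Long>> groups = packer.pack(fileSizes, Long::longValue);
        // Each inner list is one rewrite group whose total weight stays near the 100-unit target.
        System.out.println(groups);
      }
    }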
@@ -44,7 +44,7 @@ public static SortOrder buildSortOrder(Table table) {
return buildSortOrder(table.schema(), table.spec(), table.sortOrder());
}

static SortOrder buildSortOrder(Schema schema, PartitionSpec spec, SortOrder sortOrder) {
public static SortOrder buildSortOrder(Schema schema, PartitionSpec spec, SortOrder sortOrder) {
if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
return SortOrder.unsorted();
}
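A hedged sketch of what the wider visibility enables: code outside the package (for example, a Spark3 sort strategy) can now combine a table's partition spec with a requested sort order. The schema, spec, and order below are illustrative, and the exact composition of the result is up to SortOrderUtil:

    Schema schema = new Schema(
        Types.NestedField.required(1, "day", Types.StringType.get()),
        Types.NestedField.required(2, "id", Types.LongType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("day").build();
    SortOrder requested = SortOrder.builderFor(schema).asc("id").build();

    // Callable from outside the package now that the method is public.
    SortOrder effective = SortOrderUtil.buildSortOrder(schema, spec, requested);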
@@ -34,6 +34,7 @@
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseRewriteDataFilesFileGroupInfo;
@@ -43,6 +44,8 @@
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.actions.RewriteStrategy;
import org.apache.iceberg.actions.SortStrategy;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
@@ -52,14 +55,16 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
@@ -83,12 +88,11 @@ abstract class BaseRewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
private RewriteStrategy strategy;
private RewriteStrategy strategy = null;

protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
this.strategy = binPackStrategy();
}

protected Table table() {
@@ -100,12 +104,35 @@ protected Table table() {
*/
protected abstract BinPackStrategy binPackStrategy();

/**
* The framework specific {@link SortStrategy}
*/
protected abstract SortStrategy sortStrategy();

@Override
public RewriteDataFiles binPack() {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to binpack, it has already been set", this.strategy);
this.strategy = binPackStrategy();
return this;
}

@Override
public RewriteDataFiles sort(SortOrder sortOrder) {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to sort, it has already been set to %s", this.strategy);
this.strategy = sortStrategy().sortOrder(sortOrder);
return this;
}

@Override
public RewriteDataFiles sort() {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to sort, it has already been set to %s", this.strategy);
this.strategy = sortStrategy();
return this;
}

@Override
public RewriteDataFiles filter(Expression expression) {
filter = Expressions.and(filter, expression);
@@ -120,6 +147,11 @@ public RewriteDataFiles.Result execute() {

long startingSnapshotId = table.currentSnapshot().snapshotId();

// Default to BinPack if no strategy selected
if (this.strategy == null) {
this.strategy = binPackStrategy();
}

validateAndInitOptions();
strategy = strategy.options(options());

@@ -149,10 +181,27 @@ private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSn
.planFiles();

try {
Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
.collect(Collectors.groupingBy(task -> task.file().partition()));
StructType partitionType = table.spec().partitionType();
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
StructLike emptyStruct = GenericRecord.create(partitionType);

fileScanTasks.forEach(task -> {
// If a task uses an incompatible partition spec, the data inside could contain values that
// belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
// grouping them together helps minimize the number of new files produced.
StructLike taskPartition = task.file().specId() == table.spec().specId() ?
task.file().partition() : emptyStruct;

List<FileScanTask> files = filesByPartition.get(taskPartition);
if (files == null) {
files = Lists.newArrayList();
}

files.add(task);
filesByPartition.put(taskPartition, files);
});

Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = Maps.newHashMap();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition = StructLikeMap.create(partitionType);

filesByPartition.forEach((partition, tasks) -> {
Iterable<FileScanTask> filtered = strategy.selectFilesToRewrite(tasks);
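A small, self-contained sketch of the grouping idea above, with an illustrative partition type and file names: StructLikeMap keys entries by partition value, and files written under an older, incompatible spec all share the single empty-struct key:

    StructType partitionType = StructType.of(
        Types.NestedField.optional(1, "day", Types.StringType.get()));
    StructLikeMap<List<String>> filesByPartition = StructLikeMap.create(partitionType);

    GenericRecord day = GenericRecord.create(partitionType);
    day.setField("day", "2021-10-18");
    GenericRecord emptyStruct = GenericRecord.create(partitionType);  // stand-in key for old-spec files

    List<String> currentSpecFiles = filesByPartition.get(day);
    if (currentSpecFiles == null) {
      currentSpecFiles = Lists.newArrayList();
    }
    currentSpecFiles.add("part-00000.parquet");       // file in the current spec's partition
    filesByPartition.put(day, currentSpecFiles);

    List<String> oldSpecFiles = filesByPartition.get(emptyStruct);
    if (oldSpecFiles == null) {
      oldSpecFiles = Lists.newArrayList();
    }
    oldSpecFiles.add("legacy-00017.parquet");         // file from an incompatible, older spec
    filesByPartition.put(emptyStruct, oldSpecFiles);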