Skip to content

Commit

Permalink
Core: Add a util method to combine tasks by partition (apache#2276)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao authored Nov 14, 2022
1 parent 2d1e100 commit 33217ab
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 7 deletions.
12 changes: 5 additions & 7 deletions api/src/main/java/org/apache/iceberg/ContentScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,18 @@
*
* @param <F> the Java class of the content file
*/
public interface ContentScanTask<F extends ContentFile<F>> extends ScanTask {
public interface ContentScanTask<F extends ContentFile<F>> extends ScanTask, PartitionScanTask {
/**
* The {@link ContentFile file} to scan.
*
* @return the file to scan
*/
F file();

/**
* The {@link PartitionSpec spec} used to store this file.
*
* @return the partition spec from this file's manifest
*/
PartitionSpec spec();
/**
 * Returns the partition value of this task, taken from the {@link ContentFile file} to scan.
 *
 * @return the result of {@code file().partition()}
 */
@Override
default StructLike partition() {
return file().partition();
}

/**
* The starting position of this scan range in the file.
Expand Down
28 changes: 28 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionScanTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

/**
 * A scan task for data within a particular partition.
 *
 * <p>Implementations expose both the partition spec and the partition value of the data they
 * scan.
 */
public interface PartitionScanTask extends ScanTask {
/**
 * Returns the spec of the partition for this scan task.
 *
 * @return the partition spec
 */
PartitionSpec spec();

/**
 * Returns the value of the partition for this scan task.
 *
 * @return the partition value
 */
StructLike partition();
}
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/StructProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public static StructProjection createAllowMissing(
private final StructProjection[] nestedProjections;
private StructLike struct;

/**
 * Creates a projection directly from already-computed state.
 *
 * <p>The arguments are stored as given (the arrays are not copied), and this constructor does
 * not assign the wrapped {@code struct}; call {@link #wrap(StructLike)} to set it.
 *
 * @param type the struct type stored in this projection's {@code type} field
 * @param positionMap the position map to reuse
 * @param nestedProjections the nested projections to reuse
 */
private StructProjection(
StructType type, int[] positionMap, StructProjection[] nestedProjections) {
this.type = type;
this.positionMap = positionMap;
this.nestedProjections = nestedProjections;
}

/**
 * Creates a projection of {@code structType} onto {@code projection}.
 *
 * <p>Delegates to the three-argument constructor with {@code false} as the last argument.
 * NOTE(review): that flag presumably controls whether missing fields are allowed (compare
 * {@code createAllowMissing}) -- confirm against the delegated constructor.
 *
 * @param structType the type of the structs that will be wrapped
 * @param projection the type to project to
 */
private StructProjection(StructType structType, StructType projection) {
this(structType, projection, false);
}
Expand Down Expand Up @@ -171,6 +178,10 @@ public StructProjection wrap(StructLike newStruct) {
return this;
}

/**
 * Creates a new projection that wraps {@code newStruct}, leaving this instance untouched.
 *
 * <p>The new projection is built from this projection's {@code type}, {@code positionMap} and
 * {@code nestedProjections}; those are passed by reference rather than copied.
 *
 * @param newStruct the struct the new projection should wrap
 * @return the result of wrapping {@code newStruct} with the newly created projection
 */
public StructProjection copyFor(StructLike newStruct) {
  StructProjection copy = new StructProjection(type, positionMap, nestedProjections);
  return copy.wrap(newStruct);
}

/**
 * Returns the number of fields in this projection's {@code type}.
 *
 * @return {@code type.fields().size()}
 */
@Override
public int size() {
return type.fields().size();
}
Expand Down
70 changes: 70 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,30 @@
package org.apache.iceberg.util;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.BaseScanTaskGroup;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MergeableScanTask;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.SplittableScanTask;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;

public class TableScanUtil {

Expand Down Expand Up @@ -128,6 +136,68 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG
combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks)));
}

/**
 * Plans task groups so that each group only contains tasks that share the same projected
 * partition value.
 *
 * <p>Every task's partition is projected onto {@code projectedPartitionType} and used as the
 * grouping key. Tasks that implement {@link SplittableScanTask} are split with {@code splitSize}
 * before being added to their group's bucket. Each bucket is then packed into task groups
 * independently of the other buckets.
 *
 * @param tasks the tasks to group
 * @param splitSize the split size used for splitting and packing; must be positive
 * @param lookback the packing lookback; must be positive
 * @param openFileCost the per-file cost used as a lower bound for a task's weight; must not be
 *     negative
 * @param projectedPartitionType the struct type that partitions are projected onto for grouping
 * @param <T> the type of the partition scan tasks
 * @return the planned task groups
 * @throws IllegalArgumentException if {@code splitSize} or {@code lookback} is not positive, or
 *     if {@code openFileCost} is negative
 */
@SuppressWarnings("unchecked")
public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(
    List<T> tasks,
    long splitSize,
    int lookback,
    long openFileCost,
    Types.StructType projectedPartitionType) {

  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
  Preconditions.checkArgument(
      lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
  Preconditions.checkArgument(
      openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);

  // A task weighs its size in bytes, but never less than the cost of opening all of its files.
  Function<T, Long> weightFunc =
      task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);

  // One projection per spec ID, created the first time that spec is seen.
  Map<Integer, StructProjection> templatesBySpecId = Maps.newHashMap();

  // Buckets of tasks keyed by their projected partition value.
  StructLikeMap<List<T>> buckets = StructLikeMap.create(projectedPartitionType);

  for (T task : tasks) {
    PartitionSpec taskSpec = task.spec();
    StructProjection template =
        templatesBySpecId.computeIfAbsent(
            taskSpec.specId(),
            ignored -> StructProjection.create(taskSpec.partitionType(), projectedPartitionType));

    // Copy the shared template so that each key wraps its own partition struct.
    StructProjection groupingKey = template.copyFor(task.partition());
    List<T> bucket = buckets.computeIfAbsent(groupingKey, ignored -> Lists.newArrayList());

    if (task instanceof SplittableScanTask<?>) {
      SplittableScanTask<? extends T> splittable = (SplittableScanTask<? extends T>) task;
      splittable.split(splitSize).forEach(bucket::add);
    } else {
      bucket.add(task);
    }
  }

  // Pack the tasks of each bucket separately and collect all resulting groups.
  ImmutableList.Builder<ScanTaskGroup<T>> groups = ImmutableList.builder();
  for (List<T> bucket : buckets.values()) {
    groups.addAll(toTaskGroupIterable(bucket, splitSize, lookback, weightFunc));
  }
  return groups.build();
}

/**
 * Packs tasks into bins with {@link BinPacking.PackingIterable} and lazily turns every bin into
 * a {@link BaseScanTaskGroup} of its merged tasks.
 *
 * @param tasks the tasks to pack
 * @param splitSize the target weight passed to the packing iterable
 * @param lookback the lookback passed to the packing iterable
 * @param weightFunc the function that computes the weight of a task
 * @param <T> the type of the scan tasks
 * @return an iterable of task groups, one per packed bin
 */
private static <T extends ScanTask> Iterable<ScanTaskGroup<T>> toTaskGroupIterable(
    Iterable<T> tasks, long splitSize, int lookback, Function<T, Long> weightFunc) {
  Iterable<List<T>> bins =
      new BinPacking.PackingIterable<>(tasks, splitSize, lookback, weightFunc, true);
  return Iterables.transform(bins, bin -> new BaseScanTaskGroup<>(mergeTasks(bin)));
}

/**
 * Packs tasks into bins with {@link BinPacking.PackingIterable} and exposes the resulting task
 * groups as a sequential {@link Stream}.
 *
 * <p>The packed bins are combined with a no-op-close view of {@code tasks} before being
 * transformed into {@link BaseScanTaskGroup}s of merged tasks.
 *
 * <p>NOTE(review): no caller of this helper is visible in this part of the file -- confirm it is
 * still needed.
 *
 * @param tasks the tasks to pack
 * @param splitSize the target weight passed to the packing iterable
 * @param lookback the lookback passed to the packing iterable
 * @param weightFunc the function that computes the weight of a task
 * @param <T> the type of the scan tasks
 * @return a sequential stream of task groups, one per packed bin
 */
private static <T extends ScanTask> Stream<ScanTaskGroup<T>> toTaskGroupStream(
    Iterable<T> tasks, long splitSize, int lookback, Function<T, Long> weightFunc) {
  Iterable<List<T>> bins =
      new BinPacking.PackingIterable<>(tasks, splitSize, lookback, weightFunc, true);
  CloseableIterable<List<T>> closeableBins =
      CloseableIterable.combine(bins, CloseableIterable.withNoopClose(tasks));
  CloseableIterable<ScanTaskGroup<T>> taskGroups =
      CloseableIterable.transform(
          closeableBins, bin -> new BaseScanTaskGroup<>(mergeTasks(bin)));
  return StreamSupport.stream(taskGroups.spliterator(), false);
}

@SuppressWarnings("unchecked")
public static <T extends ScanTask> List<T> mergeTasks(List<T> tasks) {
List<T> mergedTasks = Lists.newArrayList();
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/org/apache/iceberg/MockFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public MockFileScanTask(DataFile file, DeleteFile[] deleteFiles) {
this.length = file.fileSizeInBytes();
}

/**
 * Creates a mock task for the given data file with explicit schema and spec strings.
 *
 * <p>Passes {@code null} as the second and fifth superclass constructor arguments
 * (NOTE(review): presumably the delete files and the residual evaluator -- confirm against the
 * superclass) and records the file's size in bytes as this task's {@code length}.
 *
 * @param file the data file backing this task
 * @param schemaString the schema string passed to the superclass
 * @param specString the partition spec string passed to the superclass
 */
public MockFileScanTask(DataFile file, String schemaString, String specString) {
super(file, null, schemaString, specString, null);
this.length = file.fileSizeInBytes();
}

public static MockFileScanTask mockTask(long length, int sortOrderId) {
DataFile mockFile = Mockito.mock(DataFile.class);
Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length);
Expand Down
141 changes: 141 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MergeableScanTask;
import org.apache.iceberg.MockFileScanTask;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SplittableScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -136,6 +142,141 @@ public void testTaskMerging() {
Assert.assertEquals("Appropriate tasks should be merged", 3, mergedTasks.size());
}

// Schema with four optional columns: c1 (int) and c2, c3, c4 (strings).
private static final Schema TEST_SCHEMA =
new Schema(
Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
Types.NestedField.optional(2, "c2", Types.StringType.get()),
Types.NestedField.optional(3, "c3", Types.StringType.get()),
Types.NestedField.optional(4, "c4", Types.StringType.get()));

// Spec with identity partitions on c1 and c2.
private static final PartitionSpec SPEC1 =
PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c2").build();

// Spec with identity partitions on c1, c3 and c2, in that order.
private static final PartitionSpec SPEC2 =
PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c3").identity("c2").build();

// Two-value partition tuples (an int followed by a string) shared by the tests below.
private static final StructLike PARTITION1 = new TestStructLike(100, "a");
private static final StructLike PARTITION2 = new TestStructLike(200, "b");

/**
 * Checks that {@code TableScanUtil.planTaskGroups} combines tasks only within a partition,
 * respects the split size, and rejects a grouping type that cannot be projected from every
 * task's spec.
 */
@Test
public void testTaskGroupPlanningByPartition() {
  // Scenario 1: every file is in the same partition, so all of them are combined into a single
  // group as long as the total file size is <= split size.
  List<PartitionScanTask> samePartitionTasks =
      ImmutableList.of(
          taskWithPartition(SPEC1, PARTITION1, 64),
          taskWithPartition(SPEC1, PARTITION1, 128),
          taskWithPartition(SPEC1, PARTITION1, 64),
          taskWithPartition(SPEC1, PARTITION1, 128));

  List<ScanTaskGroup<PartitionScanTask>> groups =
      TableScanUtil.planTaskGroups(samePartitionTasks, 512, 10, 4, SPEC1.partitionType());
  for (ScanTaskGroup<PartitionScanTask> group : groups) {
    Assert.assertEquals(4, group.filesCount());
    Assert.assertEquals(64 + 128 + 64 + 128, group.sizeBytes());
  }
  Assert.assertEquals(1, groups.size());

  // Scenario 2: two files in partition 1 and two files in partition 2 must be combined
  // separately.
  List<PartitionScanTask> twoPartitionTasks =
      ImmutableList.of(
          taskWithPartition(SPEC1, PARTITION1, 64),
          taskWithPartition(SPEC1, PARTITION1, 128),
          taskWithPartition(SPEC1, PARTITION2, 64),
          taskWithPartition(SPEC1, PARTITION2, 128));

  groups = TableScanUtil.planTaskGroups(twoPartitionTasks, 512, 10, 4, SPEC1.partitionType());
  for (ScanTaskGroup<PartitionScanTask> group : groups) {
    Assert.assertEquals(2, group.filesCount());
    Assert.assertEquals(64 + 128, group.sizeBytes());
  }
  Assert.assertEquals(2, groups.size());

  // Scenario 3: same as scenario 2, except that the files use different partition specs.
  List<PartitionScanTask> mixedSpecTasks =
      ImmutableList.of(
          taskWithPartition(SPEC1, PARTITION1, 64),
          taskWithPartition(SPEC2, PARTITION1, 128),
          taskWithPartition(SPEC1, PARTITION2, 64),
          taskWithPartition(SPEC2, PARTITION2, 128));

  groups = TableScanUtil.planTaskGroups(mixedSpecTasks, 512, 10, 4, SPEC1.partitionType());
  for (ScanTaskGroup<PartitionScanTask> group : groups) {
    Assert.assertEquals(2, group.filesCount());
    Assert.assertEquals(64 + 128, group.sizeBytes());
  }
  Assert.assertEquals(2, groups.size());

  // Scenario 4: combining within partitions also respects the split size. Here the split size
  // equals the file size, so each partition ends up with 2 groups of one file each.
  List<PartitionScanTask> splitSizedTasks =
      ImmutableList.of(
          taskWithPartition(SPEC1, PARTITION1, 128),
          taskWithPartition(SPEC2, PARTITION1, 128),
          taskWithPartition(SPEC1, PARTITION2, 128),
          taskWithPartition(SPEC2, PARTITION2, 128));

  groups = TableScanUtil.planTaskGroups(splitSizedTasks, 128, 10, 4, SPEC1.partitionType());
  for (ScanTaskGroup<PartitionScanTask> group : groups) {
    Assert.assertEquals(1, group.filesCount());
    Assert.assertEquals(128, group.sizeBytes());
  }
  Assert.assertEquals(4, groups.size());

  // Scenario 5: planning must fail because `SPEC2` is not an intersection of the partition
  // specs across all tasks.
  List<PartitionScanTask> incompatibleTasks =
      ImmutableList.of(
          taskWithPartition(SPEC1, PARTITION1, 128), taskWithPartition(SPEC2, PARTITION2, 128));

  AssertHelpers.assertThrows(
      "Should throw exception",
      IllegalArgumentException.class,
      "Cannot find field",
      () -> TableScanUtil.planTaskGroups(incompatibleTasks, 128, 10, 4, SPEC2.partitionType()));
}

/**
 * Builds a Mockito mock of {@link PartitionScanTask} that reports a single file of the given
 * size in the given spec and partition.
 *
 * @param spec the spec the mock returns from {@code spec()}
 * @param partition the value the mock returns from {@code partition()}
 * @param sizeBytes the value the mock returns from {@code sizeBytes()}
 * @return the stubbed mock task
 */
private PartitionScanTask taskWithPartition(
    PartitionSpec spec, StructLike partition, long sizeBytes) {
  PartitionScanTask mocked = Mockito.mock(PartitionScanTask.class);

  // Size-related stubs: one file of the requested size.
  Mockito.when(mocked.filesCount()).thenReturn(1);
  Mockito.when(mocked.sizeBytes()).thenReturn(sizeBytes);

  // Partition-related stubs.
  Mockito.when(mocked.partition()).thenReturn(partition);
  Mockito.when(mocked.spec()).thenReturn(spec);

  return mocked;
}

/** Read-only {@link StructLike} for tests, backed by the array of values it was created with. */
private static class TestStructLike implements StructLike {
  private final Object[] fieldValues;

  /**
   * Creates a struct whose positions hold the given values, in order.
   *
   * @param values the values of the struct; the array is stored without copying
   */
  TestStructLike(Object... values) {
    this.fieldValues = values;
  }

  /** Returns the number of values this struct was created with. */
  @Override
  public int size() {
    return fieldValues.length;
  }

  /** Returns the value at {@code pos}, cast to {@code javaClass}. */
  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    Object raw = fieldValues[pos];
    return javaClass.cast(raw);
  }

  /** Always throws, because this struct does not support modification. */
  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException("set is not supported");
  }
}

/** Test-only {@link ScanTask} sub-interface that declares no members of its own. */
private interface ParentTask extends ScanTask {}

private static class ChildTask1
Expand Down

0 comments on commit 33217ab

Please sign in to comment.