Skip to content

Commit

Permalink
Core: Ignore split offsets array when split offset is past file length (
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar authored Oct 28, 2023
1 parent d7f46b4 commit 385a6bd
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 9 deletions.
24 changes: 15 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,21 +460,27 @@ public ByteBuffer keyMetadata() {

@Override
public List<Long> splitOffsets() {
if (splitOffsets == null || splitOffsets.length == 0) {
return null;
if (hasWellDefinedOffsets()) {
return ArrayUtil.toUnmodifiableLongList(splitOffsets);
}

// If the last split offset is past the file size this means the split offsets are corrupted and
// should not be used
if (splitOffsets[splitOffsets.length - 1] >= fileSizeInBytes) {
return null;
return null;
}

long[] splitOffsetArray() {
if (hasWellDefinedOffsets()) {
return splitOffsets;
}

return ArrayUtil.toUnmodifiableLongList(splitOffsets);
return null;
}

long[] splitOffsetArray() {
return splitOffsets;
private boolean hasWellDefinedOffsets() {
// If the last split offset is past the file size this means the split offsets are corrupted and
// should not be used
return splitOffsets != null
&& splitOffsets.length != 0
&& splitOffsets[splitOffsets.length - 1] < fileSizeInBytes;
}

@Override
Expand Down
46 changes: 46 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,26 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MergeableScanTask;
import org.apache.iceberg.MockFileScanTask;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SplittableScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -126,6 +133,45 @@ public void testTaskGroupPlanning() {
assertThat(taskGroups).as("Must have 3 task groups").hasSize(3);
}

@Test
public void testTaskGroupPlanningCorruptedOffset() {
DataFile dataFile =
DataFiles.builder(TableTestBase.SPEC)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.withSplitOffsets(
ImmutableList.of(2L, 12L)) // the last offset is beyond the end of the file
.build();

ResidualEvaluator residualEvaluator =
ResidualEvaluator.of(TableTestBase.SPEC, Expressions.equal("id", 1), false);

BaseFileScanTask baseFileScanTask =
new BaseFileScanTask(
dataFile,
null,
SchemaParser.toJson(TableTestBase.SCHEMA),
PartitionSpecParser.toJson(TableTestBase.SPEC),
residualEvaluator);

List<BaseFileScanTask> baseFileScanTasks = ImmutableList.of(baseFileScanTask);

int taskCount = 0;
for (ScanTaskGroup<BaseFileScanTask> task :
TableScanUtil.planTaskGroups(CloseableIterable.withNoopClose(baseFileScanTasks), 1, 1, 0)) {
for (FileScanTask fileScanTask : task.tasks()) {
DataFile taskDataFile = fileScanTask.file();
Assertions.assertThat(taskDataFile.splitOffsets()).isNull();
taskCount++;
}
}

// 10 tasks since the split offsets are ignored and there are 1 byte splits for a 10 byte file
Assertions.assertThat(taskCount).isEqualTo(10);
}

@Test
public void testTaskMerging() {
List<ParentTask> tasks =
Expand Down

0 comments on commit 385a6bd

Please sign in to comment.