From 9ab94f87de036c9cd91cf8353906a576b4a516ff Mon Sep 17 00:00:00 2001 From: Steven Zhen Wu Date: Tue, 31 May 2022 18:06:31 -0700 Subject: [PATCH] Core: Follow-up to IncrementalAppendScan implementation (#4886) --- .../iceberg/BaseIncrementalAppendScan.java | 75 +++++++++++-------- .../java/org/apache/iceberg/BaseScan.java | 2 +- .../org/apache/iceberg/BaseTableScan.java | 5 -- .../org/apache/iceberg/DataTableScan.java | 11 +-- .../iceberg/IncrementalDataTableScan.java | 2 +- .../org/apache/iceberg/TableScanContext.java | 4 +- .../org/apache/iceberg/util/SnapshotUtil.java | 2 +- .../iceberg/TestIncrementalDataTableScan.java | 1 + 8 files changed, 50 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java index 33bbd7d8f1d3..c86682e86578 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java @@ -59,7 +59,7 @@ public IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId) { public IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId) { // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied. // as fromSnapshotId could be matched to a parent snapshot that is already expired - return newRefinedScan(tableOps(), table(), schema(), context().fromSnapshotId(fromSnapshotId)); + return newRefinedScan(tableOps(), table(), schema(), context().fromSnapshotIdExclusive(fromSnapshotId)); } @Override @@ -71,46 +71,23 @@ public IncrementalAppendScan toSnapshot(long toSnapshotId) { @Override public CloseableIterable planFiles() { - long toSnapshotIdInclusive; - if (context().toSnapshotId() != null) { - toSnapshotIdInclusive = context().toSnapshotId(); - } else if (table().currentSnapshot() != null) { - toSnapshotIdInclusive = table().currentSnapshot().snapshotId(); - } else if (context().fromSnapshotId() != null) { - throw new IllegalArgumentException( - "Invalid config: end snapshot is not set, start snapshot is set, and table has no current snapshot"); - } else { - // It is an empty table (no current snapshot). Both from and to snapshots aren't set either. - // In this case, listener notification is also skipped + Long fromSnapshotId = context().fromSnapshotId(); + Long toSnapshotId = context().toSnapshotId(); + if (fromSnapshotId == null && toSnapshotId == null && table().currentSnapshot() == null) { + // If it is an empty table (no current snapshot) and both from and to snapshots aren't set either, + // simply return an empty iterable. In this case, listener notification is also skipped. return CloseableIterable.empty(); } + long toSnapshotIdInclusive = toSnapshotIdInclusive(); // fromSnapshotIdExclusive can be null. appendsBetween handles null fromSnapshotIdExclusive properly // by finding the oldest ancestor of end snapshot. - Long fromSnapshotIdExclusive = context().fromSnapshotId(); - if (context().fromSnapshotId() != null) { - if (context().fromSnapshotInclusive()) { - // validate the fromSnapshotId is an ancestor of toSnapshotId - Preconditions.checkArgument( - SnapshotUtil.isAncestorOf(table(), toSnapshotIdInclusive, context().fromSnapshotId()), - "Starting snapshot (inclusive) %s is not an ancestor of end snapshot %s", - context().fromSnapshotId(), toSnapshotIdInclusive); - // for inclusive behavior fromSnapshotIdExclusive is set to the parent snapshot id, which can be null. - fromSnapshotIdExclusive = table().snapshot(context().fromSnapshotId()).parentId(); - } else { - // validate the parent snapshot id an ancestor of toSnapshotId - Preconditions.checkArgument( - SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, context().fromSnapshotId()), - "Starting snapshot (exclusive) %s is not a parent ancestor of end snapshot %s", - context().fromSnapshotId(), toSnapshotIdInclusive); - } - } - + Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(fromSnapshotId, toSnapshotIdInclusive); if (fromSnapshotIdExclusive != null) { Listeners.notifyAll(new IncrementalScanEvent(table().name(), fromSnapshotIdExclusive, toSnapshotIdInclusive, context().rowFilter(), table().schema(), false)); } else { - Snapshot oldestAncestorSnapshot = SnapshotUtil.oldestAncestor(toSnapshotIdInclusive, table()::snapshot); + Snapshot oldestAncestorSnapshot = SnapshotUtil.oldestAncestorOf(toSnapshotIdInclusive, table()::snapshot); Listeners.notifyAll(new IncrementalScanEvent(table().name(), oldestAncestorSnapshot.snapshotId(), toSnapshotIdInclusive, context().rowFilter(), table().schema(), true)); } @@ -131,6 +108,40 @@ public CloseableIterable planTasks() { return TableScanUtil.planTasks(splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost()); } + private Long fromSnapshotIdExclusive(Long fromSnapshotId, long toSnapshotIdInclusive) { + if (fromSnapshotId != null) { + if (context().fromSnapshotInclusive()) { + // validate the fromSnapshotId is an ancestor of toSnapshotId + Preconditions.checkArgument( + SnapshotUtil.isAncestorOf(table(), toSnapshotIdInclusive, fromSnapshotId), + "Starting snapshot (inclusive) %s is not an ancestor of end snapshot %s", + fromSnapshotId, toSnapshotIdInclusive); + // for inclusive behavior fromSnapshotIdExclusive is set to the parent snapshot id, which can be null. + return table().snapshot(fromSnapshotId).parentId(); + } else { + // validate the parent snapshot id an ancestor of toSnapshotId + Preconditions.checkArgument( + SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, fromSnapshotId), + "Starting snapshot (exclusive) %s is not a parent ancestor of end snapshot %s", + fromSnapshotId, toSnapshotIdInclusive); + return fromSnapshotId; + } + } else { + return null; + } + } + + private long toSnapshotIdInclusive() { + if (context().toSnapshotId() != null) { + return context().toSnapshotId(); + } else { + Snapshot currentSnapshot = table().currentSnapshot(); + Preconditions.checkArgument(currentSnapshot != null, + "Invalid config: end snapshot is not set and table has no current snapshot"); + return currentSnapshot.snapshotId(); + } + } + private CloseableIterable appendFilesFromSnapshots(List snapshots) { Set snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId)); Set manifests = FluentIterable diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index 9f9bd158b2e1..44e8d38eb533 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -134,7 +134,7 @@ public long splitOpenFileCost() { /** * Resolve the schema to be projected lazily. * - * if there are selected columns from scan context, selected columns are projected to the table schema. + * If there are selected columns from scan context, selected columns are projected to the table schema. * Otherwise, projected schema from scan context shall be returned. * * @param context scan context diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index 1ce55cec85bd..68862d466c7e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -19,7 +19,6 @@ package org.apache.iceberg; -import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.iceberg.events.Listeners; @@ -61,10 +60,6 @@ protected boolean shouldIgnoreResiduals() { return context().ignoreResiduals(); } - protected Collection selectedColumns() { - return context().selectedColumns(); - } - protected ExecutorService planExecutor() { return context().planExecutor(); } diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 98d7f11c2783..b4d51079671f 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -23,7 +23,6 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; public class DataTableScan extends BaseTableScan { @@ -51,7 +50,7 @@ public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) { Preconditions.checkState(snapshotId() == null, "Cannot enable incremental scan, scan-snapshot set to id=%s", snapshotId()); return new IncrementalDataTableScan(tableOps(), table(), schema(), - context().fromSnapshotId(fromSnapshotId).toSnapshotId(toSnapshotId)); + context().fromSnapshotIdExclusive(fromSnapshotId).toSnapshotId(toSnapshotId)); } @Override @@ -99,12 +98,4 @@ public CloseableIterable doPlanFiles() { return manifestGroup.planFiles(); } - - @Override - public long targetSplitSize() { - long tableValue = tableOps().current().propertyAsLong( - TableProperties.SPLIT_SIZE, - TableProperties.SPLIT_SIZE_DEFAULT); - return PropertyUtil.propertyAsLong(options(), TableProperties.SPLIT_SIZE, tableValue); - } } diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java index 568bc1034929..034d217b9e34 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -56,7 +56,7 @@ public TableScan useSnapshot(long scanSnapshotId) { public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) { validateSnapshotIdsRefinement(fromSnapshotId, toSnapshotId); return new IncrementalDataTableScan(tableOps(), table(), schema(), - context().fromSnapshotId(fromSnapshotId).toSnapshotId(toSnapshotId)); + context().fromSnapshotIdExclusive(fromSnapshotId).toSnapshotId(toSnapshotId)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java index f3fde3260894..cde786b1587a 100644 --- a/core/src/main/java/org/apache/iceberg/TableScanContext.java +++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java @@ -169,10 +169,10 @@ Long fromSnapshotId() { return fromSnapshotId; } - TableScanContext fromSnapshotId(long id) { + TableScanContext fromSnapshotIdExclusive(long id) { return new TableScanContext(snapshotId, rowFilter, ignoreResiduals, caseSensitive, colStats, projectedSchema, selectedColumns, options, id, toSnapshotId, - planExecutor, fromSnapshotInclusive); + planExecutor, false); } TableScanContext fromSnapshotIdInclusive(long id) { diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 92afc009a840..dd733ddd4ced 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -132,7 +132,7 @@ public static Snapshot oldestAncestor(Table table) { * @param lookup lookup function from snapshot ID to snapshot * @return null if there is no current snapshot in the table, else the oldest Snapshot. */ - public static Snapshot oldestAncestor(long snapshotId, Function lookup) { + public static Snapshot oldestAncestorOf(long snapshotId, Function lookup) { Snapshot lastSnapshot = null; for (Snapshot snapshot : ancestorsOf(snapshotId, lookup)) { lastSnapshot = snapshot; diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java index d171848211bb..32c68812b2ad 100644 --- a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java +++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java @@ -109,6 +109,7 @@ public IncrementalScanEvent event() { Assert.assertEquals(table.schema(), listener1.event().projection()); Assert.assertEquals(Expressions.alwaysTrue(), listener1.event().filter()); Assert.assertEquals("test", listener1.event().tableName()); + Assert.assertEquals(false, listener1.event().isFromSnapshotInclusive()); } @Test