Spark: Optimize snapshot expiry (apache#3457)
szehon-ho authored Oct 14, 2022
1 parent 120db78 commit 312592d
Showing 10 changed files with 362 additions and 33 deletions.
9 changes: 5 additions & 4 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -50,7 +50,9 @@
* <p>This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
private static final int REF_SNAPSHOT_ID = 18;
public static final Types.NestedField REF_SNAPSHOT_ID =
Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());

private static final Schema MANIFEST_FILE_SCHEMA =
new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
@@ -74,8 +76,7 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
Types.NestedField.required(
REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get()));
REF_SNAPSHOT_ID);

AllManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_manifests");
@@ -424,7 +425,7 @@ private <T> Boolean compareSnapshotRef(
}

private <T> boolean isSnapshotRef(BoundReference<T> ref) {
return ref.fieldId() == REF_SNAPSHOT_ID;
return ref.fieldId() == REF_SNAPSHOT_ID.fieldId();
}
}
}
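Promoting REF_SNAPSHOT_ID from a private int to a public NestedField lets callers build filters against the all_manifests metadata table by column name, which the Spark action changes below rely on. A minimal sketch of such a filter (the Dataset name and snapshot ids here are hypothetical):

  // 'allManifestsDF' is a hypothetical Dataset<Row> loaded from the table's ALL_MANIFESTS metadata table
  Set<Long> snapshotIds = Sets.newHashSet(10L, 11L);
  Column cond = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
  Dataset<Row> filtered = allManifestsDF.filter(cond); // keeps only manifests referenced by those snapshots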
18 changes: 17 additions & 1 deletion core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -100,10 +101,25 @@ private static TableMetadata findFirstExistentPreviousMetadata(
* Returns locations of manifest lists in a table.
*
* @param table table for which manifestList needs to be fetched
* @return the location of manifest Lists
* @return the location of manifest lists
*/
public static List<String> manifestListLocations(Table table) {
return manifestListLocations(table, null);
}

/**
* Returns locations of manifest lists in a table.
*
* @param table table for which manifestList needs to be fetched
* @param snapshotIds ids of snapshots for which manifest lists will be returned
* @return the location of manifest lists
*/
public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
Iterable<Snapshot> snapshots = table.snapshots();
if (snapshotIds != null) {
snapshots = Iterables.filter(snapshots, s -> snapshotIds.contains(s.snapshotId()));
}

List<String> manifestListLocations = Lists.newArrayList();
for (Snapshot snapshot : snapshots) {
String manifestListLocation = snapshot.manifestListLocation();
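The new overload restricts manifest-list locations to a set of snapshot ids; passing null preserves the old behavior of returning lists for every snapshot. A minimal usage sketch (the snapshot ids are hypothetical):

  Set<Long> expiredIds = Sets.newHashSet(4L, 7L); // hypothetical ids of snapshots that were expired
  List<String> expiredLists = ReachableFileUtil.manifestListLocations(table, expiredIds);
  List<String> allLists = ReachableFileUtil.manifestListLocations(table); // unfiltered, all snapshots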
(next changed file; path not shown in this view)
@@ -25,11 +25,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}

// builds a DF of delete and data file path and type by reading all manifests
protected Dataset<FileInfo> contentFileDS(Table table) {
return contentFileDS(table, null);
}

protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();

Dataset<ManifestFileBean> allManifests =
loadMetadataTable(table, ALL_MANIFESTS)
Dataset<ManifestFileBean> manifestBeanDS =
manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
@@ -155,17 +161,35 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);

return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}

protected Dataset<FileInfo> manifestDS(Table table) {
return loadMetadataTable(table, ALL_MANIFESTS)
return manifestDS(table, null);
}

protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
return manifestDF(table, snapshotIds)
.select(col("path"), lit(MANIFEST).as("type"))
.as(FileInfo.ENCODER);
}

private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
if (snapshotIds != null) {
Column filterCond = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
return manifestDF.filter(filterCond);
} else {
return manifestDF;
}
}

protected Dataset<FileInfo> manifestListDS(Table table) {
List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
return manifestListDS(table, null);
}

protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
List<String> manifestLists = ReachableFileUtil.manifestListLocations(table, snapshotIds);
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}

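Together, the new Set<Long> overloads let a caller limit the content-file, manifest, and manifest-list listings to specific snapshots; the expire action in the next file combines them exactly this way. A rough sketch (hypothetical snapshot ids, called from within a subclass of this base action):

  Set<Long> ids = Sets.newHashSet(100L, 101L);
  Dataset<FileInfo> candidates =
      contentFileDS(table, ids)               // data and delete files read from the filtered manifests
          .union(manifestDS(table, ids))      // the manifest files themselves
          .union(manifestListDS(table, ids)); // manifest lists of those snapshots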
(next changed file; path not shown in this view)
@@ -26,7 +26,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public Dataset<Row> expire() {
public Dataset<FileInfo> expireFiles() {
if (expiredFileDS == null) {
// fetch metadata before expiration
Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
TableMetadata originalMetadata = ops.current();

// perform expiration
org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
@@ -184,11 +186,16 @@ public Dataset<FileInfo> expireFiles() {

expireSnapshots.cleanExpiredFiles(false).commit();

// fetch metadata after expiration
Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);

// fetch files referenced by expired snapshots
Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);

// determine expired files
this.expiredFileDS = originalFileDS.except(validFileDS);
this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
}

return expiredFileDS;
@@ -236,11 +243,25 @@ private boolean streamResults() {
return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}

private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
private Dataset<FileInfo> fileDS(TableMetadata metadata) {
return fileDS(metadata, null);
}

private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
return contentFileDS(staticTable)
.union(manifestDS(staticTable))
.union(manifestListDS(staticTable));
return contentFileDS(staticTable, snapshotIds)
.union(manifestDS(staticTable, snapshotIds))
.union(manifestListDS(staticTable, snapshotIds));
}

private Set<Long> findExpiredSnapshotIds(
TableMetadata originalMetadata, TableMetadata updatedMetadata) {
Set<Long> retainedSnapshots =
updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
return originalMetadata.snapshots().stream()
.map(Snapshot::snapshotId)
.filter(id -> !retainedSnapshots.contains(id))
.collect(Collectors.toSet());
}

private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
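The core of the optimization is above: instead of diffing every file reachable before expiration against every file reachable after, the action first computes the ids of the snapshots that were removed (a plain set difference in findExpiredSnapshotIds) and only lists files referenced by those snapshots as delete candidates. With hypothetical ids, the set difference behaves like this:

  // original snapshots {1, 2, 3, 4}, retained after expiration {3, 4} (hypothetical ids)
  Set<Long> original = Sets.newHashSet(1L, 2L, 3L, 4L);
  Set<Long> retained = Sets.newHashSet(3L, 4L);
  Set<Long> expired =
      original.stream().filter(id -> !retained.contains(id)).collect(Collectors.toSet());
  // expired == {1, 2}; only files referenced by these snapshots are diffed against the valid file set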
(next changed file; path not shown in this view)
@@ -37,6 +37,7 @@
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public void testExpireAfterExecute() {
List<Row> untypedExpiredFiles = action.expire().collectAsList();
Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
}

@Test
public void testExpireFileDeletionMostExpired() {
testExpireFilesAreDeleted(5, 2);
}

@Test
public void testExpireFileDeletionMostRetained() {
testExpireFilesAreDeleted(2, 5);
}

public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
// Add data files to be expired
Set<String> dataFiles = Sets.newHashSet();
for (int i = 0; i < dataFilesExpired; i++) {
DataFile df =
DataFiles.builder(SPEC)
.withPath(String.format("/path/to/data-expired-%d.parquet", i))
.withFileSizeInBytes(10)
.withPartitionPath("c1=1")
.withRecordCount(1)
.build();
dataFiles.add(df.path().toString());
table.newFastAppend().appendFile(df).commit();
}

// Delete them all, these will be deleted on expire snapshot
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
// Clears "DELETED" manifests
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);

// Add data files to be retained, which are not deleted.
for (int i = 0; i < dataFilesRetained; i++) {
DataFile df =
DataFiles.builder(SPEC)
.withPath(String.format("/path/to/data-retained-%d.parquet", i))
.withFileSizeInBytes(10)
.withPartitionPath("c1=1")
.withRecordCount(1)
.build();
table.newFastAppend().appendFile(df).commit();
}

long end = rightAfterSnapshot();

Set<String> expectedDeletes = Sets.newHashSet();
expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
// all snapshot manifest lists except current will be deleted
expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
expectedDeletes.addAll(
manifestsBefore); // new manifests are reachable from current snapshot and not deleted
expectedDeletes.addAll(
dataFiles); // new data files are reachable from current snapshot and not deleted

Set<String> deletedFiles = Sets.newHashSet();
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(end)
.deleteWith(deletedFiles::add)
.execute();

Assert.assertEquals(
"All reachable files before expiration should be deleted", expectedDeletes, deletedFiles);
}

@Test
public void testExpireSomeCheckFilesDeleted() {

table.newAppend().appendFile(FILE_A).commit();

table.newAppend().appendFile(FILE_B).commit();

table.newAppend().appendFile(FILE_C).commit();

table.newDelete().deleteFile(FILE_A).commit();

long after = rightAfterSnapshot();
waitUntilAfter(after);

table.newAppend().appendFile(FILE_D).commit();

table.newDelete().deleteFile(FILE_B).commit();

Set<String> deletedFiles = Sets.newHashSet();
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(after)
.deleteWith(deletedFiles::add)
.execute();

// C, D should be retained (live)
// B should be retained (previous snapshot points to it)
// A should be deleted
Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
}
}
(next changed file; path not shown in this view)
@@ -37,6 +37,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public static Set<DeleteFile> deleteFiles(Table table) {

return deleteFiles;
}

public static Set<String> reachableManifestPaths(Table table) {
return StreamSupport.stream(table.snapshots().spliterator(), false)
.flatMap(s -> s.allManifests(table.io()).stream())
.map(ManifestFile::path)
.collect(Collectors.toSet());
}
}
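The new helper walks every snapshot of the table and collects the paths of all manifests it references; the expiry test above uses it to capture the manifest set before the retained commits are added. A minimal usage sketch:

  Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
  // ... further commits ...
  Set<String> manifestsAfter = TestHelpers.reachableManifestPaths(table);
  // manifestsAfter contains manifestsBefore plus any manifests written by the new commits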