Expire snapshots action without cache (apache#1344)
Instead of using a cache to preserve the state from before the
expireSnapshots command, we preserve the table metadata via a
StaticTable reference. This reference doesn't change when the
snapshots are expired, and it allows us to look up all the files
referenced by the prior version of the table without holding
everything in memory.
RussellSpitzer authored Aug 20, 2020
1 parent c75f004 commit d46c7d6
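As a rough sketch of the idea (not the commit's code; the diff below is authoritative): a StaticTableOperations pins a table to one metadata file, so a Table built on top of it keeps the pre-expiration snapshot list no matter what is later committed to the live table. The helper name pinCurrentState is hypothetical; the Iceberg types are the ones the diff imports.

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;

public class StaticTableSketch {
  // Hypothetical helper: pin the table's current state to its current
  // metadata file. The returned Table never refreshes, so a later
  // expireSnapshots() commit on the live table does not affect it.
  static Table pinCurrentState(Table table) {
    TableOperations ops = ((HasTableOperations) table).operations();
    String metadataFileLocation = ops.current().metadataFileLocation();
    StaticTableOperations staticOps =
        new StaticTableOperations(metadataFileLocation, table.io());
    return new BaseTable(staticOps, table.toString());
  }
}

Because only the metadata pointer is retained, the pre-expiration file listing no longer has to be held in Spark memory, which is what the previous implementation's .persist()/.count() pair existed for.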
Showing 2 changed files with 51 additions and 45 deletions.
32 changes: 23 additions & 9 deletions spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -20,8 +20,10 @@
 package org.apache.iceberg.actions;
 
 import java.util.List;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -36,7 +38,10 @@ abstract class BaseAction<R> implements Action<R> {
   protected abstract Table table();
 
   protected String metadataTableName(MetadataTableType type) {
-    String tableName = table().toString();
+    return metadataTableName(table().toString(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
     if (tableName.contains("/")) {
       return tableName + "#" + type;
     } else if (tableName.startsWith("hadoop.")) {
@@ -56,9 +61,9 @@ protected String metadataTableName(MetadataTableType type) {
    * @param table the table
    * @return the paths of the Manifest Lists
    */
-  protected List<String> getManifestListPaths(Table table) {
+  private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
     List<String> manifestLists = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
+    for (Snapshot snapshot : snapshots) {
       String manifestListLocation = snapshot.manifestListLocation();
       if (manifestListLocation != null) {
         manifestLists.add(manifestListLocation);
@@ -73,7 +78,7 @@ protected List<String> getManifestListPaths(Table table) {
    * @param ops TableOperations for the table we will be getting paths from
    * @return a list of paths to metadata files
    */
-  protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
+  private List<String> getOtherMetadataFilePaths(TableOperations ops) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
 
@@ -86,27 +91,36 @@ protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
   }
 
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
-    String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
+    return buildValidDataFileDF(spark, table().toString());
+  }
+
+  protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
+    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
     return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
   }
 
-  protected Dataset<Row> buildManifestFileDF(SparkSession spark) {
-    String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS);
+  protected Dataset<Row> buildManifestFileDF(SparkSession spark, String tableName) {
+    String allManifestsMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_MANIFESTS);
     return spark.read().format("iceberg").load(allManifestsMetadataTable).selectExpr("path as file_path");
   }
 
   protected Dataset<Row> buildManifestListDF(SparkSession spark, Table table) {
-    List<String> manifestLists = getManifestListPaths(table);
+    List<String> manifestLists = getManifestListPaths(table.snapshots());
     return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path");
   }
 
+  protected Dataset<Row> buildManifestListDF(SparkSession spark, String metadataFileLocation) {
+    StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, table().io());
+    return buildManifestListDF(spark, new BaseTable(ops, table().toString()));
+  }
+
   protected Dataset<Row> buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) {
     List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
   protected Dataset<Row> buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) {
-    Dataset<Row> manifestDF = buildManifestFileDF(spark);
+    Dataset<Row> manifestDF = buildManifestFileDF(spark, table.toString());
     Dataset<Row> manifestListDF = buildManifestListDF(spark, table);
     Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(spark, ops);
 
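Note how the new overloads let a "table name" also be a raw metadata-file path: metadataTableName(tableName, type) appends "#" + type whenever the identifier contains "/", and the Spark Iceberg source resolves that form into a metadata table of the pinned state. A hedged illustration (both identifiers are made up, and the catalog-name form may vary by catalog):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataTableSketch {
  // Sketch only: compare the live table's data files with those of a
  // pinned, pre-expiration state addressed by its metadata.json path.
  static Dataset<Row> unreferencedDataFiles(SparkSession spark) {
    Dataset<Row> live = spark.read().format("iceberg")
        .load("db.tbl.all_data_files")                  // hypothetical name
        .select("file_path");
    Dataset<Row> pinned = spark.read().format("iceberg")
        .load("/warehouse/db/tbl/metadata/v12.metadata.json#all_data_files")  // hypothetical path
        .select("file_path");
    return pinned.except(live); // files only the old version still references
  }
}

This pinned-minus-live set difference is exactly the shape of the filesToDelete computation in ExpireSnapshotsAction.execute() below.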
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -147,49 +148,40 @@ public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
 
   @Override
   public ExpireSnapshotsActionResult execute() {
-    Dataset<Row> originalFiles = null;
-    try {
-      // Metadata before Expiration
-      originalFiles = buildValidFileDF().persist();
-      // Action to trigger persist
-      originalFiles.count();
-
-      // Perform Expiration
-      ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
-      for (final Long id : expireSnapshotIdValues) {
-        expireSnaps = expireSnaps.expireSnapshotId(id);
-      }
-
-      if (expireOlderThanValue != null) {
-        expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
-      }
-
-      if (retainLastValue != null) {
-        expireSnaps = expireSnaps.retainLast(retainLastValue);
-      }
-
-      expireSnaps.commit();
-
-      // Metadata after Expiration
-      Dataset<Row> validFiles = buildValidFileDF();
-      Dataset<Row> filesToDelete = originalFiles.except(validFiles);
-
-      return deleteFiles(filesToDelete.toLocalIterator());
-    } finally {
-      if (originalFiles != null) {
-        originalFiles.unpersist();
-      }
-    }
+    // Metadata before Expiration
+    Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+
+    // Perform Expiration
+    ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
+    for (final Long id : expireSnapshotIdValues) {
+      expireSnaps = expireSnaps.expireSnapshotId(id);
+    }
+
+    if (expireOlderThanValue != null) {
+      expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
+    }
+
+    if (retainLastValue != null) {
+      expireSnaps = expireSnaps.retainLast(retainLastValue);
+    }
+
+    expireSnaps.commit();
+
+    // Metadata after Expiration
+    Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+    Dataset<Row> filesToDelete = originalFiles.except(validFiles);
+
+    return deleteFiles(filesToDelete.toLocalIterator());
   }
 
   private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
     return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
   }
 
-  private Dataset<Row> buildValidFileDF() {
-    return appendTypeString(buildValidDataFileDF(spark), DATA_FILE)
-        .union(appendTypeString(buildManifestFileDF(spark), MANIFEST))
-        .union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST));
+  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
+    return appendTypeString(buildValidDataFileDF(spark, metadata.metadataFileLocation()), DATA_FILE)
+        .union(appendTypeString(buildManifestFileDF(spark, metadata.metadataFileLocation()), MANIFEST))
+        .union(appendTypeString(buildManifestListDF(spark, metadata.metadataFileLocation()), MANIFEST_LIST));
   }
 
   /**
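For context, a hedged sketch of invoking the reworked action (assuming the Actions entry point of that era; the cutoff and retention values are illustrative):

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.ExpireSnapshotsActionResult;

public class ExpireSnapshotsSketch {
  // Expire snapshots older than seven days while keeping at least the
  // last five, then delete only files no surviving snapshot references.
  static ExpireSnapshotsActionResult expireOldSnapshots(Table table) {
    long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
    return Actions.forTable(table)
        .expireSnapshots()
        .expireOlderThan(cutoff)
        .retainLast(5)
        .execute();
  }
}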
