diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
index 570fb5955986..d82e3afc4652 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -20,8 +20,10 @@
 package org.apache.iceberg.actions;
 
 import java.util.List;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -36,7 +38,10 @@ abstract class BaseAction<R> implements Action<R> {
   protected abstract Table table();
 
   protected String metadataTableName(MetadataTableType type) {
-    String tableName = table().toString();
+    return metadataTableName(table().toString(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
     if (tableName.contains("/")) {
       return tableName + "#" + type;
     } else if (tableName.startsWith("hadoop.")) {
@@ -56,9 +61,9 @@ protected String metadataTableName(MetadataTableType type) {
    * @param table the table
    * @return the paths of the Manifest Lists
    */
-  protected List<String> getManifestListPaths(Table table) {
+  private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
     List<String> manifestLists = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
+    for (Snapshot snapshot : snapshots) {
       String manifestListLocation = snapshot.manifestListLocation();
       if (manifestListLocation != null) {
         manifestLists.add(manifestListLocation);
@@ -73,7 +78,7 @@ protected List<String> getManifestListPaths(Table table) {
    * @param ops TableOperations for the table we will be getting paths from
    * @return a list of paths to metadata files
    */
-  protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
+  private List<String> getOtherMetadataFilePaths(TableOperations ops) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
 
@@ -86,27 +91,36 @@ protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
   }
 
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
-    String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
+    return buildValidDataFileDF(spark, table().toString());
+  }
+
+  protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
+    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
     return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
   }
 
-  protected Dataset<Row> buildManifestFileDF(SparkSession spark) {
-    String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS);
+  protected Dataset<Row> buildManifestFileDF(SparkSession spark, String tableName) {
+    String allManifestsMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_MANIFESTS);
     return spark.read().format("iceberg").load(allManifestsMetadataTable).selectExpr("path as file_path");
   }
 
   protected Dataset<Row> buildManifestListDF(SparkSession spark, Table table) {
-    List<String> manifestLists = getManifestListPaths(table);
+    List<String> manifestLists = getManifestListPaths(table.snapshots());
     return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path");
   }
 
+  protected Dataset<Row> buildManifestListDF(SparkSession spark, String metadataFileLocation) {
+    StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, table().io());
+    return buildManifestListDF(spark, new BaseTable(ops, table().toString()));
+  }
+
   protected Dataset<Row> buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) {
     List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
   protected Dataset<Row> buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) {
-    Dataset<Row> manifestDF = buildManifestFileDF(spark);
+    Dataset<Row> manifestDF = buildManifestFileDF(spark, table.toString());
     Dataset<Row> manifestListDF = buildManifestListDF(spark, table);
     Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(spark, ops);
 
diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
index 79fc00ad2486..dc36e4e7cd5a 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -147,49 +148,40 @@ public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
 
   @Override
   public ExpireSnapshotsActionResult execute() {
-    Dataset<Row> originalFiles = null;
-    try {
-      // Metadata before Expiration
-      originalFiles = buildValidFileDF().persist();
-      // Action to trigger persist
-      originalFiles.count();
-
-      // Perform Expiration
-      ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
-      for (final Long id : expireSnapshotIdValues) {
-        expireSnaps = expireSnaps.expireSnapshotId(id);
-      }
-
-      if (expireOlderThanValue != null) {
-        expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
-      }
-
-      if (retainLastValue != null) {
-        expireSnaps = expireSnaps.retainLast(retainLastValue);
-      }
-
-      expireSnaps.commit();
-
-      // Metadata after Expiration
-      Dataset<Row> validFiles = buildValidFileDF();
-      Dataset<Row> filesToDelete = originalFiles.except(validFiles);
-
-      return deleteFiles(filesToDelete.toLocalIterator());
-    } finally {
-      if (originalFiles != null) {
-        originalFiles.unpersist();
-      }
+    // Metadata before Expiration
+    Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+
+    // Perform Expiration
+    ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
+    for (final Long id : expireSnapshotIdValues) {
+      expireSnaps = expireSnaps.expireSnapshotId(id);
+    }
+
+    if (expireOlderThanValue != null) {
+      expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
+    }
+
+    if (retainLastValue != null) {
+      expireSnaps = expireSnaps.retainLast(retainLastValue);
     }
+
+    expireSnaps.commit();
+
+    // Metadata after Expiration
+    Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+    Dataset<Row> filesToDelete = originalFiles.except(validFiles);
+
+    return deleteFiles(filesToDelete.toLocalIterator());
   }
 
   private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
     return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
   }
 
-  private Dataset<Row> buildValidFileDF() {
-    return appendTypeString(buildValidDataFileDF(spark), DATA_FILE)
-        .union(appendTypeString(buildManifestFileDF(spark), MANIFEST))
-        .union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST));
+  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
+    return appendTypeString(buildValidDataFileDF(spark, metadata.metadataFileLocation()), DATA_FILE)
+        .union(appendTypeString(buildManifestFileDF(spark, metadata.metadataFileLocation()), MANIFEST))
+        .union(appendTypeString(buildManifestListDF(spark, metadata.metadataFileLocation()), MANIFEST_LIST));
   }
 
   /**
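The mechanism this patch leans on is worth spelling out: `metadataTableName` treats any name containing a `/` as a path and appends `#<metadata-table>`, so a `metadata.json` location can be loaded as a fixed, point-in-time view of the table. The old `persist()`/`count()` approach could silently recompute the "before" listing against post-expiration state if cached blocks were evicted; pinning each `buildValidFileDF` call to `ops.current()` or `ops.refresh()` removes that dependency on the Spark cache entirely. A minimal sketch of the same before/after diff outside the action (the warehouse paths and session setup are hypothetical):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PinnedMetadataExample {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("pinned-metadata-example")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical metadata file locations, e.g. captured from
    // ops.current().metadataFileLocation() before the expire commit
    // and from ops.refresh().metadataFileLocation() afterwards.
    String beforeLocation = "file:/warehouse/db/tbl/metadata/v2.metadata.json";
    String afterLocation = "file:/warehouse/db/tbl/metadata/v3.metadata.json";

    // Because the location contains '/', metadataTableName() appends
    // "#all_data_files" and the Iceberg source reads that exact table
    // version, regardless of later commits to the live table.
    Dataset<Row> before = spark.read().format("iceberg")
        .load(beforeLocation + "#all_data_files")
        .select("file_path");
    Dataset<Row> after = spark.read().format("iceberg")
        .load(afterLocation + "#all_data_files")
        .select("file_path");

    // Files reachable before expiration but not after are candidates
    // for deletion, mirroring originalFiles.except(validFiles) above.
    before.except(after).show(20, false);
  }
}
```

The same trick backs the new `buildManifestListDF(SparkSession, String)` overload, except there the pinned version is materialized driver-side through `StaticTableOperations` rather than through the Spark reader.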