diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 854232a62d5b..dd287ea193cb 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ * *

By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public RewriteManifestsSparkAction rewriteIf(Predicate newPredicat @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ private List writeManifestsForUnpartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ private List writeManifestsForPartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ private boolean hasFileCounts(ManifestFile manifest) { private void replaceManifests( Iterable deletedManifests, Iterable addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index eadcfc426166..a5dd0054da25 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public static Object[] parameters() { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public TestRewriteManifestsAction( this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public void testRewriteSmallManifestsPartitionedTable() { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public void testRewriteImportedManifests() throws IOException { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public void testRewriteImportedManifests() throws IOException { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ private long computeManifestEntrySizeBytes(List manifests) { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 87fbe2de2fce..bc2ef2306790 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ * *

By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public RewriteManifestsSparkAction rewriteIf(Predicate newPredicat @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ private List writeManifestsForUnpartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ private List writeManifestsForPartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ private boolean hasFileCounts(ManifestFile manifest) { private void replaceManifests( Iterable deletedManifests, Iterable addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index eadcfc426166..a5dd0054da25 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public static Object[] parameters() { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public TestRewriteManifestsAction( this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public void testRewriteSmallManifestsPartitionedTable() { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public void testRewriteImportedManifests() throws IOException { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public void testRewriteImportedManifests() throws IOException { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ private long computeManifestEntrySizeBytes(List manifests) { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 87fbe2de2fce..bc2ef2306790 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ * *

By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public RewriteManifestsSparkAction rewriteIf(Predicate newPredicat @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ private List writeManifestsForUnpartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ private List writeManifestsForPartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ private boolean hasFileCounts(ManifestFile manifest) { private void replaceManifests( Iterable deletedManifests, Iterable addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 3522decec096..4ce5ba4e9d6a 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public static Object[] parameters() { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public TestRewriteManifestsAction( this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public void testRewriteSmallManifestsPartitionedTable() { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public void testRewriteImportedManifests() throws IOException { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public void testRewriteImportedManifests() throws IOException { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ private long computeManifestEntrySizeBytes(List manifests) { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 87fbe2de2fce..bc2ef2306790 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ * *

By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public RewriteManifestsSparkAction rewriteIf(Predicate newPredicat @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ private List writeManifestsForUnpartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ private List writeManifestsForPartitionedTable( toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ private boolean hasFileCounts(ManifestFile manifest) { private void replaceManifests( Iterable deletedManifests, Iterable addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 3522decec096..4ce5ba4e9d6a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public static Object[] parameters() { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public TestRewriteManifestsAction( this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public void testRewriteSmallManifestsPartitionedTable() { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public void testRewriteImportedManifests() throws IOException { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public void testRewriteImportedManifests() throws IOException { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ private long computeManifestEntrySizeBytes(List manifests) { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } }