Skip to content

Commit

Permalink
Spark: Fix usage of staging location when optimizing metadata (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Nov 3, 2023
1 parent a445925 commit 2c89010
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@
*
* <p>By default, this action rewrites all manifests for the current partition spec and writes the
* result to the metadata folder. The behavior can be modified by passing a custom predicate to
* {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is
* a way to configure a custom location for new manifests via {@link #stagingLocation}.
* {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is
* a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}.
* The provided staging location will be ignored if snapshot ID inheritance is enabled. In such
* cases, the manifests are always written to the metadata folder and committed without staging.
*/
public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {
Expand All @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction
private final Table table;
private final int formatVersion;
private final long targetManifestSizeBytes;
private final boolean shouldStageManifests;

private PartitionSpec spec = null;
private Predicate<ManifestFile> predicate = manifest -> true;
private String stagingLocation = null;
private String outputLocation = null;

RewriteManifestsSparkAction(SparkSession spark, Table table) {
super(spark);
Expand All @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);

// default the staging location to the metadata location
// default the output location to the metadata location
TableOperations ops = ((HasTableOperations) table).operations();
Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
this.stagingLocation = metadataFilePath.getParent().toString();
this.outputLocation = metadataFilePath.getParent().toString();

// use the current table format version for new manifests
this.formatVersion = ops.current().formatVersion();

boolean snapshotIdInheritanceEnabled =
PropertyUtil.propertyAsBoolean(
table.properties(),
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled;
}

@Override
Expand All @@ -133,15 +143,17 @@ public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicat

@Override
public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
this.stagingLocation = newStagingLocation;
if (shouldStageManifests) {
this.outputLocation = newStagingLocation;
} else {
LOG.warn("Ignoring provided staging location as new manifests will be committed directly");
}
return this;
}

@Override
public RewriteManifests.Result execute() {
String desc =
String.format(
"Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name());
String desc = String.format("Rewriting manifests in %s", table.name());
JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
return withJobGroupInfo(info, this::doExecute);
}
Expand Down Expand Up @@ -236,7 +248,7 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
toManifests(
tableBroadcast,
maxNumManifestEntries,
stagingLocation,
outputLocation,
formatVersion,
combinedPartitionType,
spec,
Expand Down Expand Up @@ -267,7 +279,7 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
toManifests(
tableBroadcast,
maxNumManifestEntries,
stagingLocation,
outputLocation,
formatVersion,
combinedPartitionType,
spec,
Expand Down Expand Up @@ -320,18 +332,12 @@ private boolean hasFileCounts(ManifestFile manifest) {
private void replaceManifests(
Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
try {
boolean snapshotIdInheritanceEnabled =
PropertyUtil.propertyAsBoolean(
table.properties(),
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
deletedManifests.forEach(rewriteManifests::deleteManifest);
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
if (shouldStageManifests) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.ValidationHelpers.snapshotIds;
import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -92,13 +93,15 @@ public static Object[] parameters() {
private final String snapshotIdInheritanceEnabled;
private final String useCaching;
private final int formatVersion;
private final boolean shouldStageManifests;
private String tableLocation = null;

public TestRewriteManifestsAction(
String snapshotIdInheritanceEnabled, String useCaching, int formatVersion) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
this.useCaching = useCaching;
this.formatVersion = formatVersion;
this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false");
}

@Before
Expand Down Expand Up @@ -166,6 +169,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
assertManifestsLocation(result.addedManifests());

table.refresh();

Expand Down Expand Up @@ -312,6 +316,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
"Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 2 manifests", 2, Iterables.size(result.addedManifests()));
assertManifestsLocation(result.addedManifests());

table.refresh();

Expand Down Expand Up @@ -376,12 +381,14 @@ public void testRewriteImportedManifests() throws IOException {

SparkActions actions = SparkActions.get();

String rewriteStagingLocation = temp.newFolder().toString();

RewriteManifests.Result result =
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.stagingLocation(rewriteStagingLocation)
.execute();

Assert.assertEquals(
Expand All @@ -390,6 +397,7 @@ public void testRewriteImportedManifests() throws IOException {
result.rewrittenManifests());
Assert.assertEquals(
"Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);

} finally {
spark.sql("DROP TABLE parquet_table");
Expand Down Expand Up @@ -428,18 +436,21 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {

SparkActions actions = SparkActions.get();

String stagingLocation = temp.newFolder().toString();

RewriteManifests.Result result =
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.stagingLocation(stagingLocation)
.execute();

Assert.assertEquals(
"Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 2 manifests", 2, Iterables.size(result.addedManifests()));
assertManifestsLocation(result.addedManifests(), stagingLocation);

table.refresh();

Expand Down Expand Up @@ -481,6 +492,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {

SparkActions actions = SparkActions.get();

String stagingLocation = temp.newFolder().toString();

// rewrite only the first manifest
RewriteManifests.Result result =
actions
Expand All @@ -489,14 +502,15 @@ public void testRewriteManifestsWithPredicate() throws IOException {
manifest ->
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
.stagingLocation(stagingLocation)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
assertManifestsLocation(result.addedManifests(), stagingLocation);

table.refresh();

Expand Down Expand Up @@ -560,6 +574,7 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
assertManifestsLocation(result.addedManifests());

table.refresh();

Expand Down Expand Up @@ -616,4 +631,16 @@ private long computeManifestEntrySizeBytes(List<ManifestFile> manifests) {

return totalSize / numEntries;
}

private void assertManifestsLocation(Iterable<ManifestFile> manifests) {
assertManifestsLocation(manifests, null);
}

private void assertManifestsLocation(Iterable<ManifestFile> manifests, String stagingLocation) {
if (shouldStageManifests && stagingLocation != null) {
assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation));
} else {
assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@
*
* <p>By default, this action rewrites all manifests for the current partition spec and writes the
* result to the metadata folder. The behavior can be modified by passing a custom predicate to
* {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is
* a way to configure a custom location for new manifests via {@link #stagingLocation}.
* {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is
* a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}.
* The provided staging location will be ignored if snapshot ID inheritance is enabled. In such
* cases, the manifests are always written to the metadata folder and committed without staging.
*/
public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {
Expand All @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction
private final Table table;
private final int formatVersion;
private final long targetManifestSizeBytes;
private final boolean shouldStageManifests;

private PartitionSpec spec = null;
private Predicate<ManifestFile> predicate = manifest -> true;
private String stagingLocation = null;
private String outputLocation = null;

RewriteManifestsSparkAction(SparkSession spark, Table table) {
super(spark);
Expand All @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);

// default the staging location to the metadata location
// default the output location to the metadata location
TableOperations ops = ((HasTableOperations) table).operations();
Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
this.stagingLocation = metadataFilePath.getParent().toString();
this.outputLocation = metadataFilePath.getParent().toString();

// use the current table format version for new manifests
this.formatVersion = ops.current().formatVersion();

boolean snapshotIdInheritanceEnabled =
PropertyUtil.propertyAsBoolean(
table.properties(),
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled;
}

@Override
Expand All @@ -133,15 +143,17 @@ public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicat

@Override
public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
this.stagingLocation = newStagingLocation;
if (shouldStageManifests) {
this.outputLocation = newStagingLocation;
} else {
LOG.warn("Ignoring provided staging location as new manifests will be committed directly");
}
return this;
}

@Override
public RewriteManifests.Result execute() {
String desc =
String.format(
"Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name());
String desc = String.format("Rewriting manifests in %s", table.name());
JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
return withJobGroupInfo(info, this::doExecute);
}
Expand Down Expand Up @@ -236,7 +248,7 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
toManifests(
tableBroadcast,
maxNumManifestEntries,
stagingLocation,
outputLocation,
formatVersion,
combinedPartitionType,
spec,
Expand Down Expand Up @@ -267,7 +279,7 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
toManifests(
tableBroadcast,
maxNumManifestEntries,
stagingLocation,
outputLocation,
formatVersion,
combinedPartitionType,
spec,
Expand Down Expand Up @@ -320,18 +332,12 @@ private boolean hasFileCounts(ManifestFile manifest) {
private void replaceManifests(
Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
try {
boolean snapshotIdInheritanceEnabled =
PropertyUtil.propertyAsBoolean(
table.properties(),
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
deletedManifests.forEach(rewriteManifests::deleteManifest);
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
if (shouldStageManifests) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Loading

0 comments on commit 2c89010

Please sign in to comment.