Spark 3.1: Preserve file seq numbers while rewriting manifests (apach…
aokolnychyi authored Nov 15, 2022
1 parent 0fdead7 commit 1d10c53
Showing 3 changed files with 154 additions and 4 deletions.
@@ -197,12 +197,16 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
loadMetadataTable(table, ENTRIES)
.filter("status < 2") // select only live entries
.selectExpr(
"input_file_name() as manifest", "snapshot_id", "sequence_number", "data_file");
"input_file_name() as manifest",
"snapshot_id",
"sequence_number",
"file_sequence_number",
"data_file");

Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
return manifestEntryDF
.join(manifestDF, joinCond, "left_semi")
.select("snapshot_id", "sequence_number", "data_file");
.select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
}

private List<ManifestFile> writeManifestsForUnpartitionedTable(
@@ -355,8 +359,9 @@ private static ManifestFile writeManifest(
Row row = rows.get(index);
long snapshotId = row.getLong(0);
long sequenceNumber = row.getLong(1);
Row file = row.getStruct(2);
writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
Row file = row.getStruct(3);
writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
}
} finally {
writer.close();
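For context, the column that the action now carries through can also be inspected directly via the entries metadata table. Below is a minimal sketch, assuming a running SparkSession named `spark` and a table registered as `db.tbl` (both placeholder names, not from the commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Minimal sketch: read live manifest entries of a hypothetical table db.tbl and select
// the same columns the rewrite action now preserves. file_sequence_number may be null
// for entries whose file sequence number is unknown, which is why the action above
// reads it null-safely before passing it to the writer.
Dataset<Row> liveEntries =
    spark.read().format("iceberg").load("db.tbl.entries")
        .filter("status < 2") // 0 = existing, 1 = added; 2 = deleted
        .selectExpr("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
liveEntries.show(false);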
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;

public class ValidationHelpers {

private ValidationHelpers() {}

public static List<Long> dataSeqs(Long... seqs) {
return Arrays.asList(seqs);
}

public static List<Long> fileSeqs(Long... seqs) {
return Arrays.asList(seqs);
}

public static List<Long> snapshotIds(Long... ids) {
return Arrays.asList(ids);
}

public static List<String> files(ContentFile<?>... files) {
return Arrays.stream(files).map(file -> file.path().toString()).collect(Collectors.toList());
}

public static void validateDataManifest(
Table table,
ManifestFile manifest,
List<Long> dataSeqs,
List<Long> fileSeqs,
List<Long> snapshotIds,
List<String> files) {

List<Long> actualDataSeqs = Lists.newArrayList();
List<Long> actualFileSeqs = Lists.newArrayList();
List<Long> actualSnapshotIds = Lists.newArrayList();
List<String> actualFiles = Lists.newArrayList();

for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, table.io()).entries()) {
actualDataSeqs.add(entry.dataSequenceNumber());
actualFileSeqs.add(entry.fileSequenceNumber());
actualSnapshotIds.add(entry.snapshotId());
actualFiles.add(entry.file().path().toString());
}

assertSameElements("data seqs", actualDataSeqs, dataSeqs);
assertSameElements("file seqs", actualFileSeqs, fileSeqs);
assertSameElements("snapshot IDs", actualSnapshotIds, snapshotIds);
assertSameElements("files", actualFiles, files);
}

private static <T> void assertSameElements(String context, List<T> actual, List<T> expected) {
String errorMessage = String.format("%s must match", context);
Assertions.assertThat(actual).as(errorMessage).hasSameElementsAs(expected);
}
}
@@ -18,6 +18,11 @@
*/
package org.apache.iceberg.spark.actions;

import static org.apache.iceberg.ValidationHelpers.dataSeqs;
import static org.apache.iceberg.ValidationHelpers.fileSeqs;
import static org.apache.iceberg.ValidationHelpers.files;
import static org.apache.iceberg.ValidationHelpers.snapshotIds;
import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -29,6 +34,7 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -38,6 +44,7 @@
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -471,6 +478,67 @@ public void testRewriteManifestsWithPredicate() throws IOException {
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testRewriteSmallManifestsNonPartitionedV2Table() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);

List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, null, "AAAA"));
writeRecords(records1);

table.refresh();

Snapshot snapshot1 = table.currentSnapshot();
DataFile file1 = Iterables.getOnlyElement(snapshot1.addedDataFiles(table.io()));

List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "CCCC", "CCCC"));
writeRecords(records2);

table.refresh();

Snapshot snapshot2 = table.currentSnapshot();
DataFile file2 = Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

SparkActions actions = SparkActions.get();
RewriteManifests.Result result = actions.rewriteManifests(table).execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));

table.refresh();

List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());

ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
Assert.assertEquals(2, (long) newManifest.existingFilesCount());
Assert.assertFalse(newManifest.hasAddedFiles());
Assert.assertFalse(newManifest.hasDeletedFiles());

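// In a v2 table, each commit takes the next sequence number, so the two appends above
// produce sequence numbers 1 and 2. Rewriting manifests only reclusters existing
// entries, so each entry should keep its original data sequence number, and because
// neither data file was rewritten afterwards, its file sequence number matches its
// data sequence number; hence dataSeqs(1L, 2L) and fileSeqs(1L, 2L) below.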
validateDataManifest(
table,
newManifest,
dataSeqs(1L, 2L),
fileSeqs(1L, 2L),
snapshotIds(snapshot1.snapshotId(), snapshot2.snapshotId()),
files(file1, file2));

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records1);
expectedRecords.addAll(records2);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1", "c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();

Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
}

private void writeRecords(List<ThreeColumnRecord> records) {
Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
writeDF(df);
