Skip to content

Commit

Permalink
Core: Don't persist useless file and position bounds for deletes (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Aug 22, 2023
1 parent 87d2a92 commit 74a7d95
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 1 deletion.
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetricsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,37 @@ public class MetricsUtil {

private MetricsUtil() {}

/**
* Copies a metrics object without lower and upper bounds for given fields.
*
* @param excludedFieldIds field IDs for which the lower and upper bounds must be dropped
* @return a new metrics object without lower and upper bounds for given fields
*/
public static Metrics copyWithoutFieldBounds(Metrics metrics, Set<Integer> excludedFieldIds) {
return new Metrics(
metrics.recordCount(),
metrics.columnSizes(),
metrics.valueCounts(),
metrics.nullValueCounts(),
metrics.nanValueCounts(),
copyWithoutKeys(metrics.lowerBounds(), excludedFieldIds),
copyWithoutKeys(metrics.upperBounds(), excludedFieldIds));
}

private static <K, V> Map<K, V> copyWithoutKeys(Map<K, V> map, Set<K> keys) {
if (map == null) {
return null;
}

Map<K, V> filteredMap = Maps.newHashMap(map);

for (K key : keys) {
filteredMap.remove(key);
}

return filteredMap.isEmpty() ? null : filteredMap;
}

/**
* Construct mapping relationship between column id to NaN value counts from input metrics and
* metrics config.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
*/
package org.apache.iceberg.deletes;

import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.CharSequenceSet;

/**
Expand All @@ -40,6 +47,9 @@
* records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
*/
public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
private static final Set<Integer> SINGLE_REFERENCED_FILE_BOUNDS_ONLY =
ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId());

private final FileAppender<StructLike> appender;
private final FileFormat format;
private final String location;
Expand Down Expand Up @@ -89,7 +99,7 @@ public void close() throws IOException {
.withEncryptionKeyMetadata(keyMetadata)
.withSplitOffsets(appender.splitOffsets())
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withMetrics(metrics())
.build();
}
}
Expand All @@ -107,4 +117,13 @@ public DeleteFile toDeleteFile() {
public DeleteWriteResult result() {
return new DeleteWriteResult(toDeleteFile(), referencedDataFiles());
}

private Metrics metrics() {
Metrics metrics = appender.metrics();
if (referencedDataFiles.size() > 1) {
return MetricsUtil.copyWithoutFieldBounds(metrics, SINGLE_REFERENCED_FILE_BOUNDS_ONLY);
} else {
return metrics;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,17 @@ public void testPositionDeleteWriter() throws IOException {
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();

if (fileFormat == FileFormat.AVRO) {
Assert.assertNull(deleteFile.lowerBounds());
Assert.assertNull(deleteFile.upperBounds());
} else {
Assert.assertEquals(1, referencedDataFiles.size());
Assert.assertEquals(2, deleteFile.lowerBounds().size());
Assert.assertTrue(deleteFile.lowerBounds().containsKey(DELETE_FILE_PATH.fieldId()));
Assert.assertEquals(2, deleteFile.upperBounds().size());
Assert.assertTrue(deleteFile.upperBounds().containsKey(DELETE_FILE_PATH.fieldId()));
}

// verify the written delete file
GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
List<Record> expectedDeletes =
Expand Down Expand Up @@ -302,6 +313,53 @@ public void testPositionDeleteWriterWithRow() throws IOException {
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

@Test
public void testPositionDeleteWriterMultipleDataFiles() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// write two data files
DataFile dataFile1 = writeData(writerFactory, dataRows, table.spec(), partition);
DataFile dataFile2 = writeData(writerFactory, dataRows, table.spec(), partition);

// write a position delete file referencing both
List<PositionDelete<T>> deletes =
ImmutableList.of(
positionDelete(dataFile1.path(), 0L, null),
positionDelete(dataFile1.path(), 2L, null),
positionDelete(dataFile2.path(), 4L, null));
Pair<DeleteFile, CharSequenceSet> result =
writePositionDeletes(writerFactory, deletes, table.spec(), partition);
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();

// verify the written delete file has NO lower and upper bounds
Assert.assertEquals(2, referencedDataFiles.size());
Assert.assertNull(deleteFile.lowerBounds());
Assert.assertNull(deleteFile.upperBounds());

// commit the data and delete files
table
.newRowDelta()
.addRows(dataFile1)
.addRows(dataFile2)
.addDeletes(deleteFile)
.validateDataFilesExist(referencedDataFiles)
.validateDeletedFiles()
.commit();

// verify the delete file is applied correctly
List<T> expectedRows =
ImmutableList.of(
toRow(2, "aaa"),
toRow(4, "aaa"),
toRow(5, "aaa"),
toRow(1, "aaa"),
toRow(2, "aaa"),
toRow(3, "aaa"),
toRow(4, "aaa"));
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

private DataFile writeData(
FileWriterFactory<T> writerFactory, List<T> rows, PartitionSpec spec, StructLike partitionKey)
throws IOException {
Expand Down
38 changes: 38 additions & 0 deletions data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,44 @@ public void testPositionDeleteMetrics() throws IOException {
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}

@Test
public void testPositionDeleteMetricsCoveringMultipleDataFiles() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table);
EncryptedOutputFile outputFile = fileFactory.newOutputFile();
PositionDeleteWriter<T> deleteWriter =
writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);

try {
PositionDelete<T> positionDelete = PositionDelete.create();

positionDelete.set("File A", 1, toRow(3, "3", true, 3L));
deleteWriter.write(positionDelete);

positionDelete.set("File B", 1, toRow(3, "3", true, 3L));
deleteWriter.write(positionDelete);

} finally {
deleteWriter.close();
}

DeleteFile deleteFile = deleteWriter.toDeleteFile();

// should have NO bounds for path and position as the file covers multiple data paths
Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
Assert.assertEquals(2, lowerBounds.size());
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));

Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
Assert.assertEquals(2, upperBounds.size());
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}

@Test
public void testMaxColumns() throws IOException {
File tableDir = temp.newFolder();
Expand Down

0 comments on commit 74a7d95

Please sign in to comment.