Core: Add EqualityDeleteRowReader (apache#2320)
chenjunjiedada authored Mar 13, 2021
1 parent 28621f9 commit 713136d
Showing 5 changed files with 169 additions and 8 deletions.
46 changes: 40 additions & 6 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -48,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.Preconditions;
@@ -110,17 +112,17 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
private List<Predicate<T>> applyEqDeletes() {
List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
if (eqDeletes.isEmpty()) {
return records;
return isInDeleteSets;
}

Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
for (DeleteFile delete : eqDeletes) {
filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
}

CloseableIterable<T> filteredRecords = records;
for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
Set<Integer> ids = entry.getKey();
Iterable<DeleteFile> deletes = entry.getValue();
@@ -137,11 +139,43 @@ private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
deleteSchema.asStruct());

filteredRecords = Deletes.filter(filteredRecords,
record -> projectRow.wrap(asStructLike(record)), deleteSet);
Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
}

return filteredRecords;
return isInDeleteSets;
}

public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
// Predicate to test whether a row has been deleted by equality deletions.
Predicate<T> deletedRows = applyEqDeletes().stream()
.reduce(Predicate::or)
.orElse(t -> false);

Filter<T> deletedRowsFilter = new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return deletedRows.test(item);
}
};
return deletedRowsFilter.filter(records);
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
// Predicate to test whether a row should be visible to the user after applying equality deletions.
Predicate<T> remainingRows = applyEqDeletes().stream()
.map(Predicate::negate)
.reduce(Predicate::and)
.orElse(t -> true);

Filter<T> remainingRowsFilter = new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return remainingRows.test(item);
}
};

return remainingRowsFilter.filter(records);
}

private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
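
The refactor above turns each group of equality-delete files into a java.util.function.Predicate and then composes those predicates: OR-ing them marks a row as deleted, while AND-ing their negations keeps only rows that survive every delete set. A minimal, self-contained sketch of that composition (not part of this commit; the integer ids and delete sets are invented for illustration):

import java.util.List;
import java.util.function.Predicate;

public class PredicateCompositionSketch {
  public static void main(String[] args) {
    // Hypothetical per-delete-set predicates; in DeleteFilter each one checks
    // membership in a StructLikeSet built from one group of equality deletes.
    Predicate<Integer> inDeleteSet1 = id -> id == 29 || id == 89;
    Predicate<Integer> inDeleteSet2 = id -> id == 121 || id == 122;
    List<Predicate<Integer>> isInDeleteSets = List.of(inDeleteSet1, inDeleteSet2);

    // findEqualityDeleteRows-style composition: a row is "deleted" if any set matches.
    Predicate<Integer> deleted = isInDeleteSets.stream()
        .reduce(Predicate::or)
        .orElse(t -> false);

    // applyEqDeletes-style composition: a row "remains" only if no set matches.
    Predicate<Integer> remaining = isInDeleteSets.stream()
        .map(Predicate::negate)
        .reduce(Predicate::and)
        .orElse(t -> true);

    System.out.println(deleted.test(29));    // true
    System.out.println(remaining.test(29));  // false
    System.out.println(remaining.test(30));  // true
  }
}

Building the predicate list once and composing it lazily is what lets the same private applyEqDeletes() helper back both the user-facing filter and the new deleted-rows reader.
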
9 changes: 9 additions & 0 deletions data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -328,4 +328,13 @@ private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
.forEach(set::add);
return set;
}

protected StructLikeSet rowSetWitIds(int... idsToRetain) {
Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRetain));
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
records.stream()
.filter(row -> deletedIds.contains(row.getField("id")))
.forEach(set::add);
return set;
}
}
57 changes: 57 additions & 0 deletions spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import java.util.Map;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;

public class EqualityDeleteRowReader extends RowDataReader {
private final Schema expectedSchema;

public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive);
this.expectedSchema = expectedSchema;
}

@Override
CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches = new SparkDeleteFilter(task, tableSchema(), expectedSchema);

// schema of rows returned by readers
Schema requiredSchema = matches.requiredSchema();
Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
DataFile file = task.file();

// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
}
}
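
As the test later in this commit shows, the new reader is consumed like any other Spark row reader: construct it per CombinedScanTask, then loop next()/get() under try-with-resources. A hedged sketch of that consumption pattern against a hypothetical RowReader interface (the real next()/get()/close() come from the base reader class, which is not part of this diff):

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal contract, for illustration only.
interface RowReader<T> extends Closeable {
  boolean next() throws IOException;
  T get();
}

class ReaderDriver {
  // Drain a reader into a list, closing it even if reading fails.
  static <T> List<T> readAll(RowReader<T> reader) throws IOException {
    List<T> rows = new ArrayList<>();
    try (RowReader<T> r = reader) {
      while (r.next()) {
        rows.add(r.get());
      }
    }
    return rows;
  }
}
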
8 changes: 6 additions & 2 deletions spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -95,7 +95,11 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}

private CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
protected Schema tableSchema() {
return tableSchema;
}

protected CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
@@ -215,7 +219,7 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema
JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
}

private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
57 changes: 57 additions & 0 deletions spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -24,13 +24,15 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -41,11 +43,14 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -56,6 +61,7 @@
import org.junit.Test;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

public abstract class TestSparkReaderDeletes extends DeleteReadTests {

@@ -161,4 +167,55 @@ public void testEqualityDeleteWithFilter() throws IOException {

Assert.assertEquals("Table should contain no rows", 0, actual.size());
}

@Test
public void testReadEqualityDeleteRows() throws IOException {
Schema deleteSchema1 = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteSchema1);
List<Record> dataDeletes = Lists.newArrayList(
dataDelete.copy("data", "a"), // id = 29
dataDelete.copy("data", "d") // id = 89
);

Schema deleteSchema2 = table.schema().select("id");
Record idDelete = GenericRecord.create(deleteSchema2);
List<Record> idDeletes = Lists.newArrayList(
idDelete.copy("id", 121), // id = 121
idDelete.copy("id", 122) // id = 122
);

DeleteFile eqDelete1 = FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteSchema1);

DeleteFile eqDelete2 = FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), idDeletes, deleteSchema2);

table.newRowDelta()
.addDeletes(eqDelete1)
.addDeletes(eqDelete2)
.commit();

StructLikeSet expectedRowSet = rowSetWitIds(29, 89, 121, 122);

Types.StructType type = table.schema().asStruct();
StructLikeSet actualRowSet = StructLikeSet.create(type);

CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
table.newScan().planFiles(),
TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
TableProperties.SPLIT_LOOKBACK_DEFAULT,
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

for (CombinedScanTask task : tasks) {
try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table.schema(), table.schema(),
table.properties().get(DEFAULT_NAME_MAPPING), table.io(), table.encryption(), false)) {
while (reader.next()) {
actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
}
}
}

Assert.assertEquals("should include 4 deleted rows", 4, actualRowSet.size());
Assert.assertEquals("deleted rows should match", expectedRowSet, actualRowSet);
}
}
