Skip to content

Commit

Permalink
Core: Metadata table queries fail if a partition column was reused in…
Browse files Browse the repository at this point in the history
… V2 (apache#4662)
  • Loading branch information
szlta authored Jun 22, 2022
1 parent 08c8764 commit a5efb53
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 18 deletions.
32 changes: 30 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,35 @@ private int assignFieldId() {
return lastAssignedPartitionId;
}

/**
* In V2 it searches for a similar partition field in historical partition specs. Tries to match on source field
* ID, transform type and target name (optional). If not found or in V1 cases it creates a new PartitionField.
* @param sourceTransform pair of source ID and transform for this PartitionField addition
* @param name target partition field name, if specified
* @return the recycled or newly created partition field
*/
private PartitionField recycleOrCreatePartitionField(Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
if (formatVersion == 2 && base != null) {
int sourceId = sourceTransform.first();
Transform<?, ?> transform = sourceTransform.second();

Set<PartitionField> allHistoricalFields = Sets.newHashSet();
for (PartitionSpec partitionSpec : base.specs()) {
allHistoricalFields.addAll(partitionSpec.fields());
}

for (PartitionField field : allHistoricalFields) {
if (field.sourceId() == sourceId && field.transform().equals(transform)) {
// if target name is specified then consider it too, otherwise not
if (name == null || field.name().equals(name)) {
return field;
}
}
}
}
return new PartitionField(sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
}

@Override
public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
this.caseSensitive = isCaseSensitive;
Expand Down Expand Up @@ -157,8 +186,7 @@ public BaseUpdatePartitionSpec addField(String name, Term term) {
Preconditions.checkArgument(added == null,
"Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);

PartitionField newField = new PartitionField(
sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
if (newField.name() == null) {
String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
Expand Down
61 changes: 61 additions & 0 deletions core/src/test/java/org/apache/iceberg/ScanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Executors;
Expand All @@ -28,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -133,4 +135,63 @@ public void testTableScanWithPlanExecutor() {
Assert.assertEquals(2, Iterables.size(scan.planFiles()));
Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
}

@Test
public void testReAddingPartitionField() throws Exception {
Assume.assumeTrue(formatVersion == 2);
Schema schema = new Schema(
required(1, "a", Types.IntegerType.get()),
required(2, "b", Types.StringType.get()),
required(3, "data", Types.IntegerType.get())
);
PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("a").build();
File dir = temp.newFolder();
dir.delete();
this.table = TestTables.create(dir, "test_part_evolution", schema, initialSpec, formatVersion);
table.newFastAppend().appendFile(DataFiles.builder(initialSpec)
.withPath("/path/to/data/a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("a=1")
.withRecordCount(1)
.build()).commit();

table.updateSpec().addField("b").removeField("a").commit();
table.newFastAppend().appendFile(DataFiles.builder(table.spec())
.withPath("/path/to/data/b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("b=1")
.withRecordCount(1)
.build()).commit();

table.updateSpec().addField("a").commit();
table.newFastAppend().appendFile(DataFiles.builder(table.spec())
.withPath("/path/to/data/ab.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("b=1/a=1")
.withRecordCount(1)
.build()).commit();

table.newFastAppend().appendFile(DataFiles.builder(table.spec())
.withPath("/path/to/data/a2b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("b=1/a=2")
.withRecordCount(1)
.build()).commit();

TableScan scan1 = table.newScan().filter(Expressions.equal("b", "1"));
try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1);
for (CombinedScanTask combinedScanTask : tasks) {
Assert.assertEquals("All 4 files should match b=1 filter", 4, combinedScanTask.files().size());
}
}

TableScan scan2 = table.newScan().filter(Expressions.equal("a", 2));
try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1);
for (CombinedScanTask combinedScanTask : tasks) {
Assert.assertEquals("a=2 and file without a in spec should match", 2, combinedScanTask.files().size());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,11 @@ public void testFilesTableScanWithDroppedPartition() throws IOException {
table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
table.refresh();

table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
// Here we need to specify target name as 'data_bucket_16'. If unspecified a default name will be generated. As per
// https://github.com/apache/iceberg/pull/4868 there's an inconsistency of doing this: in V2, the above removed
// data_bucket would be recycled in favor of data_bucket_16. By specifying the target name, we explicitly require
// data_bucket not to be recycled.
table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit();
table.refresh();

table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,6 @@ public void testPartitionsTableRenameFields() throws ParseException {

@Test
public void testPartitionsTableSwitchFields() throws Exception {
// Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
// In V1, dropped partition fields show separately when field is re-added
// In V2, re-added field currently conflicts with its deleted form
Assume.assumeTrue(formatVersion == 1);

sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();
Table table = validationCatalog.loadTable(tableIdent);
Expand Down Expand Up @@ -531,17 +526,34 @@ public void testPartitionsTableSwitchFields() throws Exception {

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd3')", tableName);

assertPartitions(
ImmutableList.of(
row(null, "c1", null),
row(null, "c1", "d1"),
row(null, "c2", null),
row(null, "c2", "d2"),
row("d1", "c1", null),
row("d2", "c2", null)),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS);
if (formatVersion == 1) {
assertPartitions(
ImmutableList.of(
row(null, "c1", null),
row(null, "c1", "d1"),
row(null, "c2", null),
row(null, "c2", "d2"),
row(null, "c3", "d3"),
row("d1", "c1", null),
row("d2", "c2", null)),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS);
} else {
// In V2 re-adding a former partition field that was part of an older spec will not change its name or its
// field ID either, thus values will be collapsed into a single common column (as opposed to V1 where any new
// partition field addition will result in a new column in this metadata table)
assertPartitions(
ImmutableList.of(
row(null, "c1"),
row(null, "c2"),
row("d1", "c1"),
row("d2", "c2"),
row("d3", "c3")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);
}
}

@Test
Expand Down

0 comments on commit a5efb53

Please sign in to comment.