Core: Add data_file.spec_id to metadata tables (apache#3015)
szehon-ho authored Oct 27, 2021
1 parent 556937a commit a3eadf6
Showing 8 changed files with 144 additions and 48 deletions.
4 changes: 3 additions & 1 deletion api/src/main/java/org/apache/iceberg/DataFile.java
@@ -62,18 +62,20 @@ public interface DataFile extends ContentFile<DataFile> {
   Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()),
       "Equality comparison field IDs");
   Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
+  Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
 
   int PARTITION_ID = 102;
   String PARTITION_NAME = "partition";
   String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
-  // NEXT ID TO ASSIGN: 141
+  // NEXT ID TO ASSIGN: 142
 
   static StructType getType(StructType partitionType) {
     // IDs start at 100 to leave room for changes to ManifestEntry
     return StructType.of(
         CONTENT,
         FILE_PATH,
         FILE_FORMAT,
+        SPEC_ID,
         required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
         RECORD_COUNT,
         FILE_SIZE,
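
With the new field, the data_file struct surfaced by the entries, files, all_entries, and all_data_files metadata tables reports which partition spec each file was written under. A minimal usage sketch (assuming a Spark session with an Iceberg catalog configured and a placeholder table name db.sample; neither is part of this commit):

import org.apache.iceberg.DataFile;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SpecIdQuery {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Load the "files" metadata table; "db.sample" is a placeholder name.
    Dataset<Row> files = spark.read().format("iceberg").load("db.sample.files");
    // spec_id is the column added by this commit (field ID 141).
    files.select("file_path", DataFile.SPEC_ID.name()).show(false);
  }
}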
58 changes: 32 additions & 26 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -235,45 +235,48 @@ public void put(int i, Object value) {
         this.format = FileFormat.valueOf(value.toString());
         return;
       case 3:
-        this.partitionData = (PartitionData) value;
+        this.partitionSpecId = (value != null) ? (Integer) value : -1;
         return;
       case 4:
-        this.recordCount = (Long) value;
+        this.partitionData = (PartitionData) value;
         return;
       case 5:
-        this.fileSizeInBytes = (Long) value;
+        this.recordCount = (Long) value;
         return;
       case 6:
-        this.columnSizes = (Map<Integer, Long>) value;
+        this.fileSizeInBytes = (Long) value;
         return;
       case 7:
-        this.valueCounts = (Map<Integer, Long>) value;
+        this.columnSizes = (Map<Integer, Long>) value;
         return;
       case 8:
-        this.nullValueCounts = (Map<Integer, Long>) value;
+        this.valueCounts = (Map<Integer, Long>) value;
         return;
       case 9:
-        this.nanValueCounts = (Map<Integer, Long>) value;
+        this.nullValueCounts = (Map<Integer, Long>) value;
         return;
       case 10:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        this.nanValueCounts = (Map<Integer, Long>) value;
         return;
       case 11:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 12:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 13:
-        this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
         return;
       case 14:
-        this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
+        this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
         return;
       case 15:
-        this.sortOrderId = (Integer) value;
+        this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
         return;
       case 16:
+        this.sortOrderId = (Integer) value;
+        return;
+      case 17:
         this.fileOrdinal = (long) value;
         return;
       default:
@@ -301,32 +304,34 @@ public Object get(int i) {
       case 2:
         return format != null ? format.toString() : null;
       case 3:
-        return partitionData;
+        return partitionSpecId;
       case 4:
-        return recordCount;
+        return partitionData;
       case 5:
-        return fileSizeInBytes;
+        return recordCount;
       case 6:
-        return columnSizes;
+        return fileSizeInBytes;
       case 7:
-        return valueCounts;
+        return columnSizes;
       case 8:
-        return nullValueCounts;
+        return valueCounts;
       case 9:
-        return nanValueCounts;
+        return nullValueCounts;
       case 10:
-        return lowerBounds;
+        return nanValueCounts;
       case 11:
-        return upperBounds;
+        return lowerBounds;
       case 12:
-        return keyMetadata();
+        return upperBounds;
       case 13:
-        return splitOffsets();
+        return keyMetadata();
       case 14:
-        return equalityFieldIds();
+        return splitOffsets();
       case 15:
-        return sortOrderId;
+        return equalityFieldIds();
       case 16:
+        return sortOrderId;
+      case 17:
         return pos;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -442,6 +447,7 @@ public String toString() {
         .add("content", content.toString().toLowerCase(Locale.ROOT))
         .add("file_path", filePath)
         .add("file_format", format)
+        .add("spec_id", specId())
         .add("partition", partitionData)
         .add("record_count", recordCount)
         .add("file_size_in_bytes", fileSizeInBytes)
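
Note the knock-on effect in BaseFile: the spec ID is stored at positional ordinal 3, so partition data and every later field shift up by one ordinal, and a null spec falls back to -1. A sketch of the new ordinal contract (file here stands for any BaseFile-backed record; fragment, not a complete program):

// Ordinal 3 now addresses the partition spec ID.
file.put(3, 5);                          // partitionSpecId = 5
file.put(3, null);                       // null falls back to -1, per put() above
Integer specId = (Integer) file.get(3);  // read the spec ID back
Object partition = file.get(4);          // partition data shifted from 3 to 4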
@@ -153,7 +153,7 @@ public void testEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -311,7 +311,7 @@ public void testAllEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -386,7 +386,7 @@ public void testFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -442,7 +442,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
     try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
       for (GenericData.Record record : rows) {
         GenericData.Record file = (GenericData.Record) record.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(file);
       }
     }
@@ -549,7 +549,7 @@ public void testFilesUnpartitionedTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -646,7 +646,7 @@ public void testAllDataFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -1411,4 +1411,9 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
 
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  private void asMetadataRecord(GenericData.Record file) {
+    file.put(0, FileContent.DATA.id());
+    file.put(3, 0); // specId
+  }
 }
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
-    // idx 1: file_path, idx 3: partition
+    // idx 1: file_path, idx 4: partition
     return files.stream()
         .filter(r -> {
-          Row partition = r.getStruct(3);
+          Row partition = r.getStruct(4);
           return condition.test(partition);
         }).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
         .collect(Collectors.toSet());
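
The getStruct index moves from 3 to 4 because, in the files-table row layout, spec_id now occupies position 3 and pushes partition to position 4 (compare getType in DataFile.java above). A positional-access sketch (row is a hypothetical Spark Row read from the files metadata table; fragment only):

// New layout: 0 content, 1 file_path, 2 file_format, 3 spec_id, 4 partition, ...
int specId = row.getInt(3);        // the new column
Row partition = row.getStruct(4);  // shifted from index 3 by this commit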
@@ -21,6 +21,7 @@
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
@@ -140,7 +141,7 @@ public void testEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -298,7 +299,7 @@ public void testAllEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -373,7 +374,7 @@ public void testFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -429,7 +430,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
     try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
       for (GenericData.Record record : rows) {
         GenericData.Record file = (GenericData.Record) record.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(file);
       }
     }
@@ -536,7 +537,7 @@ public void testFilesUnpartitionedTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -633,7 +634,7 @@ public void testAllDataFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -1181,4 +1182,44 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
 
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testFilesTablePartitionId() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    int spec0 = table.spec().specId();
+
+    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+    df1.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    // change partition spec
+    table.refresh();
+    table.updateSpec().removeField("id").commit();
+    int spec1 = table.spec().specId();
+
+    // add a second file
+    df2.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    List<Integer> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "files"))
+        .sort(DataFile.SPEC_ID.name())
+        .collectAsList()
+        .stream().map(r -> (Integer) r.getAs(DataFile.SPEC_ID.name())).collect(Collectors.toList());
+
+    Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
+  }
+
+  private void asMetadataRecord(GenericData.Record file) {
+    file.put(0, FileContent.DATA.id());
+    file.put(3, 0); // specId
+  }
 }
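
As a follow-up usage sketch, the new column makes spec evolution visible per file, e.g. counting files per spec to find candidates for rewrite after a spec change (placeholder table name db.sample again; not part of this commit):

// Files written before a spec change keep their original spec_id.
spark.read().format("iceberg")
    .load("db.sample.files")
    .groupBy(DataFile.SPEC_ID.name())
    .count()
    .show();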
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
-    // idx 1: file_path, idx 3: partition
+    // idx 1: file_path, idx 4: partition
     return files.stream()
         .filter(r -> {
-          Row partition = r.getStruct(3);
+          Row partition = r.getStruct(4);
           return condition.test(partition);
         }).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
         .collect(Collectors.toSet());