Parquet: Support reading 2-level lists in imported data files (apache…
SinghAsDev authored Feb 1, 2022
1 parent bb036ce commit 6644393
Showing 16 changed files with 479 additions and 53 deletions.
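
This commit teaches Iceberg's Parquet readers to handle the legacy "2-level" LIST encoding written by older tools (parquet-avro before 1.6.0, parquet-thrift), so that data files imported into Iceberg tables read correctly. The core addition is ParquetSchemaUtil.determineListElementType, which applies Parquet's LIST backward-compatibility rules; the schema visitors and the per-engine value readers are updated to use it, and new tests write legacy files through parquet-avro's old-list mode. For orientation, the two physical layouts compare as follows; this is an illustrative sketch (the my_list name and schema shapes are examples, not taken from the commit) built with Parquet's schema builder:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class ListLayouts {
  public static void main(String[] args) {
    // Modern 3-level list: a repeated group wraps a single element field.
    // optional group my_list (LIST) { repeated group list { optional int32 element; } }
    MessageType threeLevel = Types.buildMessage()
        .optionalGroup().as(LogicalTypeAnnotation.listType())
            .repeatedGroup()
                .optional(PrimitiveTypeName.INT32).named("element")
            .named("list")
        .named("my_list")
        .named("schema");

    // Legacy 2-level list: the repeated field is itself the element.
    // optional group my_list (LIST) { repeated int32 element; }
    MessageType twoLevel = Types.buildMessage()
        .optionalGroup().as(LogicalTypeAnnotation.listType())
            .repeated(PrimitiveTypeName.INT32).named("element")
        .named("my_list")
        .named("schema");

    System.out.println(threeLevel);
    System.out.println(twoLevel);
  }
}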
@@ -21,9 +21,15 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
@@ -34,7 +40,13 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestGenericData extends DataTest {
  @Override
@@ -78,4 +90,52 @@ protected void writeAndValidate(Schema schema) throws IOException {
      }
    }
  }

  @Test
  public void testTwoLevelList() throws IOException {
    Schema schema = new Schema(
        optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
        optional(2, "topbytes", Types.BinaryType.get()));
    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

    File testFile = temp.newFile();
    Assert.assertTrue(testFile.delete());

    ParquetWriter<org.apache.avro.generic.GenericRecord> writer =
        AvroParquetWriter.<org.apache.avro.generic.GenericRecord>builder(new Path(testFile.toURI()))
            .withDataModel(GenericData.get())
            .withSchema(avroSchema)
            .config("parquet.avro.add-list-element-records", "true")
            .config("parquet.avro.write-old-list-structure", "true")
            .build();

    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
    List<ByteBuffer> expectedByteList = new ArrayList<>();
    byte[] expectedByte = {0x00, 0x01};
    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
    expectedByteList.add(expectedBinary);
    recordBuilder.set("arraybytes", expectedByteList);
    recordBuilder.set("topbytes", expectedBinary);
    GenericData.Record expectedRecord = recordBuilder.build();

    writer.write(expectedRecord);
    writer.close();

    // test reuseContainers
    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
        .project(schema)
        .reuseContainers()
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build()) {
      CloseableIterator<Record> it = reader.iterator();
      Assert.assertTrue("Should have at least one row", it.hasNext());
      while (it.hasNext()) {
        GenericRecord actualRecord = (GenericRecord) it.next();
        Assert.assertEquals(expectedBinary, actualRecord.get(0, ArrayList.class).get(0));
        Assert.assertEquals(expectedBinary, actualRecord.get(1, ByteBuffer.class));
        Assert.assertFalse("Should not have more than one row", it.hasNext());
      }
    }
  }
}
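
The two .config(...) calls above are what force the legacy layout: parquet.avro.write-old-list-structure makes AvroParquetWriter emit 2-level lists, and parquet.avro.add-list-element-records keeps the legacy treatment of element records. As a minimal sketch (the helper below is hypothetical, not part of the commit), the same writer can be configured through parquet-avro's named constants for those keys:

import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;

public class LegacyListWriters {
  // Builds a writer that produces legacy 2-level lists, equivalent to the
  // string keys used in the test above.
  static ParquetWriter<GenericRecord> legacyWriter(Path path, org.apache.avro.Schema avroSchema)
      throws IOException {
    return AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(avroSchema)
        .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "true")    // emit 2-level lists
        .config(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "true") // legacy element records
        .build();
  }
}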
@@ -36,6 +36,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
@@ -143,13 +144,12 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
         return null;
       }
 
-      GroupType repeated = array.getFields().get(0).asGroupType();
       String[] repeatedPath = currentPath();
 
       int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
       int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
 
-      Type elementType = repeated.getType(0);
+      Type elementType = ParquetSchemaUtil.determineListElementType(array);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
 
       return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
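The minus-one adjustments are what make this reader work for both encodings: definition and repetition levels come from counting optional/repeated ancestors, and in the 2-level form the repeated field is itself the element. A standalone sketch (not from the commit; the schema strings are illustrative) showing the level arithmetic with Parquet's MessageTypeParser:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ListLevels {
  public static void main(String[] args) {
    // Legacy 2-level list: optional (+1 def) -> repeated element (+1 def, +1 rep)
    MessageType twoLevel = MessageTypeParser.parseMessageType(
        "message m { optional group arraybytes (LIST) { repeated binary array; } }");
    System.out.println(twoLevel.getMaxDefinitionLevel("arraybytes", "array"));  // 2
    System.out.println(twoLevel.getMaxRepetitionLevel("arraybytes", "array"));  // 1

    // Modern 3-level list: optional -> repeated group -> optional element
    MessageType threeLevel = MessageTypeParser.parseMessageType(
        "message m { optional group arraybytes (LIST) {"
            + " repeated group list { optional binary element; } } }");
    System.out.println(threeLevel.getMaxDefinitionLevel("arraybytes", "list", "element"));  // 3
    System.out.println(threeLevel.getMaxRepetitionLevel("arraybytes", "list", "element")); // 1
  }
}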
@@ -21,11 +21,19 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
@@ -35,11 +43,60 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestFlinkParquetReader extends DataTest {
  private static final int NUM_RECORDS = 100;

  @Test
  public void testTwoLevelList() throws IOException {
    Schema schema = new Schema(
        optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
        optional(2, "topbytes", Types.BinaryType.get()));
    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

    File testFile = temp.newFile();
    Assert.assertTrue(testFile.delete());

    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
        .withDataModel(GenericData.get())
        .withSchema(avroSchema)
        .config("parquet.avro.add-list-element-records", "true")
        .config("parquet.avro.write-old-list-structure", "true")
        .build();

    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
    List<ByteBuffer> expectedByteList = new ArrayList<>();
    byte[] expectedByte = {0x00, 0x01};
    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
    expectedByteList.add(expectedBinary);
    recordBuilder.set("arraybytes", expectedByteList);
    recordBuilder.set("topbytes", expectedBinary);
    GenericData.Record expectedRecord = recordBuilder.build();

    writer.write(expectedRecord);
    writer.close();

    try (CloseableIterable<RowData> reader = Parquet.read(Files.localInput(testFile))
        .project(schema)
        .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
        .build()) {
      Iterator<RowData> rows = reader.iterator();
      Assert.assertTrue("Should have at least one row", rows.hasNext());
      RowData rowData = rows.next();
      Assert.assertArrayEquals(expectedByte, rowData.getArray(0).getBinary(0));
      Assert.assertArrayEquals(expectedByte, rowData.getBinary(1));
      Assert.assertFalse("Should not have more than one row", rows.hasNext());
    }
  }

  private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
    File testFile = temp.newFile();
    Assert.assertTrue("Delete should succeed", testFile.delete());
@@ -183,13 +183,12 @@ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
         return null;
       }
 
-      GroupType repeated = array.getFields().get(0).asGroupType();
       String[] repeatedPath = currentPath();
 
       int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
       int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
 
-      Type elementType = repeated.getType(0);
+      Type elementType = ParquetSchemaUtil.determineListElementType(array);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
 
       return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR,
@@ -67,11 +67,17 @@ public Type list(GroupType list, Type elementType) {
     Preconditions.checkArgument(elementType != null,
         "List type must have element field");
 
+    Type listElement = ParquetSchemaUtil.determineListElementType(list);
     MappedField field = nameMapping.find(currentPath());
-    Type listType = Types.buildGroup(list.getRepetition())
-        .as(LogicalTypeAnnotation.listType())
-        .repeatedGroup().addFields(elementType).named(list.getFieldName(0))
-        .named(list.getName());
 
+    Types.GroupBuilder<GroupType> listBuilder = Types.buildGroup(list.getRepetition())
+        .as(LogicalTypeAnnotation.listType());
+    if (listElement.isRepetition(Type.Repetition.REPEATED)) {
+      listBuilder.addFields(elementType);
+    } else {
+      listBuilder.repeatedGroup().addFields(elementType).named(list.getFieldName(0));
+    }
+    Type listType = listBuilder.named(list.getName());
 
     return field == null ? listType : listType.withId(field.id());
   }
@@ -96,12 +96,7 @@ public Type struct(GroupType struct, List<Type> fieldTypes) {

   @Override
   public Type list(GroupType array, Type elementType) {
-    GroupType repeated = array.getType(0).asGroupType();
-    org.apache.parquet.schema.Type element = repeated.getType(0);
-
-    Preconditions.checkArgument(
-        !element.isRepetition(Repetition.REPEATED),
-        "Elements cannot have repetition REPEATED: %s", element);
+    org.apache.parquet.schema.Type element = ParquetSchemaUtil.determineListElementType(array);
 
     Integer elementFieldId = getId(element);
 
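This converter is what turns an imported file's Parquet schema into an Iceberg schema, so removing the REPEATED-element precondition is the change that lets import paths accept legacy files at the type level. A hedged end-to-end sketch (the schema string is illustrative; addFallbackIds assigns ids to a schema that has none, which is typical of imported files):

import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ConvertLegacyList {
  public static void main(String[] args) {
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message m { optional group my_list (LIST) { repeated int32 element; } }");
    // Before this commit, legacy 2-level lists could not be converted.
    Schema icebergSchema = ParquetSchemaUtil.convert(ParquetSchemaUtil.addFallbackIds(fileSchema));
    System.out.println(icebergSchema);  // my_list: optional list<int>
  }
}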
@@ -121,13 +121,12 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
     @Override
     public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
                                       ParquetValueReader<?> elementReader) {
-      GroupType repeated = array.getFields().get(0).asGroupType();
       String[] repeatedPath = currentPath();
 
       int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
       int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
 
-      Type elementType = repeated.getType(0);
+      Type elementType = ParquetSchemaUtil.determineListElementType(array);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
 
       return new ListReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
@@ -164,4 +164,58 @@ public Boolean primitive(PrimitiveType primitive) {
    }
  }

  public static Type determineListElementType(GroupType array) {
    Type repeated = array.getFields().get(0);
    boolean isOldListElementType = isOldListElementType(array);

    return isOldListElementType ? repeated : repeated.asGroupType().getType(0);
  }

  // Parquet LIST backwards-compatibility rules.
  // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
  static boolean isOldListElementType(GroupType list) {
    Type repeatedType = list.getFields().get(0);
    String parentName = list.getName();

    return
        // For legacy 2-level list types with primitive element type, e.g.:
        //
        //    // ARRAY<INT> (nullable list, non-null elements)
        //    optional group my_list (LIST) {
        //      repeated int32 element;
        //    }
        //
        repeatedType.isPrimitive() ||
            // For legacy 2-level list types whose element type is a group type with 2 or more
            // fields, e.g.:
            //
            //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
            //    optional group my_list (LIST) {
            //      repeated group element {
            //        required binary str (UTF8);
            //        required int32 num;
            //      };
            //    }
            //
            repeatedType.asGroupType().getFieldCount() > 1 ||
            // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
            // e.g.:
            //
            //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
            //    optional group my_list (LIST) {
            //      repeated group array {
            //        required binary str (UTF8);
            //      };
            //    }
            //
            repeatedType.getName().equals("array") ||
            // For Parquet data generated by parquet-thrift, e.g.:
            //
            //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
            //    optional group my_list (LIST) {
            //      repeated group my_list_tuple {
            //        required binary str (UTF8);
            //      };
            //    }
            //
            repeatedType.getName().equals(parentName + "_tuple");
  }
}
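
A hedged usage sketch of the new helper (standalone, not from the commit; schema strings are illustrative): for a 2-level list it returns the repeated field itself, which is still REPEATED, and that is exactly the signal the updated callers branch on; for a 3-level list it returns the single field nested inside the repeated group.

import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class ElementTypeDemo {
  public static void main(String[] args) {
    GroupType legacy = MessageTypeParser.parseMessageType(
        "message m { optional group my_list (LIST) { repeated int32 element; } }")
        .getType("my_list").asGroupType();
    Type legacyElement = ParquetSchemaUtil.determineListElementType(legacy);
    System.out.println(legacyElement.isRepetition(Type.Repetition.REPEATED));  // true

    GroupType modern = MessageTypeParser.parseMessageType(
        "message m { optional group my_list (LIST) {"
            + " repeated group list { optional int32 element; } } }")
        .getType("my_list").asGroupType();
    Type modernElement = ParquetSchemaUtil.determineListElementType(modern);
    System.out.println(modernElement.isRepetition(Type.Repetition.REPEATED));  // false
  }
}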
@@ -66,30 +66,41 @@ private static <T> T visitList(GroupType list, ParquetTypeVisitor<T> visitor) {
     Preconditions.checkArgument(list.getFieldCount() == 1,
         "Invalid list: does not contain single repeated field: %s", list);
 
-    GroupType repeatedElement = list.getFields().get(0).asGroupType();
+    Type repeatedElement = list.getFields().get(0);
     Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
         "Invalid list: inner group is not repeated");
-    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
-        "Invalid list: repeated group is not a single field: %s", list);
 
-    visitor.beforeRepeatedElement(repeatedElement);
-    try {
-      T elementResult = null;
-      if (repeatedElement.getFieldCount() > 0) {
-        Type elementField = repeatedElement.getType(0);
-        visitor.beforeElementField(elementField);
-        try {
-          elementResult = visit(elementField, visitor);
-        } finally {
-          visitor.afterElementField(elementField);
-        }
-      }
+    Type listElement = ParquetSchemaUtil.determineListElementType(list);
+    if (listElement.isRepetition(Type.Repetition.REPEATED)) {
+      T elementResult = visitListElement(listElement, visitor);
       return visitor.list(list, elementResult);
-
-    } finally {
-      visitor.afterRepeatedElement(repeatedElement);
+    } else {
+      return visitThreeLevelList(list, repeatedElement, listElement, visitor);
     }
   }
 
+  private static <T> T visitThreeLevelList(
+      GroupType list, Type repeated, Type listElement, ParquetTypeVisitor<T> visitor) {
+    visitor.beforeRepeatedElement(repeated);
+    try {
+      T elementResult = visitListElement(listElement, visitor);
+      return visitor.list(list, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeated);
+    }
+  }
+
+  private static <T> T visitListElement(Type listElement, ParquetTypeVisitor<T> visitor) {
+    T elementResult = null;
+
+    visitor.beforeElementField(listElement);
+    try {
+      elementResult = visit(listElement, visitor);
+    } finally {
+      visitor.afterElementField(listElement);
+    }
+
+    return elementResult;
+  }
 
   private static <T> T visitMap(GroupType map, ParquetTypeVisitor<T> visitor) {
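One behavioral consequence of the restructured visitor: for a 2-level list the element is the repeated field itself, so visitList skips the beforeRepeatedElement/afterRepeatedElement callbacks and fires only the element-field callbacks. A hedged sketch of a custom visitor (the class and schema below are illustrative, not part of the commit) that works uniformly across both encodings:

import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageTypeParser;

// Prints each LIST group together with its resolved element type,
// regardless of which physical encoding the file uses.
public class ListElementPrinter extends ParquetTypeVisitor<Void> {
  @Override
  public Void list(GroupType array, Void element) {
    System.out.println(array.getName() + " -> " + ParquetSchemaUtil.determineListElementType(array));
    return null;
  }

  public static void main(String[] args) {
    ParquetTypeVisitor.visit(MessageTypeParser.parseMessageType(
        "message m { optional group my_list (LIST) { repeated int32 element; } }"),
        new ListElementPrinter());
  }
}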
@@ -108,15 +108,19 @@ public Type struct(GroupType struct, List<Type> fields) {

   @Override
   public Type list(GroupType list, Type element) {
-    GroupType repeated = list.getType(0).asGroupType();
-    Type originalElement = repeated.getType(0);
+    Type repeated = list.getType(0);
+    Type originalElement = ParquetSchemaUtil.determineListElementType(list);
     Integer elementId = getId(originalElement);
 
     if (elementId != null && selectedIds.contains(elementId)) {
       return list;
     } else if (element != null) {
       if (!Objects.equal(element, originalElement)) {
-        return list.withNewFields(repeated.withNewFields(element));
+        if (originalElement.isRepetition(Type.Repetition.REPEATED)) {
+          return list.withNewFields(element);
+        } else {
+          return list.withNewFields(repeated.asGroupType().withNewFields(element));
+        }
       }
     }
     return list;
   }
(Diff truncated: the remaining changed files of the 16 were not loaded.)