Parquet: Fix null values when selecting nested field partition column
ConeyLiu authored Nov 18, 2022
1 parent 36387fc commit 25335a0
Showing 9 changed files with 339 additions and 6 deletions.
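
Why the fix works, in brief: the struct() visitors below replace a projected partition column with a constant reader. The old ConstantReader exposed NullReader.NULL_COLUMN, which reports definition level 0, so when the constant stood in for a field nested inside an optional struct, the surrounding readers resolved the struct itself as null. The new overload lets each visitor pass the field's max definition level instead. A minimal, self-contained sketch of the idea (plain Java, not Iceberg code; the threshold comparison and the concrete levels are simplified assumptions):

// Simplified sketch of a definition-level null check; not Iceberg code.
// An optional parent struct is treated as non-null only when a child column
// reports a definition level greater than the parent's own level.
public class DefinitionLevelSketch {
  public static void main(String[] args) {
    int parentDefinitionLevel = 1; // optional struct in the projected schema

    int oldConstantLevel = 0; // old ConstantReader: NULL_COLUMN reports 0
    System.out.println(
        "old reader -> struct null? " + (oldConstantLevel <= parentDefinitionLevel)); // true: the bug

    int newConstantLevel = 2; // new ConstantReader: the field's max definition level
    System.out.println(
        "new reader -> struct null? " + (newConstantLevel <= parentDefinitionLevel)); // false: value surfaces
  }
}
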
@@ -92,6 +92,7 @@ public ParquetValueReader<RowData> struct(
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public ParquetValueReader<RowData> struct(
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}
}
}
@@ -110,11 +114,16 @@ public ParquetValueReader<RowData> struct(
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
@@ -21,22 +21,27 @@
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assume;
import org.junit.Test;

/** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public void testBasicProjection() throws IOException {
TestHelpers.assertRows(result, expected);
}

@Test
public void testReadPartitionColumn() throws Exception {
Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);

Schema nestedSchema =
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(
2,
"struct",
Types.StructType.of(
Types.NestedField.optional(3, "innerId", Types.LongType.get()),
Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
PartitionSpec spec =
PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();

Table table =
catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
for (Record record : records) {
org.apache.iceberg.TestHelpers.Row partition =
org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
appender.appendToTable(partition, Collections.singletonList(record));
}

TableSchema projectedSchema =
TableSchema.builder()
.field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
.build();
List<Row> result =
runFormat(
FlinkSource.forRowData()
.tableLoader(tableLoader())
.project(projectedSchema)
.buildFormat());

List<Row> expected = Lists.newArrayList();
for (Record record : records) {
Row nested = Row.of(((Record) record.get(1)).get(1));
expected.add(Row.of(nested));
}

TestHelpers.assertRows(result, expected);
}

private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
return TestHelpers.readRows(inputFormat, rowType);
@@ -129,6 +129,7 @@ public ParquetValueReader<?> struct(
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
ParquetValueReader<?> fieldReader = fieldReaders.get(i);
@@ -138,6 +139,9 @@ public ParquetValueReader<?> struct(
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}
}

@@ -146,11 +150,16 @@ public ParquetValueReader<?> struct(
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
@@ -55,6 +55,10 @@ public static <C> ParquetValueReader<C> constant(C value) {
return new ConstantReader<>(value);
}

public static <C> ParquetValueReader<C> constant(C value, int definitionLevel) {
return new ConstantReader<>(value, definitionLevel);
}

public static ParquetValueReader<Long> position() {
return new PositionReader();
}
@@ -113,9 +117,46 @@ public void setPageSource(PageReadStore pageStore, long rowPosition) {}

static class ConstantReader<C> implements ParquetValueReader<C> {
private final C constantValue;
private final TripleIterator<?> column;
private final List<TripleIterator<?>> children;

ConstantReader(C constantValue) {
this.constantValue = constantValue;
this.column = NullReader.NULL_COLUMN;
this.children = NullReader.COLUMNS;
}

ConstantReader(C constantValue, int definitionLevel) {
this.constantValue = constantValue;
this.column =
new TripleIterator<Object>() {
@Override
public int currentDefinitionLevel() {
return definitionLevel;
}

@Override
public int currentRepetitionLevel() {
return 0;
}

@Override
public <N> N nextNull() {
return null;
}

@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
};

this.children = ImmutableList.of(column);
}

@Override
@@ -125,12 +166,12 @@ public C read(C reuse) {

@Override
public TripleIterator<?> column() {
return NullReader.NULL_COLUMN;
return column;
}

@Override
public List<TripleIterator<?>> columns() {
return NullReader.COLUMNS;
return children;
}

@Override
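
The new constant(value, definitionLevel) factory above pairs the constant with a column that reports the given definition level. A hedged usage sketch follows; the values and levels are illustrative assumptions, not taken from the commit:

import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;

class ConstantReaderUsageSketch {
  static void build() {
    // A top-level partition constant can keep the original factory.
    ParquetValueReader<String> topLevel = ParquetValueReaders.constant("us-east-1");

    // A constant standing in for a field inside an optional struct should carry
    // that field's max definition level, so the enclosing struct reader does not
    // resolve the struct as null. The level here is an illustrative assumption.
    int nestedFieldMaxDefinitionLevel = 2;
    ParquetValueReader<String> nested =
        ParquetValueReaders.constant("us-east-1", nestedFieldMaxDefinitionLevel);
  }
}
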
11 changes: 10 additions & 1 deletion pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
@@ -136,25 +136,34 @@ public ParquetValueReader<?> struct(
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
if (partitionValues.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}

List<Types.NestedField> expectedFields =
expected != null ? expected.fields() : ImmutableList.of();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (partitionValues.containsKey(id)) {
// the value may be null so containsKey is used to check for a partition value
reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(partitionValues.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
@@ -140,6 +140,7 @@ public ParquetValueReader<?> struct(
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -148,6 +149,9 @@ public ParquetValueReader<?> struct(
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}
}

@@ -156,11 +160,16 @@ public ParquetValueReader<?> struct(
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.source;

import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;

public class ComplexRecord {
private long id;
private NestedRecord struct;

public ComplexRecord() {}

public ComplexRecord(long id, NestedRecord struct) {
this.id = id;
this.struct = struct;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public NestedRecord getStruct() {
return struct;
}

public void setStruct(NestedRecord struct) {
this.struct = struct;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

ComplexRecord record = (ComplexRecord) o;
return id == record.id && Objects.equal(struct, record.struct);
}

@Override
public int hashCode() {
return Objects.hashCode(id, struct);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("id", id).add("struct", struct).toString();
}
}
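
The ComplexRecord class above refers to a NestedRecord type whose file is not among the hunks loaded on this page. For readability, a plausible companion POJO is sketched below; it is an assumption mirroring the test schema's innerId/innerName fields, not code copied from the commit:

package org.apache.iceberg.spark.source;

import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;

// Assumed sketch only; the actual NestedRecord in the commit is not shown here.
public class NestedRecord {
  private long innerId;
  private String innerName;

  public NestedRecord() {}

  public NestedRecord(long innerId, String innerName) {
    this.innerId = innerId;
    this.innerName = innerName;
  }

  public long getInnerId() {
    return innerId;
  }

  public void setInnerId(long innerId) {
    this.innerId = innerId;
  }

  public String getInnerName() {
    return innerName;
  }

  public void setInnerName(String innerName) {
    this.innerName = innerName;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    NestedRecord that = (NestedRecord) o;
    return innerId == that.innerId && Objects.equal(innerName, that.innerName);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(innerId, innerName);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("innerId", innerId)
        .add("innerName", innerName)
        .toString();
  }
}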