Commit 41cc133

Add abstract BaseParquetReaders for Iceberg generics and Flink (apach…
openinx authored Jul 2, 2020
1 parent 053bea6 commit 41cc133
Showing 7 changed files with 726 additions and 683 deletions.
data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -0,0 +1,388 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data.parquet;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

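/**
 * Base class for building trees of {@link ParquetValueReader} from an expected Iceberg
 * {@link Schema} and a Parquet file schema. Subclasses (e.g. the Iceberg generics and Flink
 * readers this commit introduces) supply only the struct representation via
 * {@link #createStructReader(List, List, Types.StructType)}; schema visiting, ID-based column
 * matching, and primitive conversions are handled here.
 */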
public abstract class BaseParquetReaders<T> {
  protected BaseParquetReaders() {
  }

  protected ParquetValueReader<T> createReader(Schema expectedSchema,
                                               MessageType fileSchema) {
    return createReader(expectedSchema, fileSchema, ImmutableMap.of());
  }

  @SuppressWarnings("unchecked")
  protected ParquetValueReader<T> createReader(Schema expectedSchema,
                                               MessageType fileSchema,
                                               Map<Integer, ?> idToConstant) {
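    // files written by Iceberg carry field IDs in their Parquet schema, so readers can be
    // matched to expected fields by ID; files without IDs fall back to position-based matching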
    if (ParquetSchemaUtil.hasIds(fileSchema)) {
      return (ParquetValueReader<T>)
          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
              new ReadBuilder(fileSchema, idToConstant));
    } else {
      return (ParquetValueReader<T>)
          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
              new FallbackReadBuilder(fileSchema, idToConstant));
    }
  }

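  /**
   * Builds the reader that assembles field values into this implementation's struct type
   * (e.g. an Iceberg generic Record or a Flink row).
   *
   * <p>A null entry in {@code types} marks a field that is not read from the file: either a
   * constant supplied through {@code idToConstant} or a missing column that is read as null.
   */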
  protected abstract ParquetValueReader<T> createStructReader(List<Type> types,
                                                              List<ParquetValueReader<?>> fieldReaders,
                                                              Types.StructType structType);

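  // used when the file schema has no field IDs: nested fields are matched to the expected
  // schema by position rather than by ID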
  private class FallbackReadBuilder extends ReadBuilder {
    private FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
      super(type, idToConstant);
    }

    @Override
    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
                                         List<ParquetValueReader<?>> fieldReaders) {
      // the top level matches by ID, but the remaining IDs are missing
      return super.struct(expected, message, fieldReaders);
    }

    @Override
    public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
                                        List<ParquetValueReader<?>> fieldReaders) {
      // the expected struct is ignored because nested fields are never found when the
      // file schema has no IDs; readers are assigned to file columns by position instead
      List<ParquetValueReader<?>> newFields = Lists.newArrayListWithExpectedSize(
          fieldReaders.size());
      List<Type> types = Lists.newArrayListWithExpectedSize(fieldReaders.size());
      List<Type> fields = struct.getFields();
      for (int i = 0; i < fields.size(); i += 1) {
        Type fieldType = fields.get(i);
        int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1;
        newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
        types.add(fieldType);
      }

      return createStructReader(types, newFields, expected);
    }
  }

  private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
    private final MessageType type;
    private final Map<Integer, ?> idToConstant;

    private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
      this.type = type;
      this.idToConstant = idToConstant;
    }

    @Override
    public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
                                         List<ParquetValueReader<?>> fieldReaders) {
      return struct(expected, message.asGroupType(), fieldReaders);
    }

    @Override
    public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
                                        List<ParquetValueReader<?>> fieldReaders) {
      // match the expected struct's order
      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
      Map<Integer, Type> typesById = Maps.newHashMap();
      List<Type> fields = struct.getFields();
      for (int i = 0; i < fields.size(); i += 1) {
        Type fieldType = fields.get(i);
        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
        int id = fieldType.getId().intValue();
        readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
        typesById.put(id, fieldType);
      }

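      // reorder the file readers to the expected schema's order, substituting constants from
      // idToConstant (e.g. partition values) and null readers for columns missing in the file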
      List<Types.NestedField> expectedFields = expected != null ?
          expected.fields() : ImmutableList.of();
      List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
          expectedFields.size());
      List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
      for (Types.NestedField field : expectedFields) {
        int id = field.fieldId();
        if (idToConstant.containsKey(id)) {
          // containsKey is used because the constant may be null
          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
          types.add(null);
        } else {
          ParquetValueReader<?> reader = readersById.get(id);
          if (reader != null) {
            reorderedFields.add(reader);
            types.add(typesById.get(id));
          } else {
            reorderedFields.add(ParquetValueReaders.nulls());
            types.add(null);
          }
        }
      }

      return createStructReader(types, reorderedFields, expected);
    }

    @Override
    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
                                      ParquetValueReader<?> elementReader) {
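      // assumes the standard 3-level list layout: a repeated group wrapping a single element field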
      GroupType repeated = array.getFields().get(0).asGroupType();
      String[] repeatedPath = currentPath();

      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

      Type elementType = repeated.getType(0);
      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;

      return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR,
          ParquetValueReaders.option(elementType, elementD, elementReader));
    }

    @Override
    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
                                     ParquetValueReader<?> keyReader,
                                     ParquetValueReader<?> valueReader) {
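      // assumes the standard map layout: a repeated key_value group holding the key and value fields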
      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
      String[] repeatedPath = currentPath();

      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;

      Type keyType = repeatedKeyValue.getType(0);
      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
      Type valueType = repeatedKeyValue.getType(1);
      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;

      return new ParquetValueReaders.MapReader<>(repeatedD, repeatedR,
          ParquetValueReaders.option(keyType, keyD, keyReader),
          ParquetValueReaders.option(valueType, valueD, valueReader));
    }

    @Override
    public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
                                           PrimitiveType primitive) {
      ColumnDescriptor desc = type.getColumnDescription(currentPath());

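      // a logical (original) type annotation, when present, takes precedence over the physical type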
      if (primitive.getOriginalType() != null) {
        switch (primitive.getOriginalType()) {
          case ENUM:
          case JSON:
          case UTF8:
            return new ParquetValueReaders.StringReader(desc);
          case INT_8:
          case INT_16:
          case INT_32:
            if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
              return new ParquetValueReaders.IntAsLongReader(desc);
            } else {
              return new ParquetValueReaders.UnboxedReader<>(desc);
            }
          case INT_64:
            return new ParquetValueReaders.UnboxedReader<>(desc);
          case DATE:
            return new DateReader(desc);
          case TIMESTAMP_MICROS:
            Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
            if (tsMicrosType.shouldAdjustToUTC()) {
              return new TimestamptzReader(desc);
            } else {
              return new TimestampReader(desc);
            }
          case TIMESTAMP_MILLIS:
            Types.TimestampType tsMillisType = (Types.TimestampType) expected;
            if (tsMillisType.shouldAdjustToUTC()) {
              return new TimestamptzMillisReader(desc);
            } else {
              return new TimestampMillisReader(desc);
            }
          case TIME_MICROS:
            return new TimeReader(desc);
          case TIME_MILLIS:
            return new TimeMillisReader(desc);
          case DECIMAL:
            DecimalMetadata decimal = primitive.getDecimalMetadata();
            switch (primitive.getPrimitiveTypeName()) {
              case BINARY:
              case FIXED_LEN_BYTE_ARRAY:
                return new ParquetValueReaders.BinaryAsDecimalReader(desc, decimal.getScale());
              case INT64:
                return new ParquetValueReaders.LongAsDecimalReader(desc, decimal.getScale());
              case INT32:
                return new ParquetValueReaders.IntegerAsDecimalReader(desc, decimal.getScale());
              default:
                throw new UnsupportedOperationException(
                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
            }
          case BSON:
            return new ParquetValueReaders.BytesReader(desc);
          default:
            throw new UnsupportedOperationException(
                "Unsupported logical type: " + primitive.getOriginalType());
        }
      }

      switch (primitive.getPrimitiveTypeName()) {
        case FIXED_LEN_BYTE_ARRAY:
          return new FixedReader(desc);
        case BINARY:
          return new ParquetValueReaders.BytesReader(desc);
        case INT32:
          if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
            return new ParquetValueReaders.IntAsLongReader(desc);
          } else {
            return new ParquetValueReaders.UnboxedReader<>(desc);
          }
        case FLOAT:
          if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) {
            return new ParquetValueReaders.FloatAsDoubleReader(desc);
          } else {
            return new ParquetValueReaders.UnboxedReader<>(desc);
          }
        case BOOLEAN:
        case INT64:
        case DOUBLE:
          return new ParquetValueReaders.UnboxedReader<>(desc);
        default:
          throw new UnsupportedOperationException("Unsupported type: " + primitive);
      }
    }

    MessageType type() {
      return type;
    }
  }

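  // the java.time readers below decode Parquet integer values relative to the Unix epoch
  // (dates, timestamps) or as time-of-day values (times)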
  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

  private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
    private DateReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public LocalDate read(LocalDate reuse) {
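      // dates are stored as days from the epoch; e.g. 18444 decodes to 2020-07-01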
      return EPOCH_DAY.plusDays(column.nextInteger());
    }
  }

  private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
    private TimestampReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public LocalDateTime read(LocalDateTime reuse) {
      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
    }
  }

  private static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
    private TimestampMillisReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public LocalDateTime read(LocalDateTime reuse) {
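      // scale milliseconds up to microseconds before adding to the epoch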
      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
    }
  }

  private static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
    private TimestamptzReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public OffsetDateTime read(OffsetDateTime reuse) {
      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
    }
  }

  private static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
    private TimestamptzMillisReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public OffsetDateTime read(OffsetDateTime reuse) {
      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
    }
  }

  private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
    private TimeMillisReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public LocalTime read(LocalTime reuse) {
      return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
    }
  }

  private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
    private TimeReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public LocalTime read(LocalTime reuse) {
      return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
    }
  }

  private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
    private FixedReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public byte[] read(byte[] reuse) {
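      // duplicate() so filling the reused buffer does not disturb the backing buffer's position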
      if (reuse != null) {
        column.nextBinary().toByteBuffer().duplicate().get(reuse);
        return reuse;
      } else {
        return column.nextBinary().getBytes();
      }
    }
  }
}
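
The extension point above is createStructReader. Below is a minimal sketch of a concrete subclass that materializes structs as plain Object[] rows. The ObjectArrayReaders and ArrayReader names are illustrative only, and the sketch assumes the ParquetValueReaders.StructReader hooks (newStructData, getField, buildStruct, set) that the generics implementation in this commit also overrides; the real subclasses produce Iceberg generic Records and Flink rows instead. It assumes the same package and imports as BaseParquetReaders above.

// Hypothetical subclass for illustration; not part of this commit.
public class ObjectArrayReaders extends BaseParquetReaders<Object[]> {
  private static final ObjectArrayReaders INSTANCE = new ObjectArrayReaders();

  public static ParquetValueReader<Object[]> buildReader(Schema expectedSchema, MessageType fileSchema) {
    return INSTANCE.createReader(expectedSchema, fileSchema);
  }

  @Override
  protected ParquetValueReader<Object[]> createStructReader(List<Type> types,
                                                            List<ParquetValueReader<?>> fieldReaders,
                                                            Types.StructType structType) {
    return new ArrayReader(types, fieldReaders);
  }

  private static class ArrayReader extends ParquetValueReaders.StructReader<Object[], Object[]> {
    private final int numFields;

    ArrayReader(List<Type> types, List<ParquetValueReader<?>> readers) {
      super(types, readers);
      this.numFields = types.size();
    }

    @Override
    protected Object[] newStructData(Object[] reuse) {
      // reuse the caller's array when it fits, to avoid per-row allocation
      return (reuse != null && reuse.length == numFields) ? reuse : new Object[numFields];
    }

    @Override
    protected Object getField(Object[] intermediate, int pos) {
      return intermediate[pos];
    }

    @Override
    protected Object[] buildStruct(Object[] struct) {
      return struct;
    }

    @Override
    protected void set(Object[] struct, int pos, Object value) {
      struct[pos] = value;
    }
  }
}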