From 41cc1334bfe50c7dccb58baeb664b236732f0e2b Mon Sep 17 00:00:00 2001
From: openinx
Date: Fri, 3 Jul 2020 03:42:27 +0800
Subject: [PATCH] Add abstract BaseParquetReaders for Iceberg generics and Flink (#1162)

---
 .../data/parquet/BaseParquetReaders.java      | 388 ++++++++++++++++++
 .../data/parquet/BaseParquetWriter.java       | 292 +++++++++++++
 .../data/parquet/GenericParquetReaders.java   | 377 +-----------------
 .../data/parquet/GenericParquetWriter.java    | 264 +-----------
 .../flink/data/FlinkParquetReaders.java       |  58 +--
 .../flink/data/FlinkParquetWriters.java       |  26 +-
 .../data/TestFlinkParquetReaderWriter.java    |   4 +-
 7 files changed, 726 insertions(+), 683 deletions(-)
 create mode 100644 data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
 create mode 100644 data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java

diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
new file mode 100644
index 000000000000..36d014438ab7
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.data.parquet; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public abstract class BaseParquetReaders { + protected BaseParquetReaders() { + } + + protected ParquetValueReader createReader(Schema expectedSchema, + MessageType fileSchema) { + return createReader(expectedSchema, fileSchema, ImmutableMap.of()); + } + + @SuppressWarnings("unchecked") + protected ParquetValueReader createReader(Schema expectedSchema, + MessageType fileSchema, + Map idToConstant) { + if (ParquetSchemaUtil.hasIds(fileSchema)) { + return (ParquetValueReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, + new ReadBuilder(fileSchema, idToConstant)); + } else { + return (ParquetValueReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, + new FallbackReadBuilder(fileSchema, idToConstant)); + } + } + + protected abstract ParquetValueReader createStructReader(List types, + List> fieldReaders, + Types.StructType structType); + + private class FallbackReadBuilder extends ReadBuilder { + private FallbackReadBuilder(MessageType type, Map idToConstant) { + super(type, idToConstant); + } + + @Override + public ParquetValueReader message(Types.StructType expected, MessageType message, + List> fieldReaders) { + // the top level matches by ID, but the remaining IDs are missing + return super.struct(expected, message, fieldReaders); + } + + @Override + public ParquetValueReader struct(Types.StructType expected, GroupType struct, + List> fieldReaders) { + // the expected struct is ignored because nested fields are never found when the + List> newFields = Lists.newArrayListWithExpectedSize( + fieldReaders.size()); + List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); + List fields = struct.getFields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i); + int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; + newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); + types.add(fieldType); + } + + return createStructReader(types, newFields, expected); + } + } + + private class ReadBuilder extends TypeWithSchemaVisitor> { + private final MessageType type; + private final Map idToConstant; + + private ReadBuilder(MessageType type, Map idToConstant) { + this.type = type; + this.idToConstant = idToConstant; + } + + @Override + public ParquetValueReader message(Types.StructType 
expected, MessageType message, + List> fieldReaders) { + return struct(expected, message.asGroupType(), fieldReaders); + } + + @Override + public ParquetValueReader struct(Types.StructType expected, GroupType struct, + List> fieldReaders) { + // match the expected struct's order + Map> readersById = Maps.newHashMap(); + Map typesById = Maps.newHashMap(); + List fields = struct.getFields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i); + int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; + int id = fieldType.getId().intValue(); + readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); + typesById.put(id, fieldType); + } + + List expectedFields = expected != null ? + expected.fields() : ImmutableList.of(); + List> reorderedFields = Lists.newArrayListWithExpectedSize( + expectedFields.size()); + List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); + for (Types.NestedField field : expectedFields) { + int id = field.fieldId(); + if (idToConstant.containsKey(id)) { + // containsKey is used because the constant may be null + reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id))); + types.add(null); + } else { + ParquetValueReader reader = readersById.get(id); + if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); + } + } + } + + return createStructReader(types, reorderedFields, expected); + } + + @Override + public ParquetValueReader list(Types.ListType expectedList, GroupType array, + ParquetValueReader elementReader) { + GroupType repeated = array.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; + int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; + + Type elementType = repeated.getType(0); + int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; + + return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR, + ParquetValueReaders.option(elementType, elementD, elementReader)); + } + + @Override + public ParquetValueReader map(Types.MapType expectedMap, GroupType map, + ParquetValueReader keyReader, + ParquetValueReader valueReader) { + GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; + int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; + + Type keyType = repeatedKeyValue.getType(0); + int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1; + Type valueType = repeatedKeyValue.getType(1); + int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1; + + return new ParquetValueReaders.MapReader<>(repeatedD, repeatedR, + ParquetValueReaders.option(keyType, keyD, keyReader), + ParquetValueReaders.option(valueType, valueD, valueReader)); + } + + @Override + public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + ColumnDescriptor desc = type.getColumnDescription(currentPath()); + + if (primitive.getOriginalType() != null) { + switch (primitive.getOriginalType()) { + case ENUM: + case JSON: + case UTF8: + return new ParquetValueReaders.StringReader(desc); + case INT_8: + case INT_16: + case INT_32: + if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) { + return new 
ParquetValueReaders.IntAsLongReader(desc); + } else { + return new ParquetValueReaders.UnboxedReader<>(desc); + } + case INT_64: + return new ParquetValueReaders.UnboxedReader<>(desc); + case DATE: + return new DateReader(desc); + case TIMESTAMP_MICROS: + Types.TimestampType tsMicrosType = (Types.TimestampType) expected; + if (tsMicrosType.shouldAdjustToUTC()) { + return new TimestamptzReader(desc); + } else { + return new TimestampReader(desc); + } + case TIMESTAMP_MILLIS: + Types.TimestampType tsMillisType = (Types.TimestampType) expected; + if (tsMillisType.shouldAdjustToUTC()) { + return new TimestamptzMillisReader(desc); + } else { + return new TimestampMillisReader(desc); + } + case TIME_MICROS: + return new TimeReader(desc); + case TIME_MILLIS: + return new TimeMillisReader(desc); + case DECIMAL: + DecimalMetadata decimal = primitive.getDecimalMetadata(); + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return new ParquetValueReaders.BinaryAsDecimalReader(desc, decimal.getScale()); + case INT64: + return new ParquetValueReaders.LongAsDecimalReader(desc, decimal.getScale()); + case INT32: + return new ParquetValueReaders.IntegerAsDecimalReader(desc, decimal.getScale()); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + case BSON: + return new ParquetValueReaders.BytesReader(desc); + default: + throw new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getOriginalType()); + } + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + return new FixedReader(desc); + case BINARY: + return new ParquetValueReaders.BytesReader(desc); + case INT32: + if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) { + return new ParquetValueReaders.IntAsLongReader(desc); + } else { + return new ParquetValueReaders.UnboxedReader<>(desc); + } + case FLOAT: + if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) { + return new ParquetValueReaders.FloatAsDoubleReader(desc); + } else { + return new ParquetValueReaders.UnboxedReader<>(desc); + } + case BOOLEAN: + case INT64: + case DOUBLE: + return new ParquetValueReaders.UnboxedReader<>(desc); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } + + MessageType type() { + return type; + } + } + + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + private static class DateReader extends ParquetValueReaders.PrimitiveReader { + private DateReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalDate read(LocalDate reuse) { + return EPOCH_DAY.plusDays(column.nextInteger()); + } + } + + private static class TimestampReader extends ParquetValueReaders.PrimitiveReader { + private TimestampReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalDateTime read(LocalDateTime reuse) { + return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); + } + } + + private static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader { + private TimestampMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalDateTime read(LocalDateTime reuse) { + return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime(); + } + } + + private static class 
TimestamptzReader extends ParquetValueReaders.PrimitiveReader { + private TimestamptzReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public OffsetDateTime read(OffsetDateTime reuse) { + return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); + } + } + + private static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader { + private TimestamptzMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public OffsetDateTime read(OffsetDateTime reuse) { + return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS); + } + } + + private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader { + private TimeMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalTime read(LocalTime reuse) { + return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); + } + } + + private static class TimeReader extends ParquetValueReaders.PrimitiveReader { + private TimeReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalTime read(LocalTime reuse) { + return LocalTime.ofNanoOfDay(column.nextLong() * 1000L); + } + } + + private static class FixedReader extends ParquetValueReaders.PrimitiveReader { + private FixedReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public byte[] read(byte[] reuse) { + if (reuse != null) { + column.nextBinary().toByteBuffer().duplicate().get(reuse); + return reuse; + } else { + return column.nextBinary().getBytes(); + } + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java new file mode 100644 index 000000000000..282a92d22f3a --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.data.parquet; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Optional; +import org.apache.iceberg.parquet.ParquetTypeVisitor; +import org.apache.iceberg.parquet.ParquetValueWriter; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public abstract class BaseParquetWriter { + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(MessageType type) { + return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + } + + protected abstract ParquetValueWriters.StructWriter createStructWriter(List> writers); + + private class WriteBuilder extends ParquetTypeVisitor> { + private final MessageType type; + + private WriteBuilder(MessageType type) { + this.type = type; + } + + @Override + public ParquetValueWriter message(MessageType message, List> fieldWriters) { + return struct(message.asGroupType(), fieldWriters); + } + + @Override + public ParquetValueWriter struct(GroupType struct, + List> fieldWriters) { + List fields = struct.getFields(); + List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = struct.getType(i); + int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())); + writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); + } + + return createStructWriter(writers); + } + + @Override + public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { + GroupType repeated = array.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath); + int repeatedR = type.getMaxRepetitionLevel(repeatedPath); + + org.apache.parquet.schema.Type elementType = repeated.getType(0); + int elementD = type.getMaxDefinitionLevel(path(elementType.getName())); + + return ParquetValueWriters.collections(repeatedD, repeatedR, + ParquetValueWriters.option(elementType, elementD, elementWriter)); + } + + @Override + public ParquetValueWriter map(GroupType map, + ParquetValueWriter keyWriter, + ParquetValueWriter valueWriter) { + GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath); + int repeatedR = type.getMaxRepetitionLevel(repeatedPath); + + org.apache.parquet.schema.Type keyType = repeatedKeyValue.getType(0); + int keyD = type.getMaxDefinitionLevel(path(keyType.getName())); + org.apache.parquet.schema.Type valueType = repeatedKeyValue.getType(1); + int valueD = type.getMaxDefinitionLevel(path(valueType.getName())); + + return ParquetValueWriters.maps(repeatedD, repeatedR, + ParquetValueWriters.option(keyType, keyD, keyWriter), + ParquetValueWriters.option(valueType, valueD, valueWriter)); + } + + @Override + public ParquetValueWriter 
primitive(PrimitiveType primitive) { + ColumnDescriptor desc = type.getColumnDescription(currentPath()); + LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); + if (logicalType != null) { + Optional> writer = + logicalType.accept(new LogicalTypeWriterVisitor(desc)); + if (writer.isPresent()) { + return writer.get(); + } + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + return new FixedWriter(desc); + case BINARY: + return ParquetValueWriters.byteBuffers(desc); + case BOOLEAN: + return ParquetValueWriters.booleans(desc); + case INT32: + return ParquetValueWriters.ints(desc); + case INT64: + return ParquetValueWriters.longs(desc); + case FLOAT: + return ParquetValueWriters.floats(desc); + case DOUBLE: + return ParquetValueWriters.doubles(desc); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } + } + + private static class LogicalTypeWriterVisitor implements + LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of(ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of(ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of(ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return Optional.of(new DateWriter(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + return Optional.of(new TimeWriter(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS is supported", timestampType.getUnit()); + if (timestampType.isAdjustedToUTC()) { + return Optional.of(new TimestamptzWriter(desc)); + } else { + return Optional.of(new TimestampWriter(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return 
Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + } + + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + private static class DateWriter extends ParquetValueWriters.PrimitiveWriter { + private DateWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, LocalDate value) { + column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value)); + } + } + + private static class TimeWriter extends ParquetValueWriters.PrimitiveWriter { + private TimeWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, LocalTime value) { + column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000); + } + } + + private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter { + private TimestampWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, LocalDateTime value) { + column.writeLong(repetitionLevel, + ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC))); + } + } + + private static class TimestamptzWriter extends ParquetValueWriters.PrimitiveWriter { + private TimestamptzWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, OffsetDateTime value) { + column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value)); + } + } + + private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter { + private FixedWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, byte[] value) { + column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value)); + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 3e4e7c779597..e8a914a73563 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -19,391 +19,40 @@ package org.apache.iceberg.data.parquet; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; -import org.apache.iceberg.parquet.ParquetValueReaders; -import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader; -import org.apache.iceberg.parquet.ParquetValueReaders.BytesReader; -import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader; -import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader; -import org.apache.iceberg.parquet.ParquetValueReaders.ListReader; -import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader; -import org.apache.iceberg.parquet.ParquetValueReaders.MapReader; -import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader; -import org.apache.iceberg.parquet.ParquetValueReaders.StringReader; import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; -import 
org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader; -import org.apache.iceberg.parquet.TypeWithSchemaVisitor; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Type.TypeID; -import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; -import org.apache.iceberg.types.Types.TimestampType; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.schema.DecimalMetadata; -import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -public class GenericParquetReaders { - protected GenericParquetReaders() { - } - - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema) { - return buildReader(expectedSchema, fileSchema, ImmutableMap.of()); - } - - @SuppressWarnings("unchecked") - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema, - Map idToConstant) { - if (ParquetSchemaUtil.hasIds(fileSchema)) { - return (ParquetValueReader) - TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new ReadBuilder(fileSchema, idToConstant)); - } else { - return (ParquetValueReader) - TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new FallbackReadBuilder(fileSchema, idToConstant)); - } - } - - protected static class FallbackReadBuilder extends ReadBuilder { - protected FallbackReadBuilder(MessageType type, Map idToConstant) { - super(type, idToConstant); - } - - @Override - public ParquetValueReader message(StructType expected, MessageType message, - List> fieldReaders) { - // the top level matches by ID, but the remaining IDs are missing - return super.struct(expected, message, fieldReaders); - } - - @Override - public ParquetValueReader struct(StructType expected, GroupType struct, - List> fieldReaders) { - // the expected struct is ignored because nested fields are never found when the - List> newFields = Lists.newArrayListWithExpectedSize( - fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List fields = struct.getFields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i); - int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; - newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); - types.add(fieldType); - } - - return createStructReader(types, newFields, expected); - } - } - - protected static class ReadBuilder extends TypeWithSchemaVisitor> { - private final MessageType type; - private final Map idToConstant; - - protected ReadBuilder(MessageType type, Map idToConstant) { - this.type = type; - this.idToConstant = idToConstant; - } - - @Override - public ParquetValueReader message(StructType expected, MessageType message, - List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); - } - - protected StructReader createStructReader(List types, - List> readers, - StructType struct) { - return new RecordReader(types, readers, struct); - } - - @Override - public ParquetValueReader struct(StructType expected, GroupType struct, - List> fieldReaders) { - // match the expected struct's order - Map> 
readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - List fields = struct.getFields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i); - int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; - int id = fieldType.getId().intValue(); - readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); - typesById.put(id, fieldType); - } - - List expectedFields = expected != null ? - expected.fields() : ImmutableList.of(); - List> reorderedFields = Lists.newArrayListWithExpectedSize( - expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - for (Types.NestedField field : expectedFields) { - int id = field.fieldId(); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id))); - types.add(null); - } else { - ParquetValueReader reader = readersById.get(id); - if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } - } - } - - return createStructReader(types, reorderedFields, expected); - } - - @Override - public ParquetValueReader list(Types.ListType expectedList, GroupType array, - ParquetValueReader elementReader) { - GroupType repeated = array.getFields().get(0).asGroupType(); - String[] repeatedPath = currentPath(); - - int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; - int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - - Type elementType = repeated.getType(0); - int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; - - return new ListReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader)); - } - - @Override - public ParquetValueReader map(Types.MapType expectedMap, GroupType map, - ParquetValueReader keyReader, - ParquetValueReader valueReader) { - GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); - String[] repeatedPath = currentPath(); - - int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; - int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - - Type keyType = repeatedKeyValue.getType(0); - int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1; - Type valueType = repeatedKeyValue.getType(1); - int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1; - - return new MapReader<>(repeatedD, repeatedR, - ParquetValueReaders.option(keyType, keyD, keyReader), - ParquetValueReaders.option(valueType, valueD, valueReader)); - } - - @Override - public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive) { - ColumnDescriptor desc = type.getColumnDescription(currentPath()); - - if (primitive.getOriginalType() != null) { - switch (primitive.getOriginalType()) { - case ENUM: - case JSON: - case UTF8: - return new StringReader(desc); - case INT_8: - case INT_16: - case INT_32: - if (expected.typeId() == TypeID.LONG) { - return new IntAsLongReader(desc); - } else { - return new UnboxedReader<>(desc); - } - case INT_64: - return new UnboxedReader<>(desc); - case DATE: - return new DateReader(desc); - case TIMESTAMP_MICROS: - TimestampType tsMicrosType = (TimestampType) expected; - if (tsMicrosType.shouldAdjustToUTC()) { - return new TimestamptzReader(desc); - } else { - return new TimestampReader(desc); - } - case TIMESTAMP_MILLIS: - 
TimestampType tsMillisType = (TimestampType) expected; - if (tsMillisType.shouldAdjustToUTC()) { - return new TimestamptzMillisReader(desc); - } else { - return new TimestampMillisReader(desc); - } - case TIME_MICROS: - return new TimeReader(desc); - case TIME_MILLIS: - return new TimeMillisReader(desc); - case DECIMAL: - DecimalMetadata decimal = primitive.getDecimalMetadata(); - switch (primitive.getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return new BinaryAsDecimalReader(desc, decimal.getScale()); - case INT64: - return new LongAsDecimalReader(desc, decimal.getScale()); - case INT32: - return new IntegerAsDecimalReader(desc, decimal.getScale()); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - case BSON: - return new BytesReader(desc); - default: - throw new UnsupportedOperationException( - "Unsupported logical type: " + primitive.getOriginalType()); - } - } - - switch (primitive.getPrimitiveTypeName()) { - case FIXED_LEN_BYTE_ARRAY: - return new FixedReader(desc); - case BINARY: - return new BytesReader(desc); - case INT32: - if (expected != null && expected.typeId() == TypeID.LONG) { - return new IntAsLongReader(desc); - } else { - return new UnboxedReader<>(desc); - } - case FLOAT: - if (expected != null && expected.typeId() == TypeID.DOUBLE) { - return new ParquetValueReaders.FloatAsDoubleReader(desc); - } else { - return new UnboxedReader<>(desc); - } - case BOOLEAN: - case INT64: - case DOUBLE: - return new UnboxedReader<>(desc); - default: - throw new UnsupportedOperationException("Unsupported type: " + primitive); - } - } - - MessageType type() { - return type; - } - } - - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - - private static class DateReader extends PrimitiveReader { - private DateReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalDate read(LocalDate reuse) { - return EPOCH_DAY.plusDays(column.nextInteger()); - } - } - - private static class TimestampReader extends PrimitiveReader { - private TimestampReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalDateTime read(LocalDateTime reuse) { - return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); - } - } - - private static class TimestampMillisReader extends PrimitiveReader { - private TimestampMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalDateTime read(LocalDateTime reuse) { - return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime(); - } - } - - private static class TimestamptzReader extends PrimitiveReader { - private TimestamptzReader(ColumnDescriptor desc) { - super(desc); - } +public class GenericParquetReaders extends BaseParquetReaders { - @Override - public OffsetDateTime read(OffsetDateTime reuse) { - return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); - } - } + private static final GenericParquetReaders INSTANCE = new GenericParquetReaders(); - private static class TimestamptzMillisReader extends PrimitiveReader { - private TimestamptzMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public OffsetDateTime read(OffsetDateTime reuse) { - return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS); - } + private GenericParquetReaders() { } - private static class TimeMillisReader extends PrimitiveReader { - private 
TimeMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalTime read(LocalTime reuse) { - return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); - } + public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema) { + return INSTANCE.createReader(expectedSchema, fileSchema); } - private static class TimeReader extends PrimitiveReader { - private TimeReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalTime read(LocalTime reuse) { - return LocalTime.ofNanoOfDay(column.nextLong() * 1000L); - } + public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema, + Map idToConstant) { + return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); } - private static class FixedReader extends PrimitiveReader { - private FixedReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public byte[] read(byte[] reuse) { - if (reuse != null) { - column.nextBinary().toByteBuffer().duplicate().get(reuse); - return reuse; - } else { - return column.nextBinary().getBytes(); - } - } + @Override + protected ParquetValueReader createStructReader(List types, List> fieldReaders, + StructType structType) { + return new RecordReader(types, fieldReaders, structType); } - static class RecordReader extends StructReader { + private static class RecordReader extends StructReader { private final StructType structType; RecordReader(List types, diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index 4cdb2dee1d21..5bb878f536ba 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -19,273 +19,25 @@ package org.apache.iceberg.data.parquet; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.List; -import java.util.Optional; import org.apache.iceberg.data.Record; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; -import org.apache.iceberg.parquet.ParquetValueWriters; -import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.parquet.Preconditions; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; -public class GenericParquetWriter { - protected GenericParquetWriter() { - } - - @SuppressWarnings("unchecked") - public static ParquetValueWriter buildWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); - } - - protected static class WriteBuilder extends ParquetTypeVisitor> { - private final MessageType type; - - protected WriteBuilder(MessageType type) { - this.type = type; - } - - @Override - public ParquetValueWriter 
message(MessageType message, - List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); - } - - protected StructWriter createStructWriter(List> writers) { - return new RecordWriter(writers); - } - - @Override - public ParquetValueWriter struct(GroupType struct, - List> fieldWriters) { - List fields = struct.getFields(); - List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = struct.getType(i); - int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())); - writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); - } - - return createStructWriter(writers); - } - - @Override - public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { - GroupType repeated = array.getFields().get(0).asGroupType(); - String[] repeatedPath = currentPath(); - - int repeatedD = type.getMaxDefinitionLevel(repeatedPath); - int repeatedR = type.getMaxRepetitionLevel(repeatedPath); - - org.apache.parquet.schema.Type elementType = repeated.getType(0); - int elementD = type.getMaxDefinitionLevel(path(elementType.getName())); - - return ParquetValueWriters.collections(repeatedD, repeatedR, - ParquetValueWriters.option(elementType, elementD, elementWriter)); - } - - @Override - public ParquetValueWriter map(GroupType map, - ParquetValueWriter keyWriter, - ParquetValueWriter valueWriter) { - GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); - String[] repeatedPath = currentPath(); - - int repeatedD = type.getMaxDefinitionLevel(repeatedPath); - int repeatedR = type.getMaxRepetitionLevel(repeatedPath); - - org.apache.parquet.schema.Type keyType = repeatedKeyValue.getType(0); - int keyD = type.getMaxDefinitionLevel(path(keyType.getName())); - org.apache.parquet.schema.Type valueType = repeatedKeyValue.getType(1); - int valueD = type.getMaxDefinitionLevel(path(valueType.getName())); - - return ParquetValueWriters.maps(repeatedD, repeatedR, - ParquetValueWriters.option(keyType, keyD, keyWriter), - ParquetValueWriters.option(valueType, valueD, valueWriter)); - } - - @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { - ColumnDescriptor desc = type.getColumnDescription(currentPath()); - LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); - if (logicalType != null) { - Optional> writer = logicalType.accept(new LogicalTypeWriterVisitor(desc)); - if (writer.isPresent()) { - return writer.get(); - } - } - - switch (primitive.getPrimitiveTypeName()) { - case FIXED_LEN_BYTE_ARRAY: - return new FixedWriter(desc); - case BINARY: - return ParquetValueWriters.byteBuffers(desc); - case BOOLEAN: - return ParquetValueWriters.booleans(desc); - case INT32: - return ParquetValueWriters.ints(desc); - case INT64: - return ParquetValueWriters.longs(desc); - case FLOAT: - return ParquetValueWriters.floats(desc); - case DOUBLE: - return ParquetValueWriters.doubles(desc); - default: - throw new UnsupportedOperationException("Unsupported type: " + primitive); - } - } - } - - private static class LogicalTypeWriterVisitor implements LogicalTypeAnnotationVisitor> { - private final ColumnDescriptor desc; - - private LogicalTypeWriterVisitor(ColumnDescriptor desc) { - this.desc = desc; - } - - @Override - public Optional> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation 
enumType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { - switch (desc.getPrimitiveType().getPrimitiveTypeName()) { - case INT32: - return Optional.of(ParquetValueWriters.decimalAsInteger( - desc, decimalType.getPrecision(), decimalType.getScale())); - case INT64: - return Optional.of(ParquetValueWriters.decimalAsLong( - desc, decimalType.getPrecision(), decimalType.getScale())); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of(ParquetValueWriters.decimalAsFixed( - desc, decimalType.getPrecision(), decimalType.getScale())); - } - return Optional.empty(); - } +public class GenericParquetWriter extends BaseParquetWriter { + private static final GenericParquetWriter INSTANCE = new GenericParquetWriter(); - @Override - public Optional> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { - return Optional.of(new DateWriter(desc)); - } - - @Override - public Optional> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { - return Optional.of(new TimeWriter(desc)); - } - - @Override - public Optional> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { - Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", timestampType.getUnit()); - if (timestampType.isAdjustedToUTC()) { - return Optional.of(new TimestamptzWriter(desc)); - } else { - return Optional.of(new TimestampWriter(desc)); - } - } - - @Override - public Optional> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { - Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() < 64, - "Cannot read uint64: not a supported Java type"); - if (intType.getBitWidth() < 64) { - return Optional.of(ParquetValueWriters.ints(desc)); - } else { - return Optional.of(ParquetValueWriters.longs(desc)); - } - } - - @Override - public Optional> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { - return Optional.of(ParquetValueWriters.byteBuffers(desc)); - } + private GenericParquetWriter() { } - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - - private static class DateWriter extends PrimitiveWriter { - private DateWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, LocalDate value) { - column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value)); - } - } - - private static class TimeWriter extends PrimitiveWriter { - private TimeWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, LocalTime value) { - column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000); - } - } - - private static class TimestampWriter extends PrimitiveWriter { - private TimestampWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, LocalDateTime value) { - column.writeLong(repetitionLevel, - ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC))); - } - } - - private static class TimestamptzWriter extends PrimitiveWriter { - private TimestamptzWriter(ColumnDescriptor desc) { - 
super(desc); - } - - @Override - public void write(int repetitionLevel, OffsetDateTime value) { - column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value)); - } + public static ParquetValueWriter buildWriter(MessageType type) { + return INSTANCE.createWriter(type); } - private static class FixedWriter extends PrimitiveWriter { - private FixedWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, byte[] value) { - column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value)); - } + @Override + protected StructWriter createStructWriter(List> writers) { + return new RecordWriter(writers); } private static class RecordWriter extends StructWriter { diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index e9d9bc08b5ef..ccea5d6529c5 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -20,66 +20,34 @@ package org.apache.iceberg.flink.data; import java.util.List; -import java.util.Map; import org.apache.flink.types.Row; import org.apache.iceberg.Schema; -import org.apache.iceberg.data.parquet.GenericParquetReaders; -import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.data.parquet.BaseParquetReaders; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; -import org.apache.iceberg.parquet.TypeWithSchemaVisitor; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; -public class FlinkParquetReaders extends GenericParquetReaders { - private FlinkParquetReaders() { - } - - @SuppressWarnings("unchecked") - public static ParquetValueReader buildRowReader(Schema expectedSchema, - MessageType fileSchema) { - if (ParquetSchemaUtil.hasIds(fileSchema)) { - return (ParquetValueReader) - TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new ReadBuilder(fileSchema, ImmutableMap.of())); - } else { - return (ParquetValueReader) - TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new FallbackReadBuilder(fileSchema, ImmutableMap.of())); - } - } - - private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder { +public class FlinkParquetReaders extends BaseParquetReaders { - private FallbackReadBuilder(MessageType type, Map idToConstant) { - super(type, idToConstant); - } + private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders(); - @Override - protected ParquetValueReaders.StructReader createStructReader(List types, - List> readers, - Types.StructType struct) { - return new RowReader(types, readers, struct); - } + private FlinkParquetReaders() { } - private static class ReadBuilder extends GenericParquetReaders.ReadBuilder { - - private ReadBuilder(MessageType type, Map idToConstant) { - super(type, idToConstant); - } + public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema) { + return INSTANCE.createReader(expectedSchema, fileSchema); + } - @Override - protected ParquetValueReaders.StructReader createStructReader(List types, - List> readers, - Types.StructType struct) { - return new RowReader(types, readers, struct); - } + @Override + protected ParquetValueReader 
createStructReader(List types, + List> fieldReaders, + Types.StructType structType) { + return new RowReader(types, fieldReaders, structType); } - static class RowReader extends ParquetValueReaders.StructReader { + private static class RowReader extends ParquetValueReaders.StructReader { private final Types.StructType structType; RowReader(List types, List> readers, Types.StructType struct) { diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 2e36b4ac624f..54b4fea083a1 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -21,31 +21,25 @@ import java.util.List; import org.apache.flink.types.Row; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.parquet.ParquetTypeVisitor; +import org.apache.iceberg.data.parquet.BaseParquetWriter; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.parquet.schema.MessageType; -public class FlinkParquetWriters extends GenericParquetWriter { +public class FlinkParquetWriters extends BaseParquetWriter { + + private static final FlinkParquetWriters INSTANCE = new FlinkParquetWriters(); + private FlinkParquetWriters() { } - @SuppressWarnings("unchecked") - public static ParquetValueWriter buildRowWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + public static ParquetValueWriter buildWriter(MessageType type) { + return INSTANCE.createWriter(type); } - private static class WriteBuilder extends GenericParquetWriter.WriteBuilder { - - private WriteBuilder(MessageType type) { - super(type); - } - - @Override - protected ParquetValueWriters.StructWriter createStructWriter(List> writers) { - return new RowWriter(writers); - } + @Override + protected ParquetValueWriters.StructWriter createStructWriter(List> writers) { + return new RowWriter(writers); } private static class RowWriter extends ParquetValueWriters.StructWriter { diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java index f8bf6a53e514..e0cb3f05439e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java @@ -76,14 +76,14 @@ private void testCorrectness(Schema schema, int numRecords, Iterable iterab try (FileAppender writer = Parquet.write(Files.localOutput(testFile)) .schema(schema) - .createWriterFunc(FlinkParquetWriters::buildRowWriter) + .createWriterFunc(FlinkParquetWriters::buildWriter) .build()) { writer.addAll(iterable); } try (CloseableIterable reader = Parquet.read(Files.localInput(testFile)) .project(schema) - .createReaderFunc(type -> FlinkParquetReaders.buildRowReader(schema, type)) + .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type)) .build()) { Iterator expected = iterable.iterator(); Iterator rows = reader.iterator();
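
The public entry points keep the same shape after this refactor; only the Flink names change from buildRowWriter/buildRowReader to buildWriter/buildReader. Below is a minimal usage sketch with the generic classes, mirroring TestFlinkParquetReaderWriter above; testFile, schema, and records are placeholder variables, the imports match the test class, and the Parquet builder calls are the same ones the test already uses.

// Write generic Records through the refactored GenericParquetWriter entry point,
// then read them back through GenericParquetReaders.
try (FileAppender<Record> appender = Parquet.write(Files.localOutput(testFile))
    .schema(schema)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .build()) {
  appender.addAll(records);
}

try (CloseableIterable<Record> rows = Parquet.read(Files.localInput(testFile))
    .project(schema)
    .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
    .build()) {
  for (Record record : rows) {
    // consume each materialized Record
  }
}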
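
For a new engine binding, the extension surface is now just the struct writer (or reader) factory. The sketch below is hypothetical and not part of this patch: the ExampleParquetWriter and ExampleStructWriter names are invented for illustration, the Object[] type parameter assumes BaseParquetWriter is generic on the produced row type, and the single get(struct, index) override follows the RowWriter pattern in FlinkParquetWriters.

import java.util.List;
import org.apache.iceberg.data.parquet.BaseParquetWriter;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.parquet.schema.MessageType;

public class ExampleParquetWriter extends BaseParquetWriter<Object[]> {
  private static final ExampleParquetWriter INSTANCE = new ExampleParquetWriter();

  private ExampleParquetWriter() {
  }

  // Same static entry-point shape as GenericParquetWriter and FlinkParquetWriters.
  public static ParquetValueWriter<Object[]> buildWriter(MessageType type) {
    return INSTANCE.createWriter(type);
  }

  @Override
  protected ParquetValueWriters.StructWriter<Object[]> createStructWriter(List<ParquetValueWriter<?>> writers) {
    return new ExampleStructWriter(writers);
  }

  private static class ExampleStructWriter extends ParquetValueWriters.StructWriter<Object[]> {
    private ExampleStructWriter(List<ParquetValueWriter<?>> writers) {
      super(writers);
    }

    @Override
    protected Object get(Object[] struct, int index) {
      // assumed accessor: return the field value at the given position
      return struct[index];
    }
  }
}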