diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index f1a65f6202fd..2e7cdbf5d355 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -628,9 +628,7 @@ protected void assertCounts(int fieldId, Long valueCount, Long nullValueCount, Long nanValueCount, Metrics metrics) {
     Map<Integer, Long> nanValueCounts = metrics.nanValueCounts();
     Assert.assertEquals(valueCount, valueCounts.get(fieldId));
     Assert.assertEquals(nullValueCount, nullValueCounts.get(fieldId));
-    if (fileFormat() != FileFormat.ORC) {
-      Assert.assertEquals(nanValueCount, nanValueCounts.get(fieldId));
-    }
+    Assert.assertEquals(nanValueCount, nanValueCounts.get(fieldId));
   }
 
   protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index cf7082cd035f..a04d777df1d6 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -20,8 +20,11 @@
 package org.apache.iceberg.data.orc;
 
 import java.util.List;
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.orc.ORCSchemaUtil;
 import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
 import org.apache.iceberg.orc.OrcValueWriter;
@@ -79,9 +82,9 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
       case LONG:
         return GenericOrcWriters.longs();
       case FLOAT:
-        return GenericOrcWriters.floats();
+        return GenericOrcWriters.floats(ORCSchemaUtil.fieldId(primitive));
       case DOUBLE:
-        return GenericOrcWriters.doubles();
+        return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive));
       case DATE:
         return GenericOrcWriters.dates();
       case TIME:
@@ -125,6 +128,11 @@ public void write(Record value, VectorizedRowBatch output) {
     }
   }
 
+  @Override
+  public Stream<FieldMetrics> metrics() {
+    return writer.metrics();
+  }
+
   private static class RecordWriter implements OrcValueWriter<Record> {
     private final List<OrcValueWriter<?>> writers;
@@ -150,5 +158,10 @@ public void nonNullWrite(int rowId, Record data, ColumnVector output) {
         child.write(rowId, data.get(c, child.getJavaClass()), cv.fields[c]);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return writers.stream().flatMap(OrcValueWriter::metrics);
+    }
   }
 }
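A note on the shape of this change: leaf writers own a mutable counter, and composite writers flat-map their children, so the tree is only walked when metrics() is called. A minimal, self-contained sketch of that pattern (the names below are illustrative stand-ins, not Iceberg's API):

import java.util.List;
import java.util.stream.Stream;

/** Illustrative stand-in for FieldMetrics: just a field id and a NaN count. */
class FieldNanCount {
  final int fieldId;
  final long nanCount;

  FieldNanCount(int fieldId, long nanCount) {
    this.fieldId = fieldId;
    this.nanCount = nanCount;
  }
}

interface ValueWriter {
  void write(Object value);

  // Default is empty, so only writers that actually track something override it.
  default Stream<FieldNanCount> metrics() {
    return Stream.empty();
  }
}

class FloatNanCountingWriter implements ValueWriter {
  private final int fieldId;
  private long nanCount = 0;

  FloatNanCountingWriter(int fieldId) {
    this.fieldId = fieldId;
  }

  @Override
  public void write(Object value) {
    if (value instanceof Float && Float.isNaN((Float) value)) {
      nanCount++; // NaN != NaN, so isNaN() is the only reliable test
    }
  }

  @Override
  public Stream<FieldNanCount> metrics() {
    return Stream.of(new FieldNanCount(fieldId, nanCount));
  }
}

/** Composites report nothing themselves; they flat-map their children. */
class StructNanCountingWriter implements ValueWriter {
  private final List<ValueWriter> children;

  StructNanCountingWriter(List<ValueWriter> children) {
    this.children = children;
  }

  @Override
  public void write(Object value) {
    Object[] fields = (Object[]) value;
    for (int i = 0; i < children.size(); i++) {
      children.get(i).write(fields[i]);
    }
  }

  @Override
  public Stream<FieldNanCount> metrics() {
    return children.stream().flatMap(ValueWriter::metrics);
  }
}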
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index 00e9121ac58b..e4eff7885ab3 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -32,6 +32,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.FloatFieldMetrics;
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -77,12 +80,12 @@ public static OrcValueWriter<Long> longs() {
     return LongWriter.INSTANCE;
   }
 
-  public static OrcValueWriter<Float> floats() {
-    return FloatWriter.INSTANCE;
+  public static OrcValueWriter<Float> floats(int id) {
+    return new FloatWriter(id);
   }
 
-  public static OrcValueWriter<Double> doubles() {
-    return DoubleWriter.INSTANCE;
+  public static OrcValueWriter<Double> doubles(int id) {
+    return new DoubleWriter(id);
   }
 
   public static OrcValueWriter<String> strings() {
@@ -216,7 +219,13 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) {
   }
 
   private static class FloatWriter implements OrcValueWriter<Float> {
-    private static final OrcValueWriter<Float> INSTANCE = new FloatWriter();
+    private final int id;
+    private long nanCount;
+
+    private FloatWriter(int id) {
+      this.id = id;
+      this.nanCount = 0;
+    }
 
     @Override
     public Class<Float> getJavaClass() {
@@ -226,11 +235,25 @@ public Class<Float> getJavaClass() {
     @Override
     public void nonNullWrite(int rowId, Float data, ColumnVector output) {
       ((DoubleColumnVector) output).vector[rowId] = data;
+      if (Float.isNaN(data)) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FloatFieldMetrics(id, nanCount));
     }
   }
 
   private static class DoubleWriter implements OrcValueWriter<Double> {
-    private static final OrcValueWriter<Double> INSTANCE = new DoubleWriter();
+    private final int id;
+    private long nanCount;
+
+    private DoubleWriter(int id) {
+      this.id = id;
+      this.nanCount = 0;
+    }
 
     @Override
     public Class<Double> getJavaClass() {
@@ -240,6 +263,14 @@ public Class<Double> getJavaClass() {
     @Override
     public void nonNullWrite(int rowId, Double data, ColumnVector output) {
       ((DoubleColumnVector) output).vector[rowId] = data;
+      if (Double.isNaN(data)) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FloatFieldMetrics(id, nanCount));
     }
   }
 
@@ -436,6 +467,11 @@ public void nonNullWrite(int rowId, List<T> value, ColumnVector output) {
         element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return element.metrics();
+    }
   }
 
   private static class MapWriter<K, V> implements OrcValueWriter<Map<K, V>> {
@@ -475,5 +511,10 @@ public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
         valueWriter.write(pos, values.get(e), cv.values);
      }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
+    }
   }
 }
diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
index 96217229d879..fa588302e8e2 100644
--- a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
@@ -90,7 +90,7 @@ public abstract class TestMergingMetrics<T> {
 
   @Parameterized.Parameters(name = "fileFormat = {0}")
   public static Object[] parameters() {
-    return new Object[] {FileFormat.PARQUET };
+    return new Object[] { FileFormat.PARQUET, FileFormat.ORC };
   }
 
   public TestMergingMetrics(FileFormat fileFormat) {
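With the ORC value writers now id-aware, NaN counts surface through the ordinary appender API. A usage sketch, assuming the patched data module is on the classpath (the file path, field id, and expected output are illustrative):

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;

public class OrcNanMetricsDemo {
  public static void main(String[] args) throws IOException {
    // Field id 1 is the float column whose NaN writes should now be counted.
    Schema schema = new Schema(Types.NestedField.required(1, "x", Types.FloatType.get()));

    File file = File.createTempFile("nan-metrics", ".orc");
    file.delete(); // the local output file must not already exist

    FileAppender<Record> appender =
        new GenericAppenderFactory(schema).newAppender(Files.localOutput(file), FileFormat.ORC);

    Record record = GenericRecord.create(schema);
    record.setField("x", Float.NaN);
    appender.add(record);
    appender.close(); // metrics() requires a closed file

    Metrics metrics = appender.metrics();
    System.out.println(metrics.nanValueCounts()); // expected: {1=1}
  }
}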
diff --git a/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java
similarity index 92%
rename from data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java
rename to data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java
index e7181feadaf5..72db36a9908c 100644
--- a/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java
@@ -17,10 +17,12 @@
  * under the License.
  */
-package org.apache.iceberg;
+package org.apache.iceberg.parquet;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 592307ded257..9aff0c127449 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -19,14 +19,18 @@
 package org.apache.iceberg.flink.data;
 
+import java.util.Deque;
 import java.util.List;
+import java.util.stream.Stream;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.orc.GenericOrcWriters;
 import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -63,10 +67,27 @@ public void write(RowData row, VectorizedRowBatch output) {
     }
   }
 
+  @Override
+  public Stream<FieldMetrics> metrics() {
+    return writer.metrics();
+  }
+
   private static class WriteBuilder extends FlinkSchemaVisitor<OrcValueWriter<?>> {
+    private final Deque<Integer> fieldIds = Lists.newLinkedList();
+
     private WriteBuilder() {
     }
 
+    @Override
+    public void beforeField(Types.NestedField field) {
+      fieldIds.push(field.fieldId());
+    }
+
+    @Override
+    public void afterField(Types.NestedField field) {
+      fieldIds.pop();
+    }
+
     @Override
     public OrcValueWriter<?> record(Types.StructType iStruct,
                                     List<OrcValueWriter<?>> results,
@@ -101,9 +122,15 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, LogicalType flinkPrimitive) {
       case LONG:
         return GenericOrcWriters.longs();
       case FLOAT:
-        return GenericOrcWriters.floats();
+        Preconditions.checkArgument(fieldIds.peek() != null,
+            String.format("[BUG] Cannot find field id for primitive field with type %s. This is likely because id " +
+                "information is not properly pushed during schema visiting.", iPrimitive));
+        return GenericOrcWriters.floats(fieldIds.peek());
       case DOUBLE:
-        return GenericOrcWriters.doubles();
+        Preconditions.checkArgument(fieldIds.peek() != null,
+            String.format("[BUG] Cannot find field id for primitive field with type %s. This is likely because id " +
+                "information is not properly pushed during schema visiting.", iPrimitive));
+        return GenericOrcWriters.doubles(fieldIds.peek());
       case DATE:
         return FlinkOrcWriters.dates();
       case TIME:
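Unlike the generic and Spark writers, which read ids straight off the id-annotated ORC TypeDescription, this visitor dispatches on Flink LogicalTypes, which carry no Iceberg field ids, hence the explicit Deque. A standalone sketch of the stack discipline (ArrayDeque here stands in for the Lists.newLinkedList() used above):

import java.util.ArrayDeque;
import java.util.Deque;

// beforeField pushes an id, afterField pops it, so while a primitive is being
// visited, peek() returns the id of the innermost enclosing field.
public class FieldIdStack {
  private final Deque<Integer> fieldIds = new ArrayDeque<>();

  void beforeField(int fieldId) {
    fieldIds.push(fieldId);
  }

  void afterField() {
    fieldIds.pop();
  }

  Integer currentFieldId() {
    return fieldIds.peek(); // null only when visiting outside any field
  }

  public static void main(String[] args) {
    FieldIdStack stack = new FieldIdStack();
    stack.beforeField(1);                       // struct field "point"
    stack.beforeField(2);                       //   nested float field "x"
    System.out.println(stack.currentFieldId()); // 2, the id handed to floats(...)
    stack.afterField();
    stack.afterField();
  }
}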
This is likely because id " + + "information is not properly pushed during schema visiting.", iPrimitive)); + return GenericOrcWriters.doubles(fieldIds.peek()); case DATE: return FlinkOrcWriters.dates(); case TIME: diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index 800ad2071274..f088cb51d982 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -23,6 +23,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.stream.Stream; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.MapData; @@ -30,6 +31,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -254,6 +256,12 @@ public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) { elementWriter.write((int) (e + cv.offsets[rowId]), (T) value, cv.child); } } + + @Override + public Stream metrics() { + return elementWriter.metrics(); + } + } static class MapWriter implements OrcValueWriter { @@ -296,6 +304,11 @@ public void nonNullWrite(int rowId, MapData data, ColumnVector output) { valueWriter.write(pos, (V) valueGetter.getElementOrNull(valArray, e), cv.values); } } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } static class StructWriter implements OrcValueWriter { @@ -329,5 +342,10 @@ public void nonNullWrite(int rowId, RowData data, ColumnVector output) { writer.write(rowId, fieldGetters.get(c).getFieldOrNull(data), cv.fields[c]); } } + + @Override + public Stream metrics() { + return writers.stream().flatMap(OrcValueWriter::metrics); + } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java index 363d2bde4918..0909e1b53a85 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java @@ -44,17 +44,39 @@ private static T visit(LogicalType flinkType, Type iType, FlinkSchemaVisitor case MAP: MapType mapType = (MapType) flinkType; Types.MapType iMapType = iType.asMapType(); - - T key = visit(mapType.getKeyType(), iMapType.keyType(), visitor); - T value = visit(mapType.getValueType(), iMapType.valueType(), visitor); + T key; + T value; + + Types.NestedField keyField = iMapType.field(iMapType.keyId()); + visitor.beforeMapKey(keyField); + try { + key = visit(mapType.getKeyType(), iMapType.keyType(), visitor); + } finally { + visitor.afterMapKey(keyField); + } + + Types.NestedField valueField = iMapType.field(iMapType.valueId()); + visitor.beforeMapValue(valueField); + try { + value = visit(mapType.getValueType(), iMapType.valueType(), visitor); + } finally { + visitor.afterMapValue(valueField); + } return visitor.map(iMapType, key, value, mapType.getKeyType(), mapType.getValueType()); case LIST: ArrayType listType = (ArrayType) flinkType; Types.ListType iListType = 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
index 363d2bde4918..0909e1b53a85 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
@@ -44,17 +44,39 @@ private static <T> T visit(LogicalType flinkType, Type iType, FlinkSchemaVisitor<T> visitor) {
       case MAP:
         MapType mapType = (MapType) flinkType;
         Types.MapType iMapType = iType.asMapType();
-
-        T key = visit(mapType.getKeyType(), iMapType.keyType(), visitor);
-        T value = visit(mapType.getValueType(), iMapType.valueType(), visitor);
+        T key;
+        T value;
+
+        Types.NestedField keyField = iMapType.field(iMapType.keyId());
+        visitor.beforeMapKey(keyField);
+        try {
+          key = visit(mapType.getKeyType(), iMapType.keyType(), visitor);
+        } finally {
+          visitor.afterMapKey(keyField);
+        }
+
+        Types.NestedField valueField = iMapType.field(iMapType.valueId());
+        visitor.beforeMapValue(valueField);
+        try {
+          value = visit(mapType.getValueType(), iMapType.valueType(), visitor);
+        } finally {
+          visitor.afterMapValue(valueField);
+        }
 
         return visitor.map(iMapType, key, value, mapType.getKeyType(), mapType.getValueType());
 
       case LIST:
         ArrayType listType = (ArrayType) flinkType;
         Types.ListType iListType = iType.asListType();
+        T element;
 
-        T element = visit(listType.getElementType(), iListType.elementType(), visitor);
+        Types.NestedField elementField = iListType.field(iListType.elementId());
+        visitor.beforeListElement(elementField);
+        try {
+          element = visit(listType.getElementType(), iListType.elementType(), visitor);
+        } finally {
+          visitor.afterListElement(elementField);
+        }
 
         return visitor.list(iListType, element, listType.getElementType());
@@ -82,7 +104,13 @@ private static <T> T visitRecord(LogicalType flinkType, Types.StructType struct, FlinkSchemaVisitor<T> visitor) {
       LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
 
       fieldTypes.add(fieldFlinkType);
-      results.add(visit(fieldFlinkType, iField.type(), visitor));
+
+      visitor.beforeField(iField);
+      try {
+        results.add(visit(fieldFlinkType, iField.type(), visitor));
+      } finally {
+        visitor.afterField(iField);
+      }
     }
 
     return visitor.record(struct, results, fieldTypes);
@@ -103,4 +131,34 @@ public T map(Types.MapType iMap, T key, T value, LogicalType keyType, LogicalType valueType) {
   public T primitive(Type.PrimitiveType iPrimitive, LogicalType flinkPrimitive) {
     return null;
   }
+
+  public void beforeField(Types.NestedField field) {
+  }
+
+  public void afterField(Types.NestedField field) {
+  }
+
+  public void beforeListElement(Types.NestedField elementField) {
+    beforeField(elementField);
+  }
+
+  public void afterListElement(Types.NestedField elementField) {
+    afterField(elementField);
+  }
+
+  public void beforeMapKey(Types.NestedField keyField) {
+    beforeField(keyField);
+  }
+
+  public void afterMapKey(Types.NestedField keyField) {
+    afterField(keyField);
+  }
+
+  public void beforeMapValue(Types.NestedField valueField) {
+    beforeField(valueField);
+  }
+
+  public void afterMapValue(Types.NestedField valueField) {
+    afterField(valueField);
+  }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index b7496d1e914c..af1d2cf66a41 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -387,7 +387,7 @@ static Optional<Integer> icebergID(TypeDescription orcType) {
         .map(Integer::parseInt);
   }
 
-  static int fieldId(TypeDescription orcType) {
+  public static int fieldId(TypeDescription orcType) {
     String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE);
     Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE);
     return Integer.parseInt(idStr);
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 34f71fcc232e..e946cda3f3a8 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -94,7 +94,7 @@ public void add(D datum) {
   public Metrics metrics() {
     Preconditions.checkState(isClosed,
         "Cannot return metrics while appending to an open file.");
-    return OrcMetrics.fromWriter(writer, metricsConfig);
+    return OrcMetrics.fromWriter(writer, valueWriter.metrics(), metricsConfig);
   }
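For reference, fieldId() works because Iceberg annotates every node of the ORC TypeDescription with its field id as a string attribute; making the method public lets the generic and Spark writers reuse that lookup. A sketch of the round trip, under the assumption that the attribute key is the "iceberg.id" value behind ICEBERG_ID_ATTRIBUTE:

import org.apache.orc.TypeDescription;

public class FieldIdLookup {
  public static void main(String[] args) {
    // Annotate an ORC float type the way Iceberg's schema conversion does.
    TypeDescription floatType = TypeDescription.createFloat();
    floatType.setAttribute("iceberg.id", "2");

    // This mirrors the body of ORCSchemaUtil.fieldId(...) shown above.
    String idStr = floatType.getAttributeValue("iceberg.id");
    if (idStr == null) {
      throw new IllegalStateException("Missing expected 'iceberg.id' property");
    }
    int fieldId = Integer.parseInt(idStr); // the id handed to floats(...)/doubles(...)
    System.out.println(fieldId);
  }
}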
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
index aacd621cd3bb..67b608441399 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -27,11 +27,14 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.MetricsModes;
 import org.apache.iceberg.MetricsModes.MetricsMode;
+import org.apache.iceberg.MetricsUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
@@ -83,22 +86,25 @@ public static Metrics fromInputFile(InputFile file, MetricsConfig metricsConfig, NameMapping mapping) {
   static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig metricsConfig, NameMapping mapping) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
       return buildOrcMetrics(orcReader.getNumberOfRows(), orcReader.getSchema(), orcReader.getStatistics(),
-          metricsConfig, mapping);
+          Stream.empty(), metricsConfig, mapping);
     } catch (IOException ioe) {
       throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
     }
   }
 
-  static Metrics fromWriter(Writer writer, MetricsConfig metricsConfig) {
+  static Metrics fromWriter(Writer writer, Stream<FieldMetrics> fieldMetricsStream, MetricsConfig metricsConfig) {
     try {
-      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(), metricsConfig, null);
+      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(),
+          fieldMetricsStream, metricsConfig, null);
     } catch (IOException ioe) {
       throw new RuntimeIOException(ioe, "Failed to get statistics from writer");
     }
   }
 
   private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
-                                         final ColumnStatistics[] colStats, final MetricsConfig metricsConfig,
+                                         final ColumnStatistics[] colStats,
+                                         final Stream<FieldMetrics> fieldMetricsStream,
+                                         final MetricsConfig metricsConfig,
                                          final NameMapping mapping) {
     final TypeDescription orcSchemaWithIds = (!ORCSchemaUtil.hasIds(orcSchema) && mapping != null) ?
         ORCSchemaUtil.applyNameMapping(orcSchema, mapping) : orcSchema;
@@ -115,6 +121,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
           valueCounts,
           nullCounts,
           null,
+          null,
           null);
     }
 
@@ -167,6 +174,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
         columnSizes,
         valueCounts,
         nullCounts,
+        MetricsUtil.createNanValueCounts(fieldMetricsStream, effectiveMetricsConfig, schema),
         lowerBounds,
         upperBounds);
   }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
index df494b9cc3e1..413634e3e100 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -20,6 +20,8 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 /**
@@ -35,4 +37,11 @@ public interface OrcRowWriter<T> {
    * @throws IOException if there's any IO error while writing the data value.
    */
   void write(T row, VectorizedRowBatch output) throws IOException;
+
+  /**
+   * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
+   */
+  default Stream<FieldMetrics> metrics() {
+    return Stream.empty();
+  }
 }
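The new fieldMetricsStream argument is what ultimately feeds the nanValueCounts slot of Metrics. Roughly, MetricsUtil.createNanValueCounts reduces the stream to a per-field map; a simplified stand-in is below (the real method also applies the MetricsConfig and schema it is passed, so columns whose metrics mode disables counts are excluded; that filtering is omitted here):

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;

public class NanValueCounts {
  // Reduce the writer-side FieldMetrics stream to a field-id -> NaN-count map.
  static Map<Integer, Long> nanValueCounts(Stream<FieldMetrics> fieldMetrics) {
    return fieldMetrics.collect(
        Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount));
  }
}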
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
index 9bbc1ddc6f0c..2f72fc20e053 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.orc;
 
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 
 public interface OrcValueWriter<T> {
@@ -43,4 +45,11 @@ default void write(int rowId, T data, ColumnVector output) {
   }
 
   void nonNullWrite(int rowId, T data, ColumnVector output);
+
+  /**
+   * Returns a stream of {@link FieldMetrics} that this OrcValueWriter keeps track of.
+   */
+  default Stream<FieldMetrics> metrics() {
+    return Stream.empty();
+  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
index 0c94555c9cd6..7692ee58028d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
@@ -33,10 +33,6 @@ public interface ParquetValueWriter<T> {
 
   /**
    * Returns a stream of {@link FieldMetrics} that this ParquetValueWriter keeps track of.
-   * <p>
-   * Since Parquet keeps track of most metrics in its footer, for now ParquetValueWriter only keeps track of NaN
-   * counter, and only return non-empty stream if the writer writes double or float values either by itself or
-   * transitively.
    */
   default Stream<FieldMetrics> metrics() {
     return Stream.empty();
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
index ef0ccf28e95f..131a93d6f9d9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.data;
 
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 
@@ -43,4 +45,15 @@ default void write(int rowId, int column, SpecializedGetters data, ColumnVector output) {
   }
 
   void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output);
+
+  /**
+   * Returns a stream of {@link FieldMetrics} that this SparkOrcValueWriter keeps track of.
+   * <p>
+   * Since ORC keeps track of most metrics via column statistics, for now SparkOrcValueWriter only keeps track of
+   * NaN counters, and only returns a non-empty stream if the writer writes double or float values either by itself
+   * or transitively.
+   */
+  default Stream<FieldMetrics> metrics() {
+    return Stream.empty();
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 9d517983e437..78e013e13661 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -19,6 +19,9 @@
 package org.apache.iceberg.spark.data;
 
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.FloatFieldMetrics;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
@@ -56,12 +59,12 @@ static SparkOrcValueWriter longs() {
     return LongWriter.INSTANCE;
   }
 
-  static SparkOrcValueWriter floats() {
-    return FloatWriter.INSTANCE;
+  static SparkOrcValueWriter floats(int id) {
+    return new FloatWriter(id);
   }
 
-  static SparkOrcValueWriter doubles() {
-    return DoubleWriter.INSTANCE;
+  static SparkOrcValueWriter doubles(int id) {
+    return new DoubleWriter(id);
   }
 
   static SparkOrcValueWriter byteArrays() {
@@ -138,20 +141,52 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
   }
 
   private static class FloatWriter implements SparkOrcValueWriter {
-    private static final FloatWriter INSTANCE = new FloatWriter();
+    private final int id;
+    private long nanCount;
+
+    private FloatWriter(int id) {
+      this.id = id;
+      this.nanCount = 0;
+    }
 
     @Override
     public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
+      float floatValue = data.getFloat(column);
+      ((DoubleColumnVector) output).vector[rowId] = floatValue;
+
+      if (Float.isNaN(floatValue)) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FloatFieldMetrics(id, nanCount));
     }
   }
 
   private static class DoubleWriter implements SparkOrcValueWriter {
-    private static final DoubleWriter INSTANCE = new DoubleWriter();
+    private final int id;
+    private long nanCount;
+
+    private DoubleWriter(int id) {
+      this.id = id;
+      this.nanCount = 0;
+    }
 
     @Override
     public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
+      double doubleValue = data.getDouble(column);
+      ((DoubleColumnVector) output).vector[rowId] = doubleValue;
+
+      if (Double.isNaN(doubleValue)) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FloatFieldMetrics(id, nanCount));
     }
   }
 
@@ -244,6 +279,11 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
         writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return writer.metrics();
+    }
   }
 
   private static class MapWriter implements SparkOrcValueWriter {
@@ -275,5 +315,10 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
         valueWriter.write(pos, e, value, cv.values);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
+    }
   }
 }
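A quick check of what a single float or double writer reports, using the FloatFieldMetrics constructor introduced above (the id() and nanValueCount() accessors are assumed from the FieldMetrics interface; value counts and bounds for ORC still come from column statistics, so this class carries only the NaN count):

import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;

public class FloatMetricsDemo {
  public static void main(String[] args) {
    // Count NaNs the same way the writers above do.
    long nanCount = 0;
    for (double v : new double[] {1.0, Double.NaN, Double.NaN}) {
      if (Double.isNaN(v)) {
        nanCount++;
      }
    }

    FieldMetrics metrics = new FloatFieldMetrics(1, nanCount);
    System.out.println(metrics.id() + " -> " + metrics.nanValueCount()); // 1 -> 2
  }
}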
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 4508a102d447..ce1b2bec0ec1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -20,7 +20,10 @@
 package org.apache.iceberg.spark.data;
 
 import java.util.List;
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.ORCSchemaUtil;
 import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -61,6 +64,11 @@ public void write(InternalRow value, VectorizedRowBatch output) {
     }
   }
 
+  @Override
+  public Stream<FieldMetrics> metrics() {
+    return writer.metrics();
+  }
+
   private static class WriteBuilder extends OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
     private WriteBuilder() {
     }
@@ -98,9 +106,9 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
       case LONG:
         return SparkOrcValueWriters.longs();
       case FLOAT:
-        return SparkOrcValueWriters.floats();
+        return SparkOrcValueWriters.floats(ORCSchemaUtil.fieldId(primitive));
       case DOUBLE:
-        return SparkOrcValueWriters.doubles();
+        return SparkOrcValueWriters.doubles(ORCSchemaUtil.fieldId(primitive));
       case BINARY:
         return SparkOrcValueWriters.byteArrays();
       case STRING:
@@ -136,5 +144,11 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
       writers.get(c).write(rowId, c, value, cv.fields[c]);
     }
   }
+
+  @Override
+  public Stream<FieldMetrics> metrics() {
+    return writers.stream().flatMap(SparkOrcValueWriter::metrics);
+  }
+
   }
 }
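End to end, the lower-level ORC builder makes it visible where the ids come from: createWriterFunc hands the writer both the Iceberg schema and the id-annotated ORC TypeDescription, which is exactly what ORCSchemaUtil.fieldId(...) reads, and the resulting metrics flow back through OrcMetrics.fromWriter. A sketch, assuming GenericOrcWriter.buildWriter(Schema, TypeDescription) matches the visitor entry point in the data module:

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.Files;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.types.Types;

public class OrcWriterFuncDemo {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema(Types.NestedField.required(1, "x", Types.DoubleType.get()));

    File file = File.createTempFile("orc-writer-func", ".orc");
    file.delete(); // the local output file must not already exist

    // The writer function receives the id-annotated ORC schema alongside the
    // Iceberg schema, so doubles(...) gets its field id from the file schema.
    FileAppender<Record> appender = ORC.write(Files.localOutput(file))
        .schema(schema)
        .createWriterFunc(GenericOrcWriter::buildWriter)
        .build();

    Record record = GenericRecord.create(schema);
    record.setField("x", Double.NaN);
    appender.add(record);
    appender.close();

    Metrics metrics = appender.metrics(); // flows through OrcMetrics.fromWriter
    System.out.println(metrics.nanValueCounts()); // expect a count of 1 for field id 1
  }
}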