From 71de51805be7954dd423ec29fd4c7d6b303adc1f Mon Sep 17 00:00:00 2001
From: Russell Spitzer
Date: Tue, 4 Aug 2020 20:08:03 -0500
Subject: [PATCH] Parquet: Support vectorized reads with identity partition values (#1287)

---
 .../arrow/vectorized/VectorHolder.java        | 39 +++++++++++++++---
 .../vectorized/VectorizedArrowReader.java     | 31 ++++++++++++++
 .../vectorized/IcebergArrowColumnVector.java  |  4 +-
 .../VectorizedSparkParquetReaders.java        | 19 +++++++--
 .../iceberg/spark/source/BatchDataReader.java |  2 +-
 .../source/TestIdentityPartitionData.java     | 41 ++++++++++++++++---
 .../apache/iceberg/spark/source/Reader.java   |  7 +---
 7 files changed, 121 insertions(+), 22 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
index b938d3845c19..14062ff6558f 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
@@ -91,17 +91,44 @@ public int numValues() {
     return vector.getValueCount();
   }
 
+  public static <T> VectorHolder constantHolder(int numRows, T constantValue) {
+    return new ConstantVectorHolder(numRows, constantValue);
+  }
+
   public static VectorHolder dummyHolder(int numRows) {
-    return new VectorHolder() {
-      @Override
-      public int numValues() {
-        return numRows;
-      }
-    };
+    return new ConstantVectorHolder(numRows);
   }
 
   public boolean isDummy() {
     return vector == null;
   }
 
+  /**
+   * A Vector Holder which does not actually produce values; consumers of this class should
+   * use the constantValue to populate their ColumnVector implementation.
+   */
+  public static class ConstantVectorHolder<T> extends VectorHolder {
+    private final T constantValue;
+    private final int numRows;
+
+    public ConstantVectorHolder(int numRows) {
+      this.numRows = numRows;
+      this.constantValue = null;
+    }
+
+    public ConstantVectorHolder(int numRows, T constantValue) {
+      this.numRows = numRows;
+      this.constantValue = constantValue;
+    }
+
+    @Override
+    public int numValues() {
+      return this.numRows;
+    }
+
+    public Object getConstant() {
+      return constantValue;
+    }
+  }
+
 }
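The ConstantVectorHolder javadoc above says consumers should use the constant value to populate their ColumnVector implementation. Below is a minimal consumer sketch; the class name and the plain-Object accessor are illustrative assumptions rather than anything in this patch (the real Spark-side consumer is the IcebergArrowColumnVector change further down).

import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;

// Illustrative consumer: when the holder is a dummy/constant holder, no Arrow vector
// was read for this column, so every row in the batch is served the same constant.
public class ConstantAwareColumn {
  private final Object constant;
  private final int numRows;

  public ConstantAwareColumn(VectorHolder holder) {
    // a non-dummy holder would wrap its Arrow vector instead of carrying a constant
    this.constant = holder.isDummy() ? ((ConstantVectorHolder<?>) holder).getConstant() : null;
    this.numRows = holder.numValues();
  }

  public Object get(int rowId) {
    return constant; // same value for every row 0..numRows-1
  }

  public int size() {
    return numRows;
  }
}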
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index dbde001b9764..126388a02f90 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -356,5 +356,36 @@ public String toString() {
 
     public void setBatchSize(int batchSize) {}
   }
 
+  /**
+   * A Dummy Vector Reader which doesn't actually read files; instead it returns a dummy
+   * VectorHolder which indicates the constant value that should be used for this column.
+   * @param <T> The constant value to use
+   */
+  public static class ConstantVectorReader<T> extends VectorizedArrowReader {
+    private final T value;
+
+    public ConstantVectorReader(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+      return VectorHolder.constantHolder(numValsToRead, value);
+    }
+
+    @Override
+    public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
+    }
+
+    @Override
+    public String toString() {
+      return String.format("ConstantReader: %s", value);
+    }
+
+    @Override
+    public void setBatchSize(int batchSize) {}
+
+  }
+
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 60cd17e06ed3..9b973f4002d4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -21,6 +21,7 @@
 
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
 import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.types.Decimal;
@@ -144,7 +145,8 @@ public ArrowColumnVector getChild(int ordinal) {
   }
 
   static ColumnVector forHolder(VectorHolder holder, int numRows) {
-    return holder.isDummy() ? new ConstantColumnVector(Types.IntegerType.get(), numRows, null) :
+    return holder.isDummy() ?
+        new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
         new IcebergArrowColumnVector(holder);
   }
 
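The forHolder change above is where the constant actually reaches Spark: a dummy holder now yields a ConstantColumnVector seeded with the holder's constant instead of null. The sketch below shows roughly how one batch of holders becomes Spark column vectors; it assumes a helper living in the same package (forHolder is package-private) and simplified names, so treat it as an illustration of the flow rather than the actual ColumnarBatchReader code.

package org.apache.iceberg.spark.data.vectorized;

import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical helper: converts the holders produced for one row batch into a Spark
// ColumnarBatch. Identity partition columns arrive as ConstantVectorHolder and become
// constant-backed vectors; all other columns wrap their Arrow vector.
class BatchAssembler {
  private BatchAssembler() {
  }

  static ColumnarBatch toBatch(VectorHolder[] holders, int numRows) {
    ColumnVector[] columns = new ColumnVector[holders.length];
    for (int i = 0; i < holders.length; i++) {
      columns[i] = IcebergArrowColumnVector.forHolder(holders[i], numRows);
    }
    ColumnarBatch batch = new ColumnarBatch(columns);
    batch.setNumRows(numRows);
    return batch;
  }
}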
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 3eb55ebaf22a..2834135aa3e2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowAllocation;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.ConstantVectorReader;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -47,26 +48,36 @@ public static ColumnarBatchReader buildReader(
       Schema expectedSchema,
       MessageType fileSchema,
       boolean setArrowValidityVector) {
+    return buildReader(expectedSchema, fileSchema, setArrowValidityVector, Maps.newHashMap());
+  }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      boolean setArrowValidityVector,
+      Map<Integer, ?> idToConstant) {
     return (ColumnarBatchReader)
         TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-            new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector));
+            new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector, idToConstant));
   }
 
   private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
     private final MessageType parquetSchema;
     private final Schema icebergSchema;
     private final BufferAllocator rootAllocator;
+    private final Map<Integer, ?> idToConstant;
     private final boolean setArrowValidityVector;
 
     VectorizedReaderBuilder(
         Schema expectedSchema,
         MessageType parquetSchema,
-        boolean setArrowValidityVector) {
+        boolean setArrowValidityVector, Map<Integer, ?> idToConstant) {
       this.parquetSchema = parquetSchema;
       this.icebergSchema = expectedSchema;
       this.rootAllocator = ArrowAllocation.rootAllocator()
           .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
       this.setArrowValidityVector = setArrowValidityVector;
+      this.idToConstant = idToConstant;
     }
 
     @Override
@@ -90,7 +101,9 @@ public VectorizedReader<?> message(
       for (Types.NestedField field : icebergFields) {
         int id = field.fieldId();
         VectorizedReader<?> reader = readersById.get(id);
-        if (reader != null) {
+        if (idToConstant.containsKey(id)) {
+          reorderedFields.add(new ConstantVectorReader(idToConstant.get(id)));
+        } else if (reader != null) {
           reorderedFields.add(reader);
         } else {
           reorderedFields.add(VectorizedArrowReader.nulls());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index eff18ca3100d..82f889ecb490 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -89,7 +89,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
           .project(expectedSchema)
           .split(task.start(), task.length())
           .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
-              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED))
+              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant))
           .recordsPerBatch(batchSize)
           .filter(task.residual())
           .caseSensitive(caseSensitive)
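BatchDataReader already tracks an idToConstant map for each scan task; the change above simply forwards it to the vectorized reader builder so identity partition columns become ConstantVectorReaders. The sketch below illustrates how such a map can be derived for one task; it assumes PartitionUtil.constantsMap, the utility the row-based readers use, and is not the exact call site in BatchDataReader.

import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.PartitionUtil;

// Illustrative only: builds the field-id -> constant map for one scan task. For an
// identity partition spec, the partition tuple stored in the file metadata already
// carries the column values, so they never need to be read from the Parquet file.
class IdentityConstants {
  private IdentityConstants() {
  }

  static Map<Integer, ?> forTask(FileScanTask task) {
    return PartitionUtil.constantsMap(task);
  }
}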
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
index 15ae2f5f3176..755e4795d9a6 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
@@ -31,10 +31,13 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -110,16 +113,42 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /**
+   * Use a Hive-based table to create identity partition columns with no duplication of the data in the underlying
+   * parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
@@ -127,7 +156,9 @@ public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")
+        .collectAsList();
     Assert.assertEquals("Rows should match", expected, actual);
   }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 35b137c08a27..232acc614eeb 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -315,15 +315,10 @@ public boolean enableBatchRead() {
 
       boolean atLeastOneColumn = lazySchema().columns().size() > 0;
 
-      boolean hasNoIdentityProjections = tasks().stream()
-          .allMatch(combinedScanTask -> combinedScanTask.files()
-              .stream()
-              .allMatch(fileScanTask -> fileScanTask.spec().identitySourceIds().isEmpty()));
-
       boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
 
       this.readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
-          (allParquetFileScanTasks && atLeastOneColumn && hasNoIdentityProjections && onlyPrimitives));
+          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
     }
     return readUsingBatch;
   }
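With the hasNoIdentityProjections guard removed from Reader.java above, identity-partitioned Parquet tables are no longer excluded from batch reads. The usage sketch below is illustrative: the table path is hypothetical, and the read option name mirrors the one toggled in the test.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Reads an identity-partitioned Iceberg table with vectorization enabled. After this
// patch the partition columns (for example date/level in the test) are filled from
// per-task constants instead of being materialized from the Parquet data files.
public class VectorizedIdentityReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("vectorized-identity-read")
        .master("local[2]")
        .getOrCreate();

    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("vectorization-enabled", "true")
        .load("/tmp/warehouse/logs");  // hypothetical table location

    df.select("id", "date", "level", "message").show();
    spark.stop();
  }
}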