Parquet: Support vectorized reads with identity partition values (apa…

RussellSpitzer authored Aug 5, 2020
1 parent a4b457e commit 71de518

Showing 7 changed files with 121 additions and 22 deletions.
@@ -91,17 +91,44 @@ public int numValues() {
return vector.getValueCount();
}

public static <T> VectorHolder constantHolder(int numRows, T constantValue) {
return new ConstantVectorHolder(numRows, constantValue);
}

public static VectorHolder dummyHolder(int numRows) {
-    return new VectorHolder() {
-      @Override
-      public int numValues() {
-        return numRows;
-      }
-    };
+    return new ConstantVectorHolder(numRows);
}

public boolean isDummy() {
return vector == null;
}

/**
 * A VectorHolder that does not actually produce values; consumers of this class should
 * use the constantValue to populate their ColumnVector implementation.
*/
public static class ConstantVectorHolder<T> extends VectorHolder {
private final T constantValue;
private final int numRows;

public ConstantVectorHolder(int numRows) {
this.numRows = numRows;
this.constantValue = null;
}

public ConstantVectorHolder(int numRows, T constantValue) {
this.numRows = numRows;
this.constantValue = constantValue;
}

@Override
public int numValues() {
return this.numRows;
}

public Object getConstant() {
return constantValue;
}
}

}
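The new ConstantVectorHolder never wraps an Arrow vector (vector stays null, so isDummy() returns true); it only records a row count and the constant to repeat. A minimal sketch of how a consumer could use it; the example value and the fill step are assumptions, not code from this commit:

// Sketch only: "2020-08-05" is an assumed identity partition value.
VectorHolder holder = VectorHolder.constantHolder(1024, "2020-08-05");
if (holder.isDummy()) {
  // No Arrow data to decode; repeat the constant for every row of the output column.
  Object constant = ((VectorHolder.ConstantVectorHolder<?>) holder).getConstant();
  // ... fill a 1024-row ColumnVector with `constant` instead of reading the file ...
}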
@@ -356,5 +356,36 @@ public String toString() {
public void setBatchSize(int batchSize) {}
}

/**
 * A dummy vector reader that does not actually read files; instead, it returns a constant
 * VectorHolder indicating the value that should be used for this column.
 * @param <T> the type of the constant value
*/
public static class ConstantVectorReader<T> extends VectorizedArrowReader {
private final T value;

public ConstantVectorReader(T value) {
this.value = value;
}

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.constantHolder(numValsToRead, value);
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
}

@Override
public String toString() {
return String.format("ConstantReader: %s", value);
}

@Override
public void setBatchSize(int batchSize) {}

}

}
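ConstantVectorReader is the per-column counterpart for identity partition values: setRowGroupInfo is a no-op and read never touches Parquet pages; it simply wraps the constant in a holder sized for the requested batch. A rough usage sketch; the column and value are assumed examples:

// Sketch only: pretend "ERROR" is the identity partition value of a "level" column.
VectorizedArrowReader levelReader = new VectorizedArrowReader.ConstantVectorReader<>("ERROR");
VectorHolder holder = levelReader.read(null, 5000);  // no file or row-group access happens here
// holder.numValues() == 5000 and its constant is "ERROR", whatever the data files contain.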

@@ -21,6 +21,7 @@

import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.Decimal;
@@ -144,7 +144,8 @@ public ArrowColumnVector getChild(int ordinal) {
}

static ColumnVector forHolder(VectorHolder holder, int numRows) {
-    return holder.isDummy() ? new ConstantColumnVector(Types.IntegerType.get(), numRows, null) :
+    return holder.isDummy() ?
+        new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
new IcebergArrowColumnVector(holder);
}
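forHolder is the seam where a constant partition value becomes a Spark ColumnVector: dummy holders turn into a ConstantColumnVector, everything else stays Arrow-backed. A rough sketch of how a batch could be assembled from one holder per column; the surrounding loop and variable names are illustrative, not the exact code in this class:

// Sketch only: `holders` would come from the vectorized readers for one batch of rows.
ColumnVector[] vectors = new ColumnVector[holders.length];
for (int i = 0; i < holders.length; i++) {
  vectors[i] = forHolder(holders[i], numRowsInBatch);  // constant column or Arrow-backed column
}
ColumnarBatch batch = new ColumnarBatch(vectors);
batch.setNumRows(numRowsInBatch);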

@@ -26,6 +26,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.ConstantVectorReader;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -47,26 +48,36 @@ public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
boolean setArrowValidityVector) {
return buildReader(expectedSchema, fileSchema, setArrowValidityVector, Maps.newHashMap());
}

public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
boolean setArrowValidityVector,
Map<Integer, ?> idToConstant) {
return (ColumnarBatchReader)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-            new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector));
+            new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector, idToConstant));
}

private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
private final MessageType parquetSchema;
private final Schema icebergSchema;
private final BufferAllocator rootAllocator;
private final Map<Integer, ?> idToConstant;
private final boolean setArrowValidityVector;

VectorizedReaderBuilder(
Schema expectedSchema,
MessageType parquetSchema,
-        boolean setArrowValidityVector) {
+        boolean setArrowValidityVector, Map<Integer, ?> idToConstant) {
this.parquetSchema = parquetSchema;
this.icebergSchema = expectedSchema;
this.rootAllocator = ArrowAllocation.rootAllocator()
.newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
this.setArrowValidityVector = setArrowValidityVector;
this.idToConstant = idToConstant;
}

@Override
Expand All @@ -90,7 +101,9 @@ public VectorizedReader<?> message(
for (Types.NestedField field : icebergFields) {
int id = field.fieldId();
VectorizedReader<?> reader = readersById.get(id);
-        if (reader != null) {
+        if (idToConstant.containsKey(id)) {
+          reorderedFields.add(new ConstantVectorReader(idToConstant.get(id)));
+        } else if (reader != null) {
reorderedFields.add(reader);
} else {
reorderedFields.add(VectorizedArrowReader.nulls());
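With the new idToConstant argument, a caller tells the builder which field IDs should be served from constants rather than from the file; every other column still gets a regular VectorizedArrowReader, and missing columns still fall back to nulls(). A hedged sketch of a call site; the schema objects, field ID 2, and the date value are illustrative, not taken from this commit:

// Sketch only: field 2 is assumed to be an identity-partitioned "date" column whose
// value for this particular file is "2020-08-05".
Map<Integer, String> idToConstant = ImmutableMap.of(2, "2020-08-05");
ColumnarBatchReader reader = VectorizedSparkParquetReaders.buildReader(
    expectedSchema, fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant);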
@@ -89,7 +89,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
.project(expectedSchema)
.split(task.start(), task.length())
.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
-              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED))
+              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant))
.recordsPerBatch(batchSize)
.filter(task.residual())
.caseSensitive(caseSensitive)
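Here BatchDataReader simply forwards the idToConstant map it already holds; the map itself is not built in this diff. A hedged sketch of one way such a map can be derived from a task's identity partition data, assuming a FileScanTask named task:

// Sketch only: collect the partition value for every identity-partitioned source field.
PartitionSpec spec = task.spec();
StructLike partition = task.file().partition();
Set<Integer> identityIds = spec.identitySourceIds();
Map<Integer, Object> idToConstant = Maps.newHashMap();
for (int pos = 0; pos < spec.fields().size(); pos++) {
  PartitionField field = spec.fields().get(pos);
  if (identityIds.contains(field.sourceId())) {
    idToConstant.put(field.sourceId(), partition.get(pos, Object.class));
  }
}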
@@ -31,10 +31,13 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -110,24 +113,52 @@ public static void stopSpark() {
private Table table = null;
private Dataset<Row> logs = null;

-  @Before
-  public void setupTable() throws Exception {
/**
 * Use a Hive-based table to create identity partition columns with no duplication of the data in the underlying
 * Parquet files. This makes sure that if the identity mapping fails, the test will also fail.
*/
private void setupParquet() throws Exception {
File location = temp.newFolder("logs");
File hiveLocation = temp.newFolder("hive");
String hiveTable = "hivetable";
Assert.assertTrue("Temp folder should exist", location.exists());

Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
.option("path", hiveLocation.toString()).saveAsTable(hiveTable);

this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());

SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
}

logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
@Before
public void setupTable() throws Exception {
if (format.equals("parquet")) {
setupParquet();
} else {
File location = temp.newFolder("logs");
Assert.assertTrue("Temp folder should exist", location.exists());

Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");

logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
}
}

@Test
public void testFullProjection() {
List<Row> expected = logs.orderBy("id").collectAsList();
List<Row> actual = spark.read().format("iceberg")
.option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")
+        .collectAsList();
Assert.assertEquals("Rows should match", expected, actual);
}

@@ -315,15 +315,10 @@ public boolean enableBatchRead() {

boolean atLeastOneColumn = lazySchema().columns().size() > 0;

-    boolean hasNoIdentityProjections = tasks().stream()
-        .allMatch(combinedScanTask -> combinedScanTask.files()
-            .stream()
-            .allMatch(fileScanTask -> fileScanTask.spec().identitySourceIds().isEmpty()));

boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());

this.readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
-        (allParquetFileScanTasks && atLeastOneColumn && hasNoIdentityProjections && onlyPrimitives));
+        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}
return readUsingBatch;
}