Vectorization: Parquet additions to support batch reads (apache#710)
Co-authored-by: [email protected]
Co-authored-by: [email protected]
samarthjain authored and rdblue committed Dec 30, 2019
1 parent 44a0a83 commit 3c88ef1
Showing 8 changed files with 549 additions and 146 deletions.
29 changes: 25 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -304,12 +304,14 @@ public static class ReadBuilder {
private Schema schema = null;
private Expression filter = null;
private ReadSupport<?> readSupport = null;
private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
private Map<String, String> properties = Maps.newHashMap();
private boolean callInit = false;
private boolean reuseContainers = false;
private int maxRecordsPerBatch = 10000;

private ReadBuilder(InputFile file) {
this.file = file;
@@ -358,10 +360,19 @@ public ReadBuilder readSupport(ReadSupport<?> newFilterSupport) {
}

public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
Preconditions.checkArgument(this.batchedReaderFunc == null,
"Reader function cannot be set since the batched version is already set");
this.readerFunc = newReaderFunction;
return this;
}

public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
Preconditions.checkArgument(this.readerFunc == null,
"Batched reader function cannot be set since the non-batched version is already set");
this.batchedReaderFunc = func;
return this;
}

public ReadBuilder set(String key, String value) {
properties.put(key, value);
return this;
@@ -377,9 +388,14 @@ public ReadBuilder reuseContainers() {
return this;
}

@SuppressWarnings("unchecked")
public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
this.maxRecordsPerBatch = numRowsPerBatch;
return this;
}

@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
if (readerFunc != null) {
if (readerFunc != null || batchedReaderFunc != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
// remove read properties already set that may conflict with this read
@@ -402,8 +418,13 @@ public <D> CloseableIterable<D> build() {

ParquetReadOptions options = optionsBuilder.build();

return new org.apache.iceberg.parquet.ParquetReader<>(
file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
if (batchedReaderFunc != null) {
return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
caseSensitive, maxRecordsPerBatch);
} else {
return new org.apache.iceberg.parquet.ParquetReader<>(
file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
}
}

ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
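Taken together, the builder changes above add a second, batch-oriented read path next to the existing row-based one. The following is a minimal usage sketch, not part of this commit: it assumes the usual Parquet.read(InputFile) factory for obtaining a ReadBuilder, and inputFile, projectedSchema, the Batch element type, and buildVectorizedReader are placeholders for whatever a concrete engine integration supplies.

// Hypothetical caller of the new batched path (illustrative only).
CloseableIterable<Batch> batches = Parquet.read(inputFile)
    .project(projectedSchema)
    .createBatchedReaderFunc(fileType -> buildVectorizedReader(projectedSchema, fileType))
    .recordsPerBatch(5000)   // overrides the 10000-row default added above
    .reuseContainers()
    .build();

Setting both createReaderFunc and createBatchedReaderFunc fails fast with the precondition checks added above; build() then routes to VectorizedParquetReader or the existing ParquetReader accordingly.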
parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -24,7 +24,6 @@
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -40,8 +39,6 @@
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -114,7 +111,7 @@ private boolean eval(MessageType fileSchema, BlockMetaData rowGroup,
PrimitiveType colType = fileSchema.getType(meta.getPath().toArray()).asPrimitiveType();
if (colType.getId() != null) {
int id = colType.getId().intValue();
isFallback.put(id, hasNonDictionaryPages(meta));
isFallback.put(id, ParquetUtil.hasNonDictionaryPages(meta));
mayContainNulls.put(id, mayContainNull(meta));
}
}
@@ -385,31 +382,4 @@ private static boolean mayContainNull(ColumnChunkMetaData meta) {
return meta.getStatistics() == null || meta.getStatistics().getNumNulls() != 0;
}

@SuppressWarnings("deprecation")
private static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
EncodingStats stats = meta.getEncodingStats();
if (stats != null) {
return stats.hasNonDictionaryEncodedPages();
}

// without EncodingStats, fall back to testing the encoding list
Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
// if remove returned true, PLAIN_DICTIONARY was present, which means at
// least one page was dictionary encoded and 1.0 encodings are used

// RLE and BIT_PACKED are only used for repetition or definition levels
encodings.remove(Encoding.RLE);
encodings.remove(Encoding.BIT_PACKED);

// when empty, no encodings other than dictionary or rep/def levels
return !encodings.isEmpty();
} else {
// if PLAIN_DICTIONARY wasn't present, then either the column is not
// dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
// for 2.0, this cannot determine whether a page fell back without
// page encoding stats
return true;
}
}
}
111 changes: 1 addition & 110 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -22,7 +22,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -34,7 +33,6 @@
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;

public class ParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
@@ -59,122 +57,15 @@ public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions
this.caseSensitive = caseSensitive;
}

private static class ReadConf<T> {
private final ParquetFileReader reader;
private final InputFile file;
private final ParquetReadOptions options;
private final MessageType projection;
private final ParquetValueReader<T> model;
private final List<BlockMetaData> rowGroups;
private final boolean[] shouldSkip;
private final long totalValues;
private final boolean reuseContainers;

@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
boolean caseSensitive) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);

MessageType fileSchema = reader.getFileMetaData().getSchema();

boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);

this.projection = hasIds ?
ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
this.rowGroups = reader.getRowGroups();
this.shouldSkip = new boolean[rowGroups.size()];

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
if (filter != null) {
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
}

long computedTotalValues = 0L;
for (int i = 0; i < shouldSkip.length; i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
boolean shouldRead = filter == null || (
statsFilter.shouldRead(typeWithIds, rowGroup) &&
dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
this.shouldSkip[i] = !shouldRead;
if (shouldRead) {
computedTotalValues += rowGroup.getRowCount();
}
}

this.totalValues = computedTotalValues;
this.reuseContainers = reuseContainers;
}

ReadConf(ReadConf<T> toCopy) {
this.reader = null;
this.file = toCopy.file;
this.options = toCopy.options;
this.projection = toCopy.projection;
this.model = toCopy.model;
this.rowGroups = toCopy.rowGroups;
this.shouldSkip = toCopy.shouldSkip;
this.totalValues = toCopy.totalValues;
this.reuseContainers = toCopy.reuseContainers;
}

ParquetFileReader reader() {
if (reader != null) {
reader.setRequestedSchema(projection);
return reader;
}

ParquetFileReader newReader = newReader(file, options);
newReader.setRequestedSchema(projection);
return newReader;
}

ParquetValueReader<T> model() {
return model;
}

boolean[] shouldSkip() {
return shouldSkip;
}

long totalValues() {
return totalValues;
}

boolean reuseContainers() {
return reuseContainers;
}

ReadConf<T> copy() {
return new ReadConf<>(this);
}

private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
try {
return ParquetFileReader.open(ParquetIO.file(file), options);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to open Parquet file: %s", file.location());
}
}
}

private ReadConf<T> conf = null;

private ReadConf<T> init() {
if (conf == null) {
ReadConf<T> readConf = new ReadConf<>(
input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
input, options, expectedSchema, filter, readerFunc, null, reuseContainers, caseSensitive, null);
this.conf = readConf.copy();
return readConf;
}

return conf;
}

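The ReadConf inner class deleted above is extracted into a shared class used by both ParquetReader and the new VectorizedParquetReader (that file is among the changed files not rendered on this page). The skeleton below is reconstructed from the call in init() above; the parameter names and the use of Integer for the nullable batch size are assumptions.

// Inferred shape of the extracted, shared ReadConf (illustrative reconstruction only).
class ReadConf<T> {
  ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
           Function<MessageType, ParquetValueReader<?>> readerFunc,
           Function<MessageType, VectorizedReader<?>> batchedReaderFunc,
           boolean reuseContainers, boolean caseSensitive, Integer maxRecordsPerBatch) {
    // row-based readers pass readerFunc and leave batchedReaderFunc and maxRecordsPerBatch null;
    // the vectorized reader is expected to do the opposite
  }
}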
parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,8 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BinaryUtil;
import org.apache.iceberg.util.UnicodeUtil;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -127,7 +130,6 @@ public static Metrics footerMetrics(ParquetMetadata metadata, MetricsConfig metr
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
}


/**
* @return a list of offsets in ascending order determined by the starting position
* of the row groups
@@ -225,4 +227,32 @@ private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer,
}
return bufferMap;
}

@SuppressWarnings("deprecation")
public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
EncodingStats stats = meta.getEncodingStats();
if (stats != null) {
return stats.hasNonDictionaryEncodedPages();
}

// without EncodingStats, fall back to testing the encoding list
Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
// if remove returned true, PLAIN_DICTIONARY was present, which means at
// least one page was dictionary encoded and 1.0 encodings are used

// RLE and BIT_PACKED are only used for repetition or definition levels
encodings.remove(Encoding.RLE);
encodings.remove(Encoding.BIT_PACKED);

// when empty, no encodings other than dictionary or rep/def levels
return !encodings.isEmpty();
} else {
// if PLAIN_DICTIONARY wasn't present, then either the column is not
// dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
// for 2.0, this cannot determine whether a page fell back without
// page encoding stats
return true;
}
}
}
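hasNonDictionaryPages is the same helper removed from the dictionary row-group filter above, now public, presumably so the new batch-reading code can reuse it alongside the filter. For illustration only, this is roughly how a caller checks it per column chunk; rowGroup is assumed to be the BlockMetaData of the row group being evaluated.

// Illustrative only: a column chunk's dictionary can prove a value is absent only when
// every data page was dictionary-encoded.
for (ColumnChunkMetaData column : rowGroup.getColumns()) {
  boolean dictionaryCoversAllValues = !ParquetUtil.hasNonDictionaryPages(column);
  if (!dictionaryCoversAllValues) {
    // treat this column as a fallback: keep the row group instead of filtering on the dictionary
  }
}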