Use constant readers for Pig partition values. (apache#444)
rdblue authored and danielcweeks committed Sep 4, 2019
1 parent 32ac506 commit d158818
Showing 3 changed files with 68 additions and 36 deletions.
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -53,6 +53,10 @@ public static <T> ParquetValueReader<T> nulls() {
return (ParquetValueReader<T>) NullReader.INSTANCE;
}

public static <C> ParquetValueReader<C> constant(C value) {
return new ConstantReader<>(value);
}

private static class NullReader<T> implements ParquetValueReader<T> {
private static final NullReader<Void> INSTANCE = new NullReader<>();
private static final List<TripleIterator<?>> COLUMNS = ImmutableList.of();
@@ -106,6 +110,33 @@ public void setPageSource(PageReadStore pageStore) {
}
}

static class ConstantReader<C> implements ParquetValueReader<C> {
private final C constantValue;

ConstantReader(C constantValue) {
this.constantValue = constantValue;
}

@Override
public C read(C reuse) {
return constantValue;
}

@Override
public TripleIterator<?> column() {
return NullReader.NULL_COLUMN;
}

@Override
public List<TripleIterator<?>> columns() {
return NullReader.COLUMNS;
}

@Override
public void setPageSource(PageReadStore pageStore) {
}
}

public abstract static class PrimitiveReader<T> implements ParquetValueReader<T> {
private final ColumnDescriptor desc;
protected final ColumnIterator<?> column;
@@ -604,11 +635,7 @@ protected StructReader(List<Type> types, List<ParquetValueReader<?>> readers) {
}

this.children = columnsBuilder.build();
if (children.size() > 0) {
this.column = children.get(0);
} else {
this.column = NullReader.NULL_COLUMN;
}
this.column = firstNonNullColumn(children);
}

@Override
@@ -719,5 +746,20 @@ protected void setFloat(I struct, int pos, float value) {
protected void setDouble(I struct, int pos, double value) {
set(struct, pos, value);
}

/**
* Find a non-null column or return NULL_COLUMN if one is not available.
*
* @param columns a collection of triple iterator columns
* @return the first non-null column in columns
*/
private TripleIterator<?> firstNonNullColumn(List<TripleIterator<?>> columns) {
for (TripleIterator<?> column : columns) {
if (column != NullReader.NULL_COLUMN) {
return column;
}
}
return NullReader.NULL_COLUMN;
}
}
}
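The two changes in this file work together. ConstantReader plugs a fixed value into the normal reader tree: it binds no Parquet columns and returns the same value on every read() call, which is what lets partition values flow through the regular tuple-building path instead of being patched in afterward. And because a constant child contributes only NULL_COLUMN, StructReader can no longer assume children.get(0) is a real column, hence firstNonNullColumn. A minimal standalone sketch of the constant-reader pattern; the ValueReader interface below is a stand-in for Iceberg's ParquetValueReader, not the real API:

// Stand-in for Iceberg's ParquetValueReader: reads one value per row.
interface ValueReader<T> {
  T read(T reuse);
}

// Mirrors the ConstantReader added above: binds no Parquet columns and
// hands back the same value for every row.
class ConstantReader<C> implements ValueReader<C> {
  private final C constantValue;

  ConstantReader(C constantValue) {
    this.constantValue = constantValue;
  }

  @Override
  public C read(C reuse) {
    return constantValue; // the reuse container is ignored; null is a legal constant
  }
}

public class ConstantReaderDemo {
  public static void main(String[] args) {
    // A partition column has one value for the whole file, so a constant
    // reader can stand in for a real column reader at that position.
    ValueReader<String> region = new ConstantReader<>("us-west-2");
    for (int row = 0; row < 3; row++) {
      System.out.println(region.read(null)); // prints us-west-2 three times
    }
  }
}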
pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
@@ -205,18 +205,17 @@ private boolean advance() throws IOException {
}

for (Types.NestedField field : projectedPartitionSchema.columns()) {
int tupleIndex = projectedSchema.columns().indexOf(field);
int partitionIndex = partitionSpecFieldIndexMap.get(field.name());

Object partitionValue = file.partition().get(partitionIndex, Object.class);
partitionValueMap.put(tupleIndex, convertPartitionValue(field.type(), partitionValue));
partitionValueMap.put(field.fieldId(), convertPartitionValue(field.type(), partitionValue));
}

reader = Parquet.read(inputFile)
.project(readSchema)
.split(currentTask.start(), currentTask.length())
.filter(currentTask.residual())
.createReaderFunc(fileSchema -> PigParquetReader.buildReader(fileSchema, readSchema, partitionValueMap))
.createReaderFunc(fileSchema -> PigParquetReader.buildReader(fileSchema, projectedSchema, partitionValueMap))
.build();
} else {
reader = Parquet.read(inputFile)
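The input-format change above swaps the partition-value map key from the column's position in the projected tuple to the Iceberg field ID, which stays stable however the projection is ordered, and passes projectedSchema to buildReader so the reader builder resolves the same field IDs. A small sketch of why position keys are fragile; the table, column names, and IDs are hypothetical:

import java.util.HashMap;
import java.util.Map;

public class PartitionKeyDemo {
  public static void main(String[] args) {
    // Hypothetical table: id (field ID 1), ts (field ID 2), region (field ID 3, partition column).

    // Before: keyed by position in the projected tuple. Valid only for the
    // projection (id, ts, region); project (region, id) and the key is wrong.
    Map<Integer, Object> byTupleIndex = new HashMap<>();
    byTupleIndex.put(2, "us-west-2");

    // After: keyed by Iceberg field ID, stable across any projection order.
    Map<Integer, Object> byFieldId = new HashMap<>();
    byFieldId.put(3, "us-west-2");

    System.out.println(byTupleIndex + " vs " + byFieldId);
  }
}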
47 changes: 19 additions & 28 deletions pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java
@@ -62,15 +62,11 @@
import org.apache.pig.data.TupleFactory;

import static java.lang.String.format;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.hasIds;
import static org.apache.iceberg.parquet.ParquetValueReaders.option;

public class PigParquetReader {
private final ParquetValueReader reader;

public PigParquetReader(Schema readSchema, MessageType fileSchema, Map<Integer, Object> partitionValues) {
this.reader = buildReader(convert(readSchema, fileSchema.getName()), readSchema, partitionValues);
private PigParquetReader() {
}

@SuppressWarnings("unchecked")
@@ -112,7 +108,7 @@ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
types.add(fieldType);
}

return new TupleReader(types, newFields, partitionValues);
return new TupleReader(types, newFields);
}
}

@@ -151,17 +147,23 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
reorderedFields.add(reader);
types.add(typesById.get(id));
} else {
reorderedFields.add(ParquetValueReaders.nulls());
if (partitionValues.containsKey(id)) {
// the value may be null so containsKey is used to check for a partition value
reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
reorderedFields.add(reader);
types.add(typesById.get(id));
} else {
reorderedFields.add(ParquetValueReaders.nulls());
types.add(null);
}
}
}

return new TupleReader(types, reorderedFields, partitionValues);
return new TupleReader(types, reorderedFields);
}

@Override
@@ -394,19 +396,16 @@ protected DataBag buildList(DataBag bag) {

private static class TupleReader extends StructReader<Tuple, Tuple> {
private static final TupleFactory TF = TupleFactory.getInstance();
private final Map<Integer, Object> partitionValues;
private final int columns;
private final int numColumns;

protected TupleReader(List<Type> types, List<ParquetValueReader<?>> readers, Map<Integer, Object> partitionValues) {
TupleReader(List<Type> types, List<ParquetValueReader<?>> readers) {
super(types, readers);

this.partitionValues = partitionValues;
this.columns = types.size() + partitionValues.size();
this.numColumns = readers.size();
}

@Override
protected Tuple newStructData(Tuple reuse) {
return TF.newTuple(columns);
return TF.newTuple(numColumns);
}

@Override
@@ -416,14 +415,6 @@ protected Object getField(Tuple tuple, int pos) {

@Override
protected Tuple buildStruct(Tuple tuple) {
for (Map.Entry<Integer, Object> e : partitionValues.entrySet()) {
try {
tuple.set(e.getKey(), e.getValue());
} catch (ExecException ex) {
throw new RuntimeException("Error setting value for key" + e.getKey(), ex);
}
}

return tuple;
}

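With partition values handled by constant readers, TupleReader shrinks: the tuple is sized from the reader list (constants included) and every position is filled by the same per-column read, so the old buildStruct pass that overwrote positions from partitionValues, with its ExecException handling, is no longer needed. Conceptually, as a sketch rather than Iceberg's actual StructReader internals:

import java.util.Arrays;
import java.util.List;

public class RowAssemblyDemo {
  interface ValueReader<T> { T read(T reuse); }

  public static void main(String[] args) {
    // One reader per output column; the partition column is just another reader.
    List<ValueReader<Object>> readers = Arrays.asList(
        reuse -> 42L,          // stands in for a real Parquet column reader
        reuse -> "event",      // another file column
        reuse -> "us-west-2"   // constant reader for the partition column
    );

    // Single uniform pass: no special-casing of partition positions afterward.
    Object[] tuple = new Object[readers.size()];
    for (int pos = 0; pos < readers.size(); pos++) {
      tuple[pos] = readers.get(pos).read(null);
    }
    System.out.println(Arrays.toString(tuple)); // [42, event, us-west-2]
  }
}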
