Skip to content

Commit

Permalink
Parquet: Fix vectorized reads for negative decimals (apache#1736)
Browse files Browse the repository at this point in the history
  • Loading branch information
samarthjain authored Nov 6, 2020
1 parent 59fb458 commit 1bebf1a
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 57 deletions.
24 changes: 15 additions & 9 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public class RandomUtil {
private RandomUtil() {
}

/**
 * Decides whether a generated value should be negated, based on the parity of {@code num}.
 *
 * @param num the integer whose parity drives the decision
 * @return {@code true} when {@code num} is odd (of either sign), {@code false} when it is even
 */
private static boolean negate(int num) {
  // Java's % takes the sign of the dividend (-3 % 2 == -1), so comparing the remainder
  // against 1 would misclassify negative odd numbers as even. Comparing against 0 is
  // correct for both signs and gives the same result for every non-negative input.
  return num % 2 != 0;
}

@SuppressWarnings("RandomModInteger")
public static Object generatePrimitive(Type.PrimitiveType primitive,
Random random) {
Expand All @@ -49,7 +53,7 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
case 3:
return 0;
default:
return random.nextInt();
return negate(choice) ? -1 * random.nextInt() : random.nextInt();
}

case LONG:
Expand All @@ -61,7 +65,7 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
case 3:
return 0L;
default:
return random.nextLong();
return negate(choice) ? -1L * random.nextLong() : random.nextLong();
}

case FLOAT:
Expand All @@ -83,7 +87,7 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
case 8:
return Float.NaN;
default:
return random.nextFloat();
return negate(choice) ? -1.0F * random.nextFloat() : random.nextFloat();
}

case DOUBLE:
Expand All @@ -105,7 +109,7 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
case 8:
return Double.NaN;
default:
return random.nextDouble();
return negate(choice) ? -1.0D * random.nextDouble() : random.nextDouble();
}

case DATE:
Expand Down Expand Up @@ -140,7 +144,8 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
case DECIMAL:
Types.DecimalType type = (Types.DecimalType) primitive;
BigInteger unscaled = randomUnscaled(type.precision(), random);
return new BigDecimal(unscaled, type.scale());
BigDecimal bigDecimal = new BigDecimal(unscaled, type.scale());
return negate(choice) ? bigDecimal.negate() : bigDecimal;

default:
throw new IllegalArgumentException(
Expand All @@ -155,11 +160,11 @@ public static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType pri
return true; // doesn't really matter for booleans since they are not dictionary encoded
case INTEGER:
case DATE:
return value;
return negate(value) ? -1 * value : value;
case FLOAT:
return (float) value;
return negate(value) ? -1.0F * (float) value : (float) value;
case DOUBLE:
return (double) value;
return negate(value) ? -1.0D * (double) value : (double) value;
case LONG:
case TIME:
case TIMESTAMP:
Expand All @@ -177,7 +182,8 @@ public static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType pri
case DECIMAL:
Types.DecimalType type = (Types.DecimalType) primitive;
BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
return new BigDecimal(unscaled, type.scale());
BigDecimal bd = new BigDecimal(unscaled, type.scale());
return negate(value) ? bd.negate() : bd;
default:
throw new IllegalArgumentException(
"Cannot generate random value for unknown type: " + primitive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private VectorizedArrowReader() {

private enum ReadType {
FIXED_LENGTH_DECIMAL,
INT_LONG_BACKED_DECIMAL,
INT_BACKED_DECIMAL,
LONG_BACKED_DECIMAL,
VARCHAR,
VARBINARY,
FIXED_WIDTH_BINARY,
Expand Down Expand Up @@ -130,8 +131,11 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
case FIXED_LENGTH_DECIMAL:
vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
break;
case INT_LONG_BACKED_DECIMAL:
vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
case INT_BACKED_DECIMAL:
vectorizedColumnIterator.nextBatchIntBackedDecimal(vec, nullabilityHolder);
break;
case LONG_BACKED_DECIMAL:
vectorizedColumnIterator.nextBatchLongBackedDecimal(vec, nullabilityHolder);
break;
case VARBINARY:
vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
Expand Down Expand Up @@ -237,11 +241,11 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) {
this.typeWidth = primitive.getTypeLength();
break;
case INT64:
this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
this.readType = ReadType.LONG_BACKED_DECIMAL;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case INT32:
this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
this.readType = ReadType.INT_BACKED_DECIMAL;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,30 @@ public void nextBatchDoubles(FieldVector fieldVector, int typeWidth, Nullability
}
}

public void nextBatchIntLongBackedDecimal(
public void nextBatchIntBackedDecimal(
FieldVector fieldVector,
int typeWidth,
NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch =
vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, typeWidth, nullabilityHolder);
vectorizedPageIterator.nextBatchIntBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.triplesRead += rowsInThisBatch;
fieldVector.setValueCount(rowsReadSoFar);
}
}

public void nextBatchLongBackedDecimal(
FieldVector fieldVector,
NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
advance();
int rowsInThisBatch =
vectorizedPageIterator.nextBatchLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
rowsReadSoFar, nullabilityHolder);
rowsReadSoFar += rowsInThisBatch;
this.triplesRead += rowsInThisBatch;
fieldVector.setValueCount(rowsReadSoFar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ void readBatchOfDictionaryEncodedFixedLengthDecimals(FieldVector vector, int typ
case RLE:
for (int i = 0; i < num; i++) {
byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
byte[] vectorBytes = new byte[typeWidth];
System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
nullabilityHolder.setNotNull(idx);
idx++;
Expand All @@ -296,8 +296,8 @@ void readBatchOfDictionaryEncodedFixedLengthDecimals(FieldVector vector, int typ
case PACKED:
for (int i = 0; i < num; i++) {
byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
byte[] vectorBytes = new byte[typeWidth];
System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
nullabilityHolder.setNotNull(idx);
idx++;
Expand Down Expand Up @@ -343,7 +343,7 @@ void readBatchOfDictionaryEncodedVarWidthBinary(FieldVector vector, int startOff
}
}

void readBatchOfDictionaryEncodedIntLongBackedDecimals(FieldVector vector, int typeWidth, int startOffset,
void readBatchOfDictionaryEncodedIntBackedDecimals(FieldVector vector, int startOffset,
int numValuesToRead, Dictionary dict,
NullabilityHolder nullabilityHolder) {
int left = numValuesToRead;
Expand All @@ -358,18 +358,49 @@ void readBatchOfDictionaryEncodedIntLongBackedDecimals(FieldVector vector, int t
for (int i = 0; i < num; i++) {
((DecimalVector) vector).set(
idx,
typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
dict.decodeToInt(currentValue));
nullabilityHolder.setNotNull(idx);
idx++;
}
break;
case PACKED:
for (int i = 0; i < num; i++) {
((DecimalVector) vector).set(
idx,
typeWidth == Integer.BYTES ?
dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++])
: dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
nullabilityHolder.setNotNull(idx);
idx++;
}
break;
}
left -= num;
currentCount -= num;
}
}

void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int startOffset,
int numValuesToRead, Dictionary dict,
NullabilityHolder nullabilityHolder) {
int left = numValuesToRead;
int idx = startOffset;
while (left > 0) {
if (this.currentCount == 0) {
this.readNextGroup();
}
int num = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
for (int i = 0; i < num; i++) {
((DecimalVector) vector).set(
idx,
dict.decodeToLong(currentValue));
nullabilityHolder.setNotNull(idx);
idx++;
}
break;
case PACKED:
for (int i = 0; i < num; i++) {
((DecimalVector) vector).set(
idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
nullabilityHolder.setNotNull(idx);
idx++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,28 +274,26 @@ private int getActualBatchSize(int expectedBatchSize) {
* Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
* decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
*/
public int nextBatchIntLongBackedDecimal(
public int nextBatchIntBackedDecimal(
final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
final int typeWidth, NullabilityHolder nullabilityHolder) {
NullabilityHolder nullabilityHolder) {
final int actualBatchSize = getActualBatchSize(expectedBatchSize);
if (actualBatchSize <= 0) {
return 0;
}
if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
vectorizedDefinitionLevelReader
.readBatchOfDictionaryEncodedIntLongBackedDecimals(
.readBatchOfDictionaryEncodedIntBackedDecimals(
vector,
numValsInVector,
typeWidth,
actualBatchSize,
nullabilityHolder,
dictionaryEncodedValuesReader,
dictionary);
} else {
vectorizedDefinitionLevelReader.readBatchOfIntLongBackedDecimals(
vectorizedDefinitionLevelReader.readBatchOfIntBackedDecimals(
vector,
numValsInVector,
typeWidth,
actualBatchSize,
nullabilityHolder,
plainValuesReader);
Expand All @@ -305,6 +303,35 @@ public int nextBatchIntLongBackedDecimal(
return actualBatchSize;
}

/**
 * Reads the next batch of long-backed decimal values from this page into {@code vector}.
 *
 * <p>The number of values actually read is derived from {@code expectedBatchSize} via
 * {@code getActualBatchSize}. When that number is not positive nothing is read and 0 is returned.
 * Otherwise the values are read either through the dictionary-encoded path (when the dictionary
 * decode mode is {@code EAGER}) or through the plain values reader, and the page's
 * {@code triplesRead} / {@code hasNext} bookkeeping is updated.
 *
 * @param vector            the vector that receives the values
 * @param expectedBatchSize the number of values the caller would like to read
 * @param numValsInVector   passed through to the definition level reader as the second argument
 * @param nullabilityHolder holder passed through to the definition level reader
 * @return the number of values read in this call, or 0 if none were read
 */
public int nextBatchLongBackedDecimal(
    final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
    NullabilityHolder nullabilityHolder) {
  final int toRead = getActualBatchSize(expectedBatchSize);
  if (toRead <= 0) {
    return 0;
  }

  if (dictionaryDecodeMode != DictionaryDecodeMode.EAGER) {
    // Values are not eagerly dictionary-decoded: read them with the plain values reader.
    vectorizedDefinitionLevelReader.readBatchOfLongBackedDecimals(
        vector, numValsInVector, toRead, nullabilityHolder, plainValuesReader);
  } else {
    // Eager dictionary decoding: resolve the encoded values against the dictionary while reading.
    vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedLongBackedDecimals(
        vector, numValsInVector, toRead, nullabilityHolder, dictionaryEncodedValuesReader, dictionary);
  }

  // Record progress through the page and whether any triples remain.
  triplesRead += toRead;
  this.hasNext = triplesRead < triplesCount;
  return toRead;
}

/**
* Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
* decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
Expand Down
Loading

0 comments on commit 1bebf1a

Please sign in to comment.