Skip to content

Commit

Permalink
Add support for TimeType / UUIDType (apache#2739)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Jun 28, 2021
1 parent bf2cbc3 commit aa65c06
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public static Field convert(final NestedField field) {
case TIME:
arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
break;
case UUID:
arrowType = new ArrowType.FixedSizeBinary(16);
break;
case TIMESTAMP:
arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND,
((Types.TimestampType) field.type()).shouldAdjustToUTC() ? "UTC" : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
* <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
* type is int32 instead of the type as per the schema.
* See https://github.com/apache/iceberg/issues/2484.</li>
* <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
* {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
* <li>Data types: {@link Types.ListType}, {@link Types.MapType},
* {@link Types.StructType}, {@link Types.FixedType} and
* {@link Types.DecimalType}
* See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
* <li>Iceberg v2 spec is not supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.ValueVector;
Expand Down Expand Up @@ -112,6 +114,7 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDict
case BSON:
return new DictionaryStringAccessor<>((IntVector) vector, dictionary, stringFactorySupplier.get());
case INT_64:
case TIME_MICROS:
case TIMESTAMP_MILLIS:
case TIMESTAMP_MICROS:
return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
Expand Down Expand Up @@ -189,6 +192,10 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDict
} else if (vector instanceof StructVector) {
StructVector structVector = (StructVector) vector;
return new StructAccessor<>(structVector, structChildFactorySupplier.get());
} else if (vector instanceof TimeMicroVector) {
return new TimeMicroAccessor<>((TimeMicroVector) vector);
} else if (vector instanceof FixedSizeBinaryVector) {
return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
}
throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
}
Expand Down Expand Up @@ -469,6 +476,38 @@ public final long getLong(int rowId) {
}
}

/**
 * Accessor for a {@link TimeMicroVector}; exposes each element through
 * {@link #getLong(int)} as the vector's underlying long value (microsecond
 * precision per the vector type).
 */
private static class TimeMicroAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
    extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

  // Narrowly-typed reference to the wrapped vector; avoids a cast on every read.
  private final TimeMicroVector timeVector;

  TimeMicroAccessor(TimeMicroVector vector) {
    super(vector);
    this.timeVector = vector;
  }

  @Override
  public final long getLong(int rowId) {
    return timeVector.get(rowId);
  }
}

/**
 * Accessor for a {@link FixedSizeBinaryVector}; returns each element as a byte
 * array via {@link #getBinary(int)} (used for UUID columns, which are mapped to
 * 16-byte fixed-size binary).
 */
private static class FixedSizeBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
    extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

  // Narrowly-typed reference to the wrapped vector; avoids a cast on every read.
  private final FixedSizeBinaryVector binaryVector;

  FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
    super(vector);
    this.binaryVector = vector;
  }

  @Override
  public byte[] getBinary(int rowId) {
    return binaryVector.get(rowId);
  }
}

private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.types.FloatingPointPrecision;
Expand Down Expand Up @@ -108,6 +109,8 @@ private enum ReadType {
FLOAT,
DOUBLE,
TIMESTAMP_MILLIS,
TIME_MICROS,
UUID,
DICTIONARY
}

Expand Down Expand Up @@ -169,6 +172,9 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
case TIMESTAMP_MILLIS:
vectorizedColumnIterator.nextBatchTimestampMillis(vec, typeWidth, nullabilityHolder);
break;
case UUID:
vectorizedColumnIterator.nextBatchFixedSizeBinary(vec, typeWidth, nullabilityHolder);
break;
}
}
}
Expand All @@ -178,6 +184,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
nullabilityHolder, icebergField.type());
}

@SuppressWarnings("MethodLength")
private void allocateFieldVector(boolean dictionaryEncodedVector) {
if (dictionaryEncodedVector) {
Field field = new Field(
Expand Down Expand Up @@ -240,6 +247,12 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) {
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case TIME_MICROS:
this.vec = arrowField.createVector(rootAlloc);
((TimeMicroVector) vec).allocateNew(batchSize);
this.readType = ReadType.LONG;
this.typeWidth = 8;
break;
case DECIMAL:
this.vec = arrowField.createVector(rootAlloc);
((DecimalVector) vec).allocateNew(batchSize);
Expand Down Expand Up @@ -269,11 +282,17 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) {
} else {
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
int len = ((Types.FixedType) icebergField.type()).length();
int len;
if (icebergField.type() instanceof Types.UUIDType) {
len = 16;
this.readType = ReadType.UUID;
} else {
len = ((Types.FixedType) icebergField.type()).length();
this.readType = ReadType.FIXED_WIDTH_BINARY;
}
this.vec = arrowField.createVector(rootAlloc);
vec.setInitialCapacity(batchSize * len);
vec.allocateNew();
this.readType = ReadType.FIXED_WIDTH_BINARY;
this.typeWidth = len;
break;
case BINARY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ public void nextBatchFixedLengthDecimal(
}
}

/**
 * Reads up to {@code batchSize} fixed-size binary values (e.g. UUIDs) into
 * {@code fieldVector}, pulling from as many Parquet pages as needed, and keeps
 * the vector's value count in sync with the rows read so far.
 */
public void nextBatchFixedSizeBinary(
    FieldVector fieldVector,
    int typeWidth,
    NullabilityHolder nullabilityHolder) {
  int totalRowsRead = 0;
  while (totalRowsRead < batchSize && hasNext()) {
    advance();
    // A single page may yield fewer rows than requested; keep reading pages
    // until the batch is full or the column is exhausted.
    int rowsFromPage = vectorizedPageIterator.nextBatchFixedSizeBinary(
        fieldVector, batchSize - totalRowsRead, totalRowsRead, typeWidth, nullabilityHolder);
    totalRowsRead += rowsFromPage;
    this.triplesRead += rowsFromPage;
    fieldVector.setValueCount(totalRowsRead);
  }
}

public void nextBatchVarWidthType(FieldVector fieldVector, NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.parquet.column.Dictionary;
Expand Down Expand Up @@ -410,4 +411,42 @@ void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int star
currentCount -= num;
}
}

/**
 * Decodes dictionary-encoded fixed-size binary values into a
 * {@link FixedSizeBinaryVector}, writing {@code numValuesToRead} values
 * starting at {@code startOffset} and marking each written slot not-null.
 *
 * <p>Handles both run-length runs (one dictionary id repeated) and bit-packed
 * runs (one dictionary id per value, taken from the packed buffer).
 */
void readBatchOfDictionaryEncodedFixedSizeBinary(
    FieldVector vector, int typeWidth, int startOffset,
    int numValuesToRead, Dictionary dict,
    NullabilityHolder nullabilityHolder) {
  int remaining = numValuesToRead;
  int outputIdx = startOffset;
  while (remaining > 0) {
    if (this.currentCount == 0) {
      this.readNextGroup();
    }
    int chunk = Math.min(remaining, this.currentCount);
    switch (mode) {
      case RLE:
        // One dictionary id (currentValue) repeated for the whole run.
        for (int i = 0; i < chunk; i++) {
          byte[] dictBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
          byte[] value = new byte[typeWidth];
          System.arraycopy(dictBytes, 0, value, 0, typeWidth);
          ((FixedSizeBinaryVector) vector).set(outputIdx, value);
          nullabilityHolder.setNotNull(outputIdx);
          outputIdx++;
        }
        break;
      case PACKED:
        // One dictionary id per value, consumed from the packed buffer.
        for (int i = 0; i < chunk; i++) {
          byte[] dictBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
          byte[] value = new byte[typeWidth];
          System.arraycopy(dictBytes, 0, value, 0, typeWidth);
          ((FixedSizeBinaryVector) vector).set(outputIdx, value);
          nullabilityHolder.setNotNull(outputIdx);
          outputIdx++;
        }
        break;
    }
    remaining -= chunk;
    currentCount -= chunk;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,36 @@ public int nextBatchFixedLengthDecimal(
return actualBatchSize;
}

/**
 * Reads a batch of fixed-size binary values (e.g. UUID) from the current page
 * into {@code vector}, starting at position {@code numValsInVector}.
 *
 * @return the number of values actually read from this page (0 when the page
 *         is exhausted)
 */
public int nextBatchFixedSizeBinary(
    final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
    final int typeWidth, NullabilityHolder nullabilityHolder) {
  final int rowsToRead = getActualBatchSize(expectedBatchSize);
  if (rowsToRead <= 0) {
    return 0;
  }
  // Eagerly-decoded dictionary pages go through the dictionary-aware reader;
  // everything else decodes plain-encoded bytes.
  if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
    vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedSizeBinary(
        vector, numValsInVector, typeWidth, rowsToRead, nullabilityHolder,
        dictionaryEncodedValuesReader, dictionary);
  } else {
    vectorizedDefinitionLevelReader.readBatchOfFixedSizeBinary(
        vector, numValsInVector, typeWidth, rowsToRead, nullabilityHolder, plainValuesReader);
  }
  triplesRead += rowsToRead;
  this.hasNext = triplesRead < triplesCount;
  return rowsToRead;
}

/**
* Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
Expand Down Expand Up @@ -656,6 +657,50 @@ public void readBatchOfFixedLengthDecimals(
}
}

/**
 * Reads {@code numValsToRead} plain-encoded fixed-size binary values into a
 * {@link FixedSizeBinaryVector}, starting at {@code startOffset}, using the
 * definition levels to decide which slots are null.
 *
 * <p>RLE mode: a run of identical definition levels — either all present
 * (copy {@code typeWidth} bytes per value) or all null (mark nulls, consume
 * nothing from {@code valuesReader}). PACKED mode: one definition level per
 * value, checked individually.
 */
public void readBatchOfFixedSizeBinary(
final FieldVector vector, final int startOffset,
final int typeWidth, final int numValsToRead, NullabilityHolder nullabilityHolder,
ValuesAsBytesReader valuesReader) {
int bufferIdx = startOffset;
int left = numValsToRead;
while (left > 0) {
if (this.currentCount == 0) {
this.readNextGroup();
}
int num = Math.min(left, this.currentCount);
// Scratch buffer reused for every value in this group.
// NOTE(review): safe only if FixedSizeBinaryVector.set copies the array
// contents into the vector's own buffer (Arrow documents set as a copy) —
// the buffer is overwritten on the next iteration.
byte[] byteArray = new byte[typeWidth];
switch (mode) {
case RLE:
if (currentValue == maxDefLevel) {
// Whole run is non-null: read typeWidth bytes per value from the page.
for (int i = 0; i < num; i++) {
valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
nullabilityHolder.setNotNull(bufferIdx);
bufferIdx++;
}
} else {
// Whole run is null: no value bytes are stored for nulls, so the
// values reader is not advanced.
setNulls(nullabilityHolder, bufferIdx, num, vector.getValidityBuffer());
bufferIdx += num;
}
break;
case PACKED:
// One definition level per value; read bytes only for non-null slots.
for (int i = 0; i < num; ++i) {
if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
nullabilityHolder.setNotNull(bufferIdx);
} else {
setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
}
bufferIdx++;
}
break;
}
left -= num;
currentCount -= num;
}
}

public void readBatchOfDictionaryEncodedFixedLengthDecimals(
final FieldVector vector,
final int startOffset,
Expand Down Expand Up @@ -701,6 +746,51 @@ public void readBatchOfDictionaryEncodedFixedLengthDecimals(
}
}

/**
 * Reads {@code numValsToRead} dictionary-encoded fixed-size binary values into
 * a {@link FixedSizeBinaryVector}, starting at {@code startOffset}, using the
 * definition levels to decide which slots are null.
 *
 * <p>RLE mode: a run of identical definition levels — either all present
 * (delegated to {@code dictionaryEncodedValuesReader}) or all null. PACKED
 * mode: one definition level per value, decoded individually here.
 */
public void readBatchOfDictionaryEncodedFixedSizeBinary(
    final FieldVector vector,
    final int startOffset,
    final int typeWidth,
    final int numValsToRead,
    NullabilityHolder nullabilityHolder,
    VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader,
    Dictionary dict) {
  int idx = startOffset;
  int left = numValsToRead;
  while (left > 0) {
    if (this.currentCount == 0) {
      this.readNextGroup();
    }
    int num = Math.min(left, this.currentCount);
    switch (mode) {
      case RLE:
        if (currentValue == maxDefLevel) {
          // Whole run is non-null: bulk-decode dictionary ids into the vector.
          dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedFixedSizeBinary(vector, typeWidth, idx,
              num, dict, nullabilityHolder);
        } else {
          setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
        }
        idx += num;
        break;
      case PACKED:
        for (int i = 0; i < num; i++) {
          if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
            // getBytesUnsafe avoids a redundant copy (consistent with the
            // other dictionary paths); the bytes are only read here before
            // being copied into vectorBytes.
            byte[] bytes = dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe();
            byte[] vectorBytes = new byte[typeWidth];
            System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth);
            ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
            nullabilityHolder.setNotNull(idx);
          } else {
            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
          }
          idx++;
        }
        break;
    }
    left -= num;
    currentCount -= num;
  }
}

public void readBatchVarWidth(
final FieldVector vector,
final int startOffset,
Expand Down
Loading

0 comments on commit aa65c06

Please sign in to comment.