Skip to content

Commit

Permalink
[SPARK-23864][SQL] Add unsafe object writing to UnsafeWriter
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
This PR moves writing of `UnsafeRow`, `UnsafeArrayData` & `UnsafeMapData` out of the `GenerateUnsafeProjection`/`InterpretedUnsafeProjection` classes into the `UnsafeWriter` interface. This cleans up the code a little bit, and it should also result in less byte code for the code generated path.

## How was this patch tested?
Existing tests

Author: Herman van Hovell <[email protected]>

Closes apache#20986 from hvanhovell/SPARK-23864.
  • Loading branch information
hvanhovell committed Apr 10, 2018
1 parent 95034af commit 3323b15
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.catalyst.expressions.codegen;

import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.expressions.UnsafeMapData;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
Expand Down Expand Up @@ -103,42 +106,27 @@ protected final void zeroOutPaddingBytes(int numBytes) {
public abstract void write(int ordinal, Decimal input, int precision, int scale);

public final void write(int ordinal, UTF8String input) {
final int numBytes = input.numBytes();
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);

// grow the global buffer before writing data.
grow(roundedSize);

zeroOutPaddingBytes(numBytes);

// Write the bytes to the variable length portion.
input.writeToMemory(getBuffer(), cursor());

setOffsetAndSize(ordinal, numBytes);

// move the cursor forward.
increaseCursor(roundedSize);
writeUnalignedBytes(ordinal, input.getBaseObject(), input.getBaseOffset(), input.numBytes());
}

public final void write(int ordinal, byte[] input) {
write(ordinal, input, 0, input.length);
}

public final void write(int ordinal, byte[] input, int offset, int numBytes) {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(input.length);
writeUnalignedBytes(ordinal, input, Platform.BYTE_ARRAY_OFFSET + offset, numBytes);
}

// grow the global buffer before writing data.
private void writeUnalignedBytes(
int ordinal,
Object baseObject,
long baseOffset,
int numBytes) {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
grow(roundedSize);

zeroOutPaddingBytes(numBytes);

// Write the bytes to the variable length portion.
Platform.copyMemory(
input, Platform.BYTE_ARRAY_OFFSET + offset, getBuffer(), cursor(), numBytes);

Platform.copyMemory(baseObject, baseOffset, getBuffer(), cursor(), numBytes);
setOffsetAndSize(ordinal, numBytes);

// move the cursor forward.
increaseCursor(roundedSize);
}

Expand All @@ -156,6 +144,40 @@ public final void write(int ordinal, CalendarInterval input) {
increaseCursor(16);
}

public final void write(int ordinal, UnsafeRow row) {
writeAlignedBytes(ordinal, row.getBaseObject(), row.getBaseOffset(), row.getSizeInBytes());
}

public final void write(int ordinal, UnsafeMapData map) {
writeAlignedBytes(ordinal, map.getBaseObject(), map.getBaseOffset(), map.getSizeInBytes());
}

public final void write(UnsafeArrayData array) {
// Unsafe arrays both can be written as a regular array field or as part of a map. This makes
// updating the offset and size dependent on the code path, this is why we currently do not
// provide an method for writing unsafe arrays that also updates the size and offset.
int numBytes = array.getSizeInBytes();
grow(numBytes);
Platform.copyMemory(
array.getBaseObject(),
array.getBaseOffset(),
getBuffer(),
cursor(),
numBytes);
increaseCursor(numBytes);
}

private void writeAlignedBytes(
int ordinal,
Object baseObject,
long baseOffset,
int numBytes) {
grow(numBytes);
Platform.copyMemory(baseObject, baseOffset, getBuffer(), cursor(), numBytes);
setOffsetAndSize(ordinal, numBytes);
increaseCursor(numBytes);
}

protected final void writeBoolean(long offset, boolean value) {
Platform.putBoolean(getBuffer(), offset, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,17 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
val rowWriter = new UnsafeRowWriter(writer, numFields)
val structWriter = generateStructWriter(rowWriter, fields)
(v, i) => {
val previousCursor = writer.cursor()
v.getStruct(i, fields.length) match {
case row: UnsafeRow =>
writeUnsafeData(
rowWriter,
row.getBaseObject,
row.getBaseOffset,
row.getSizeInBytes)
writer.write(i, row)
case row =>
val previousCursor = writer.cursor()
// Nested struct. We don't know where this will start because a row can be
// variable length, so we need to update the offsets and zero out the bit mask.
rowWriter.resetRowWriter()
structWriter.apply(row)
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
}
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
}

case ArrayType(elementType, containsNull) =>
Expand All @@ -214,15 +210,12 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
valueType,
valueContainsNull)
(v, i) => {
val previousCursor = writer.cursor()
v.getMap(i) match {
case map: UnsafeMapData =>
writeUnsafeData(
valueArrayWriter,
map.getBaseObject,
map.getBaseOffset,
map.getSizeInBytes)
writer.write(i, map)
case map =>
val previousCursor = writer.cursor()

// preserve 8 bytes to write the key array numBytes later.
valueArrayWriter.grow(8)
valueArrayWriter.increaseCursor(8)
Expand All @@ -237,8 +230,8 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {

// Write the values.
writeArray(valueArrayWriter, valueWriter, map.valueArray())
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
}
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
}

case udt: UserDefinedType[_] =>
Expand Down Expand Up @@ -318,11 +311,7 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
elementWriter: (SpecializedGetters, Int) => Unit,
array: ArrayData): Unit = array match {
case unsafe: UnsafeArrayData =>
writeUnsafeData(
arrayWriter,
unsafe.getBaseObject,
unsafe.getBaseOffset,
unsafe.getSizeInBytes)
arrayWriter.write(unsafe)
case _ =>
val numElements = array.numElements()
arrayWriter.initialize(numElements)
Expand All @@ -332,23 +321,4 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
i += 1
}
}

/**
* Write an opaque block of data to the buffer. This is used to copy
* [[UnsafeRow]], [[UnsafeArrayData]] and [[UnsafeMapData]] objects.
*/
private def writeUnsafeData(
writer: UnsafeWriter,
baseObject: AnyRef,
baseOffset: Long,
sizeInBytes: Int) : Unit = {
writer.grow(sizeInBytes)
Platform.copyMemory(
baseObject,
baseOffset,
writer.getBuffer,
writer.cursor,
sizeInBytes)
writer.increaseCursor(sizeInBytes)
}
}
Loading

0 comments on commit 3323b15

Please sign in to comment.