[SPARK-22143][SQL] Fix memory leak in OffHeapColumnVector
## What changes were proposed in this pull request?
`WritableColumnVector` does not close its child column vectors. This can create memory leaks for `OffHeapColumnVector`, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).
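
To make the failure mode concrete, here is a minimal sketch of the usage pattern that leaked before this patch. It assumes the `org.apache.spark.sql.execution.vectorized` classes as of this commit; the class name and flow are illustrative only, not part of the change:

```java
// Illustrative sketch only; placed in the vectorized package so the
// sketch can use the column vector constructors directly.
package org.apache.spark.sql.execution.vectorized;

import java.nio.charset.StandardCharsets;

import org.apache.spark.sql.types.DataTypes;

public class StringVectorLeakSketch {
  public static void main(String[] args) {
    // A string vector keeps its bytes in a child byte column vector;
    // for the off-heap implementation that child is native memory.
    OffHeapColumnVector vector = new OffHeapColumnVector(1024, DataTypes.StringType);
    byte[] utf8 = "some string".getBytes(StandardCharsets.UTF_8);
    vector.appendByteArray(utf8, 0, utf8.length);

    // Before this patch, close() freed only the parent's own buffers,
    // so the child's native allocation was leaked. With the patch,
    // WritableColumnVector.close() closes child vectors as well.
    vector.close();
  }
}
```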

## How was this patch tested?
I have updated the existing tests to always use both on-heap and off-heap vectors. Testing and diagnosis were done locally.

Author: Herman van Hovell <[email protected]>

Closes apache#19367 from hvanhovell/SPARK-22143.
hvanhovell committed Sep 27, 2017
1 parent 9b98aef commit 02bb068
Showing 5 changed files with 165 additions and 160 deletions.
OffHeapColumnVector.java
@@ -85,6 +85,7 @@ public long nullsNativeAddress() {
 
   @Override
   public void close() {
+    super.close();
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
OnHeapColumnVector.java
@@ -90,6 +90,16 @@ public long nullsNativeAddress() {
 
   @Override
   public void close() {
+    super.close();
+    nulls = null;
+    byteData = null;
+    shortData = null;
+    intData = null;
+    longData = null;
+    floatData = null;
+    doubleData = null;
+    arrayLengths = null;
+    arrayOffsets = null;
   }
 
   //
WritableColumnVector.java
@@ -59,6 +59,24 @@ public void reset() {
     }
   }
 
+  @Override
+  public void close() {
+    if (childColumns != null) {
+      for (int i = 0; i < childColumns.length; i++) {
+        childColumns[i].close();
+        childColumns[i] = null;
+      }
+      childColumns = null;
+    }
+    if (dictionaryIds != null) {
+      dictionaryIds.close();
+      dictionaryIds = null;
+    }
+    dictionary = null;
+    resultStruct = null;
+    resultArray = null;
+  }
+
   public void reserve(int requiredCapacity) {
     if (requiredCapacity > capacity) {
       int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
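
The teardown now cascades: the base class closes child vectors and dictionary ids, and each subclass calls `super.close()` before releasing its own buffers. A simplified sketch of that pattern, with hypothetical class names rather than the actual Spark sources:

```java
// Hypothetical names; a sketch of the close() cascade, not Spark code.
abstract class VectorSketch implements AutoCloseable {
  protected VectorSketch[] childColumns;

  @Override
  public void close() {
    if (childColumns != null) {
      for (int i = 0; i < childColumns.length; i++) {
        childColumns[i].close();  // recurse into children, e.g. the byte
        childColumns[i] = null;   // vector backing a string column
      }
      childColumns = null;
    }
  }
}

final class OffHeapVectorSketch extends VectorSketch {
  private long dataAddress;  // stands in for a native buffer address

  @Override
  public void close() {
    super.close();     // release children first, as in the patch
    dataAddress = 0L;  // the real code calls Platform.freeMemory(...) here
  }
}
```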
ColumnVectorSuite.scala
@@ -25,19 +25,24 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
-
-  var testVector: WritableColumnVector = _
-
-  private def allocate(capacity: Int, dt: DataType): WritableColumnVector = {
-    new OnHeapColumnVector(capacity, dt)
+  private def withVector(
+      vector: WritableColumnVector)(
+      block: WritableColumnVector => Unit): Unit = {
+    try block(vector) finally vector.close()
   }
 
-  override def afterEach(): Unit = {
-    testVector.close()
+  private def testVectors(
+      name: String,
+      size: Int,
+      dt: DataType)(
+      block: WritableColumnVector => Unit): Unit = {
+    test(name) {
+      withVector(new OnHeapColumnVector(size, dt))(block)
+      withVector(new OffHeapColumnVector(size, dt))(block)
+    }
   }
 
-  test("boolean") {
-    testVector = allocate(10, BooleanType)
+  testVectors("boolean", 10, BooleanType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendBoolean(i % 2 == 0)
     }
@@ -49,34 +54,31 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("byte") {
-    testVector = allocate(10, ByteType)
+  testVectors("byte", 10, ByteType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendByte(i.toByte)
     }
 
     val array = new ColumnVector.Array(testVector)
 
     (0 until 10).foreach { i =>
-      assert(array.get(i, ByteType) === (i.toByte))
+      assert(array.get(i, ByteType) === i.toByte)
     }
   }
 
-  test("short") {
-    testVector = allocate(10, ShortType)
+  testVectors("short", 10, ShortType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendShort(i.toShort)
     }
 
     val array = new ColumnVector.Array(testVector)
 
     (0 until 10).foreach { i =>
-      assert(array.get(i, ShortType) === (i.toShort))
+      assert(array.get(i, ShortType) === i.toShort)
     }
   }
 
-  test("int") {
-    testVector = allocate(10, IntegerType)
+  testVectors("int", 10, IntegerType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendInt(i)
     }
@@ -88,8 +90,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("long") {
-    testVector = allocate(10, LongType)
+  testVectors("long", 10, LongType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendLong(i)
     }
@@ -101,8 +102,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("float") {
-    testVector = allocate(10, FloatType)
+  testVectors("float", 10, FloatType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendFloat(i.toFloat)
     }
@@ -114,8 +114,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("double") {
-    testVector = allocate(10, DoubleType)
+  testVectors("double", 10, DoubleType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendDouble(i.toDouble)
     }
@@ -127,8 +126,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("string") {
-    testVector = allocate(10, StringType)
+  testVectors("string", 10, StringType) { testVector =>
     (0 until 10).map { i =>
       val utf8 = s"str$i".getBytes("utf8")
       testVector.appendByteArray(utf8, 0, utf8.length)
@@ -141,8 +139,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("binary") {
-    testVector = allocate(10, BinaryType)
+  testVectors("binary", 10, BinaryType) { testVector =>
     (0 until 10).map { i =>
       val utf8 = s"str$i".getBytes("utf8")
       testVector.appendByteArray(utf8, 0, utf8.length)
@@ -156,9 +153,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("array") {
-    val arrayType = ArrayType(IntegerType, true)
-    testVector = allocate(10, arrayType)
+  val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
+  testVectors("array", 10, arrayType) { testVector =>
 
     val data = testVector.arrayData()
     var i = 0
@@ -181,9 +177,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(array.get(3, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(3, 4, 5))
   }
 
-  test("struct") {
-    val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
-    testVector = allocate(10, schema)
+  val structType: StructType = new StructType().add("int", IntegerType).add("double", DoubleType)
+  testVectors("struct", 10, structType) { testVector =>
     val c1 = testVector.getChildColumn(0)
     val c2 = testVector.getChildColumn(1)
     c1.putInt(0, 123)
@@ -193,35 +188,34 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     val array = new ColumnVector.Array(testVector)
 
-    assert(array.get(0, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 123)
-    assert(array.get(0, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 3.45)
-    assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
-    assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
+    assert(array.get(0, structType).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 123)
+    assert(array.get(0, structType).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 3.45)
+    assert(array.get(1, structType).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
+    assert(array.get(1, structType).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
   }
 
   test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
-    val arrayType = ArrayType(IntegerType, true)
-    testVector = new OffHeapColumnVector(8, arrayType)
+    withVector(new OffHeapColumnVector(8, arrayType)) { testVector =>
+      val data = testVector.arrayData()
+      (0 until 8).foreach(i => data.putInt(i, i))
+      (0 until 8).foreach(i => testVector.putArray(i, i, 1))
 
-    val data = testVector.arrayData()
-    (0 until 8).foreach(i => data.putInt(i, i))
-    (0 until 8).foreach(i => testVector.putArray(i, i, 1))
+      // Increase vector's capacity and reallocate the data to new bigger buffers.
+      testVector.reserve(16)
 
-    // Increase vector's capacity and reallocate the data to new bigger buffers.
-    testVector.reserve(16)
-
-    // Check that none of the values got lost/overwritten.
-    val array = new ColumnVector.Array(testVector)
-    (0 until 8).foreach { i =>
-      assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
+      // Check that none of the values got lost/overwritten.
+      val array = new ColumnVector.Array(testVector)
+      (0 until 8).foreach { i =>
+        assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
+      }
     }
   }
 
   test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
-    val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
-    testVector = new OffHeapColumnVector(8, structType)
-    (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
-    testVector.reserve(16)
-    (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+    withVector(new OffHeapColumnVector(8, structType)) { testVector =>
+      (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
+      testVector.reserve(16)
+      (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+    }
   }
 }
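
The refactored suite uses a loan pattern: `withVector` guarantees `close()` runs even when a test body throws, and `testVectors` runs every test body against both the on-heap and off-heap implementations. A rough Java analog of the helper, for illustration only (the helper class name is hypothetical):

```java
import java.util.function.Consumer;

import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

final class VectorTestSupport {
  // Rough Java analog of the suite's withVector helper: the vector is
  // closed even if the block throws, so off-heap memory never leaks.
  static void withVector(WritableColumnVector vector, Consumer<WritableColumnVector> block) {
    try {
      block.accept(vector);
    } finally {
      vector.close();
    }
  }
}
```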