Skip to content

Commit

Permalink
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExt…
Browse files Browse the repository at this point in the history
…ernalSorter

This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in apache#7762: store large records in their own overflow pages.

Author: Josh Rosen <[email protected]>

Closes apache#7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:

967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
  • Loading branch information
JoshRosen authored and rxin committed Aug 4, 2015
1 parent f4b1ac0 commit ab8ee1a
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
Expand Down Expand Up @@ -143,8 +144,7 @@ private UnsafeExternalSorter(
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
deleteSpillFiles();
freeMemory();
cleanupResources();
return null;
}
});
Expand Down Expand Up @@ -249,7 +249,7 @@ public int getNumberOfAllocatedPages() {
*
* @return the number of bytes freed.
*/
public long freeMemory() {
private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
Expand All @@ -275,44 +275,32 @@ public long freeMemory() {
/**
* Deletes any spill files created by this sorter.
*/
public void deleteSpillFiles() {
private void deleteSpillFiles() {
for (UnsafeSorterSpillWriter spill : spillWriters) {
File file = spill.getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
};
}
}
}
}

/**
* Checks whether there is enough space to insert a new record into the sorter.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size.
* @return true if the record can be inserted without requiring more allocations, false otherwise.
* Frees this sorter's in-memory data structures and cleans up its spill files.
*/
private boolean haveSpaceForRecord(int requiredSpace) {
assert(requiredSpace > 0);
assert(inMemSorter != null);
return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
public void cleanupResources() {
deleteSpillFiles();
freeMemory();
}

/**
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size.
* Checks whether there is enough space to insert an additional record in to the sort pointer
* array and grows the array if additional space is required. If the required space cannot be
* obtained, then the in-memory data will be spilled to disk.
*/
private void allocateSpaceForRecord(int requiredSpace) throws IOException {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
// TODO: merge these steps to first calculate total memory requirements for this insert,
// then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
// data page.
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
Expand All @@ -326,7 +314,20 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
}
}

/**
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
* that exceed the page size are handled via a different code path which uses
* special overflow pages).
*/
private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
assert (requiredSpace <= pageSizeBytes);
if (requiredSpace > freeSpaceInCurrentPage) {
logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
freeSpaceInCurrentPage);
Expand All @@ -339,9 +340,7 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
if (memoryAcquired > 0) {
shuffleMemoryManager.release(memoryAcquired);
}
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
Expand All @@ -365,26 +364,59 @@ public void insertRecord(
long recordBaseOffset,
int lengthInBytes,
long prefix) throws IOException {

growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int totalSpaceRequired = lengthInBytes + 4;
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);

// --- Figure out where to insert the new record ----------------------------------------------

final MemoryBlock dataPage;
long dataPagePosition;
boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
if (useOverflowPage) {
long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
// The record is larger than the page size, so allocate a special overflow page just to hold
// that record.
final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGranted != overflowPageSize) {
shuffleMemoryManager.release(memoryGranted);
spill();
final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGrantedAfterSpill != overflowPageSize) {
shuffleMemoryManager.release(memoryGrantedAfterSpill);
throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
}
}
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
allocatedPages.add(overflowPage);
dataPage = overflowPage;
dataPagePosition = overflowPage.getBaseOffset();
} else {
// The record is small enough to fit in a regular data page, but the current page might not
// have enough space to hold it (or no pages have been allocated yet).
acquireNewPageIfNecessary(totalSpaceRequired);
dataPage = currentPage;
dataPagePosition = currentPagePosition;
// Update bookkeeping information
freeSpaceInCurrentPage -= totalSpaceRequired;
currentPagePosition += totalSpaceRequired;
}
assert(inMemSorter != null);
final Object dataPageBaseObject = dataPage.getBaseObject();

// --- Insert the record ----------------------------------------------------------------------

final long recordAddress =
taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object dataPageBaseObject = currentPage.getBaseObject();
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
currentPagePosition += 4;
taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
dataPagePosition += 4;
PlatformDependent.copyMemory(
recordBaseObject,
recordBaseOffset,
dataPageBaseObject,
currentPagePosition,
dataPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;
freeSpaceInCurrentPage -= totalSpaceRequired;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}

Expand All @@ -399,33 +431,70 @@ public void insertRecord(
public void insertKVRecord(
Object keyBaseObj, long keyOffset, int keyLen,
Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {

growPointerArrayIfNecessary();
final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);

// --- Figure out where to insert the new record ----------------------------------------------

final MemoryBlock dataPage;
long dataPagePosition;
boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
if (useOverflowPage) {
long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
// The record is larger than the page size, so allocate a special overflow page just to hold
// that record.
final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGranted != overflowPageSize) {
shuffleMemoryManager.release(memoryGranted);
spill();
final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGrantedAfterSpill != overflowPageSize) {
shuffleMemoryManager.release(memoryGrantedAfterSpill);
throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
}
}
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
allocatedPages.add(overflowPage);
dataPage = overflowPage;
dataPagePosition = overflowPage.getBaseOffset();
} else {
// The record is small enough to fit in a regular data page, but the current page might not
// have enough space to hold it (or no pages have been allocated yet).
acquireNewPageIfNecessary(totalSpaceRequired);
dataPage = currentPage;
dataPagePosition = currentPagePosition;
// Update bookkeeping information
freeSpaceInCurrentPage -= totalSpaceRequired;
currentPagePosition += totalSpaceRequired;
}
assert(inMemSorter != null);
final Object dataPageBaseObject = dataPage.getBaseObject();

// --- Insert the record ----------------------------------------------------------------------

final long recordAddress =
taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object dataPageBaseObject = currentPage.getBaseObject();
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4);
currentPagePosition += 4;
taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
dataPagePosition += 4;

PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen);
currentPagePosition += 4;
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen);
dataPagePosition += 4;

PlatformDependent.copyMemory(
keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen);
currentPagePosition += keyLen;
keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
dataPagePosition += keyLen;

PlatformDependent.copyMemory(
valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen);
currentPagePosition += valueLen;
valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);

freeSpaceInCurrentPage -= totalSpaceRequired;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}

/**
* Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class PackedRecordPointerSuite {
public void heap() {
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
final MemoryBlock page0 = memoryManager.allocatePage(100);
final MemoryBlock page1 = memoryManager.allocatePage(100);
final MemoryBlock page0 = memoryManager.allocatePage(128);
final MemoryBlock page1 = memoryManager.allocatePage(128);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
Expand All @@ -50,8 +50,8 @@ public void heap() {
public void offHeap() {
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
final MemoryBlock page0 = memoryManager.allocatePage(100);
final MemoryBlock page1 = memoryManager.allocatePage(100);
final MemoryBlock page0 = memoryManager.allocatePage(128);
final MemoryBlock page1 = memoryManager.allocatePage(128);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
Expand Down
Loading

0 comments on commit ab8ee1a

Please sign in to comment.