
Less dependency on jcuda except in org.apache.spark.cuda
Introduce org.apache.spark.unsafe.memory.Pointer
kiszk committed Jan 21, 2016
1 parent b1c7cad commit 82f8aff
Showing 9 changed files with 204 additions and 134 deletions.
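
The commit message introduces org.apache.spark.unsafe.memory.Pointer, but the new file itself is not among the hunks shown below; only its call sites are visible (Pointer.to(...) in CUDAFunction, Pointer values handed to the CUDA manager). The following is a minimal sketch of what such a wrapper could look like, assuming it thinly wraps jcuda.Pointer so that caller code never imports jcuda; the toJCuda accessor and the exact set of factory overloads are assumptions, not the commit's actual code.

```scala
// Hypothetical sketch only: the real class in this commit may differ.
// Wraps jcuda.Pointer so that code outside org.apache.spark.cuda never
// imports jcuda directly; only the factories used in the diff are shown.
package org.apache.spark.unsafe.memory

class Pointer private[memory] (private val underlying: jcuda.Pointer) {
  // Assumed accessor used by the CUDA manager to unwrap the pointer.
  private[spark] def toJCuda: jcuda.Pointer = underlying
}

object Pointer {
  def to(v: Array[Byte]): Pointer   = new Pointer(jcuda.Pointer.to(v))
  def to(v: Array[Char]): Pointer   = new Pointer(jcuda.Pointer.to(v))
  def to(v: Array[Short]): Pointer  = new Pointer(jcuda.Pointer.to(v))
  def to(v: Array[Int]): Pointer    = new Pointer(jcuda.Pointer.to(v))
  def to(v: Array[Long]): Pointer   = new Pointer(jcuda.Pointer.to(v))
  def to(v: Array[Float]): Pointer  = new Pointer(jcuda.Pointer.to(v))
  def to(v: Array[Double]): Pointer = new Pointer(jcuda.Pointer.to(v))
}
```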
18 changes: 7 additions & 11 deletions core/src/main/scala/org/apache/spark/ColumnPartitionData.scala
100644 → 100755
@@ -17,8 +17,6 @@

package org.apache.spark

import jcuda.driver.CUmodule
import jcuda.runtime.{cudaStream_t, cudaMemcpyKind, JCuda}
import org.apache.spark.storage.{RDDBlockId, BlockId}

import math._
@@ -31,11 +29,9 @@ import java.nio.{ByteBuffer, ByteOrder}
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.IteratorFunctions._
import org.apache.spark.util.Utils

import jcuda.Pointer
import org.apache.spark.unsafe.memory.Pointer

import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -61,6 +57,7 @@ class ColumnPartitionData[T](
private var refCounter = 1

// TODO blockId can never be NULL, modify the testcase to pass valid blockId and remove (0,0).
val cudaManager = SparkEnv.get.cudaManager
var blockId : Option[BlockId] = Some(RDDBlockId(0, 0))
def rddId : Int = blockId.getOrElse(RDDBlockId(0, 0)).asRDDId.get.rddId
def cachedGPUPointers : HashMap[String, Pointer] =
@@ -194,7 +191,7 @@ class ColumnPartitionData[T](
order.map(columnsByAccessors(_))
}

private[spark] def orderedGPUPointers(order: Seq[String], stream : cudaStream_t):
private[spark] def orderedGPUPointers(order: Seq[String], devIx: Int):
Vector[Pointer] = {
var gpuPtrs = Vector[Pointer]()
var gpuBlobs = Vector[Pointer]()
@@ -204,7 +201,7 @@
val inPointers = orderedPointers(order)
for ((col, name, cpuPtr) <- (inColumns, order, inPointers).zipped) {
gpuPtrs = gpuPtrs :+ cachedGPUPointers.getOrElseUpdate(blockId.get + name, {
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(col.memoryUsage(size))
val gpuPtr = cudaManager.allocGPUMemory(col.memoryUsage(size))
memCpys = memCpys :+ (gpuPtr, cpuPtr, col.memoryUsage(size))
gpuPtr
})
@@ -215,7 +212,7 @@
for ((blob, name, cpuPtr) <-
(inBlobBuffers, (1 to inBlobBuffers.length).map(_.toString), inBlobs).zipped) {
gpuBlobs = gpuBlobs :+ cachedGPUPointers.getOrElseUpdate(blockId.get + name, {
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(blob.capacity())
val gpuPtr = cudaManager.allocGPUMemory(blob.capacity())
memCpys = memCpys :+ (gpuPtr, cpuPtr, blob.capacity().toLong)
gpuPtr
})
@@ -233,8 +230,7 @@
*/

for ((gpuPtr, cpuPtr, length) <- memCpys) {
JCuda.cudaMemcpyAsync(gpuPtr, cpuPtr, length,
cudaMemcpyKind.cudaMemcpyHostToDevice, stream)
cudaManager.memcpyH2DASync(gpuPtr, cpuPtr, length, devIx)
}

gpuPtrs ++ gpuBlobs
@@ -244,7 +240,7 @@
if (!gpuCache) {
for ((name, ptr) <- cachedGPUPointers) {
if (name.startsWith(blockId.get.toString)) {
SparkEnv.get.cudaManager.freeGPUMemory(ptr)
cudaManager.freeGPUMemory(ptr)
cachedGPUPointers.remove(name)
}
}
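
In the hunks above, ColumnPartitionData stops calling JCuda.cudaMemcpyAsync directly: the cudaStream_t parameter of orderedGPUPointers becomes a plain device index, and the host-to-device copy is delegated to cudaManager.memcpyH2DASync. The manager side is not part of the hunks shown in this commit view, so the following is only a sketch of what such a helper could look like; the per-device stream bookkeeping and the toJCuda unwrapping (from the Pointer sketch above) are assumptions.

```scala
// Hypothetical CudaManager helper, consistent with the call
// cudaManager.memcpyH2DASync(gpuPtr, cpuPtr, length, devIx) in the diff above.
import scala.collection.mutable
import jcuda.runtime.{JCuda, cudaMemcpyKind, cudaStream_t}
import org.apache.spark.unsafe.memory.Pointer

class CudaManagerMemcpySketch {
  // Assumed bookkeeping: one lazily created stream per device index.
  private val streams = mutable.HashMap[Int, cudaStream_t]()

  private def streamFor(devIx: Int): cudaStream_t = synchronized {
    streams.getOrElseUpdate(devIx, {
      JCuda.cudaSetDevice(devIx)
      val stream = new cudaStream_t
      JCuda.cudaStreamCreate(stream)
      stream
    })
  }

  // Asynchronous host-to-device copy on the stream associated with devIx.
  def memcpyH2DASync(dst: Pointer, src: Pointer, byteCount: Long, devIx: Int): Unit = {
    JCuda.cudaMemcpyAsync(dst.toJCuda, src.toJCuda, byteCount,
      cudaMemcpyKind.cudaMemcpyHostToDevice, streamFor(devIx))
  }
}
```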
Empty file.
80 changes: 31 additions & 49 deletions core/src/main/scala/org/apache/spark/cuda/CUDAFunction.scala
100644 → 100755
@@ -15,6 +15,7 @@
* limitations under the License.
*/


package org.apache.spark.cuda

import org.apache.spark.storage.BlockId
@@ -25,20 +26,14 @@ import java.net.URL
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}

import jcuda.Pointer
import jcuda.driver.CUfunction
import jcuda.driver.CUmodule
import jcuda.driver.CUstream
import jcuda.driver.JCudaDriver
import jcuda.runtime.cudaStream_t
import jcuda.runtime.cudaMemcpyKind
import jcuda.runtime.JCuda

import org.apache.commons.io.IOUtils
import org.apache.spark.rdd.ExternalFunction
import org.apache.spark.{PartitionData, ColumnPartitionData, ColumnPartitionSchema, SparkEnv,
SparkException}
import org.apache.spark.util.Utils
import org.apache.spark.unsafe.memory.Pointer

/**
* A CUDA kernel wrapper. Contains CUDA module, information how to extract CUDA kernel from it and
@@ -86,28 +81,20 @@ class CUDAFunction(
outputArraySizes: Seq[Long] = null,
inputFreeVariables: Seq[Any] = null,
blockId : Option[BlockId] = None): ColumnPartitionData[U] = {
val cudaManager = SparkEnv.get.cudaManager
val outputSchema = ColumnPartitionSchema.schemaFor[U]

// TODO add array size
val memoryUsage = (if (in.gpuCached) 0 else in.memoryUsage) + outputSchema.memoryUsage(in.size)

val streamDevIx = SparkEnv.get.cudaManager.getStream(memoryUsage, in.gpuDevIx)
val stream = streamDevIx._1
val devIx = cudaManager.getDevice(memoryUsage, in.gpuDevIx)
if (in.gpuCache) {
in.gpuDevIx = streamDevIx._2
in.gpuDevIx = devIx
}

// TODO cache the function if there is a chance that after a deserialization kernel gets called
// multiple times - but only if no synchronization is needed for that
val module = resource match {
case url: URL =>
SparkEnv.get.cudaManager.cachedLoadModule(Left(url))
case (name: String, ptx: String) =>
SparkEnv.get.cudaManager.cachedLoadModule(Right(name, ptx))
case _ => throw new SparkException("Unsupported resource type for CUDAFunction")
}
val function = new CUfunction
JCudaDriver.cuModuleGetFunction(function, module, kernelSignature)
val function = cudaManager.moduleGetFunction(resource, kernelSignature)

val actualOutputSize = outputSize.getOrElse(in.size)
val out = if (outputArraySizes == null) {
@@ -127,8 +114,8 @@ class CUDAFunction(
val outColumns = out.schema.orderedColumns(outputColumnsOrder)
for (col <- outColumns) {
val size = col.memoryUsage(out.size)
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(size)
JCuda.cudaMemsetAsync(gpuPtr, 0, size, stream)
val gpuPtr = cudaManager.allocGPUMemory(size)
cudaManager.memsetASync(gpuPtr, 0, size, devIx)
gpuOutputPtrs = gpuOutputPtrs :+ gpuPtr
}

@@ -144,31 +131,31 @@ class CUDAFunction(
case v: Array[Byte] =>
val len = v.length
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Char] =>
val len = v.length * 2
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Short] =>
val len = v.length * 2
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Int] =>
val len = v.length * 4
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Long] =>
val len = v.length * 8
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Float] =>
val len = v.length * 4
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Double] =>
val len = v.length * 8
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case _ => throw new SparkException("Unsupported type passed to kernel "
+ "as a free variable argument")
}
@@ -180,17 +167,16 @@
if (out.blobBuffers != null) {out.blobBuffers} else {Array[ByteBuffer]()}
for (blob <- outBlobBuffers) {
val size = blob.capacity()
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(size)
JCuda.cudaMemsetAsync(gpuPtr, 0, size, stream)
val gpuPtr = cudaManager.allocGPUMemory(size)
cudaManager.memsetASync(gpuPtr, 0, size, devIx)
gpuOutputBlobs = gpuOutputBlobs :+ gpuPtr
}

// perform allocGPUMemory and cudaMemcpyAsync
val gpuInputPtrs = in.orderedGPUPointers(inputColumnsOrder, stream)
val gpuInputPtrs = in.orderedGPUPointers(inputColumnsOrder, devIx)

for (((cpuPtr, size), gpuPtr) <- (cpuInputFreeVars zip inputFreeVarPtrs)) {
JCuda.cudaMemcpyAsync(gpuPtr, cpuPtr, size,
cudaMemcpyKind.cudaMemcpyHostToDevice, stream)
cudaManager.memcpyH2DASync(gpuPtr, cpuPtr, size, devIx)
}

val gpuPtrParams = (gpuInputPtrs ++
@@ -208,8 +194,6 @@
+ "argument")
}

val wrappedStream = new CUstream(stream)

stagesCount match {
// normal launch, no stages, suitable for map
case None =>
@@ -220,16 +204,16 @@

val (gpuGridSize, gpuBlockSize) = dimensions match {
case Some(computeDim) => computeDim(in.size, 1)
case None => SparkEnv.get.cudaManager.computeDimensions(in.size)
case None => cudaManager.computeDimensions(in.size)
}

JCudaDriver.cuLaunchKernel(
cudaManager.launchKernel(
function,
gpuGridSize, 1, 1,
gpuBlockSize, 1, 1,
0,
wrappedStream,
kernelParameters, null)
devIx,
kernelParameters)

// launch kernel multiple times (multiple stages), suitable for reduce
case Some(totalStagesFun) =>
@@ -253,39 +237,37 @@
throw new SparkException("Dimensions must be provided for multi-stage kernels")
}

JCudaDriver.cuLaunchKernel(
cudaManager.launchKernel(
function,
gpuGridSize, 1, 1,
gpuBlockSize, 1, 1,
0,
wrappedStream,
kernelParameters, null)
devIx,
kernelParameters)
}
}

val outPointers = out.orderedPointers(outputColumnsOrder)
for ((cpuPtr, gpuPtr, col) <- (outPointers, gpuOutputPtrs, outColumns).zipped) {
JCuda.cudaMemcpyAsync(cpuPtr, gpuPtr, col.memoryUsage(out.size),
cudaMemcpyKind.cudaMemcpyDeviceToHost, stream)
cudaManager.memcpyD2HASync(cpuPtr, gpuPtr, col.memoryUsage(out.size), devIx)
}

for ((cpuPtr, gpuPtr, blob) <- (outBlobs, gpuOutputBlobs, outBlobBuffers).zipped) {
JCuda.cudaMemcpyAsync(cpuPtr, gpuPtr, blob.capacity(),
cudaMemcpyKind.cudaMemcpyDeviceToHost, stream)
cudaManager.memcpyD2HASync(cpuPtr, gpuPtr, blob.capacity(), devIx)
}

if (!in.gpuCache || ((gpuOutputPtrs.size + gpuOutputBlobs.size) > 0)) {
JCuda.cudaStreamSynchronize(stream)
cudaManager.streamSynchronize(devIx)
}
out.blockId = blockId
out
} {
in.freeGPUPointers()
for (ptr <- gpuOutputPtrs) {
SparkEnv.get.cudaManager.freeGPUMemory(ptr)
cudaManager.freeGPUMemory(ptr)
}
for (ptr <- gpuOutputBlobs) {
SparkEnv.get.cudaManager.freeGPUMemory(ptr)
cudaManager.freeGPUMemory(ptr)
}
}
} catch {
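
The kernel launch and the device-to-host copies in CUDAFunction are likewise routed through the manager (launchKernel, memcpyD2HASync, streamSynchronize), passing a device index instead of a CUstream. Below is a sketch of how those manager methods could sit on top of JCudaDriver/JCuda with the same argument order the diff uses; the stream lookup, the wrapper type of the kernel parameters, and the toJCuda unwrapping are assumptions rather than the commit's actual implementation.

```scala
// Hypothetical launch/synchronize helpers matching the call sites in the diff:
//   cudaManager.launchKernel(function, gx, 1, 1, bx, 1, 1, 0, devIx, kernelParameters)
//   cudaManager.memcpyD2HASync(cpuPtr, gpuPtr, size, devIx)
//   cudaManager.streamSynchronize(devIx)
import scala.collection.mutable
import jcuda.driver.{CUfunction, CUstream, JCudaDriver}
import jcuda.runtime.{JCuda, cudaMemcpyKind, cudaStream_t}
import org.apache.spark.unsafe.memory.Pointer

class CudaManagerLaunchSketch {
  // Assumed bookkeeping: one lazily created stream per device index.
  private val streams = mutable.HashMap[Int, cudaStream_t]()

  private def streamFor(devIx: Int): cudaStream_t = synchronized {
    streams.getOrElseUpdate(devIx, {
      JCuda.cudaSetDevice(devIx)
      val s = new cudaStream_t
      JCuda.cudaStreamCreate(s)
      s
    })
  }

  def launchKernel(
      function: CUfunction,
      gridX: Int, gridY: Int, gridZ: Int,
      blockX: Int, blockY: Int, blockZ: Int,
      sharedMemBytes: Int,
      devIx: Int,
      kernelParams: Pointer): Unit = {
    // The driver API wants a CUstream; wrap the runtime stream for this device,
    // as the pre-commit code did with `new CUstream(stream)`.
    JCudaDriver.cuLaunchKernel(
      function,
      gridX, gridY, gridZ,
      blockX, blockY, blockZ,
      sharedMemBytes,
      new CUstream(streamFor(devIx)),
      kernelParams.toJCuda, null)
  }

  // Asynchronous device-to-host copy on the stream associated with devIx.
  def memcpyD2HASync(dst: Pointer, src: Pointer, byteCount: Long, devIx: Int): Unit = {
    JCuda.cudaMemcpyAsync(dst.toJCuda, src.toJCuda, byteCount,
      cudaMemcpyKind.cudaMemcpyDeviceToHost, streamFor(devIx))
  }

  // Block until all work queued on this device's stream has completed.
  def streamSynchronize(devIx: Int): Unit = {
    JCuda.cudaStreamSynchronize(streamFor(devIx))
  }
}
```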