Distributed Neural Networks for Spark. Details are available in the paper.
To run SparkNet, you will need a Spark cluster. SparkNet apps can be run using spark-submit.
Start a Spark cluster using our AMI
Create an AWS secret key and access key. Instructions here.
andexport AWS_ACCESS_KEY_ID=
with the relevant values. -
Clone our repository locally.
Start a 5-worker Spark cluster on EC2 by running
SparkNet/ec2/spark-ec2 --key-pair=key --identity-file=key.rsa --region=eu-west-1 --zone=eu-west-1c --instance-type=g2.8xlarge --ami=ami-c0dd7db3 -s 5 --copy-aws-credentials --spark-version 1.5.0 --spot-price 1.5 --no-ganglia --user-data SparkNet/ec2/cloud-config.txt launch sparknet
assuming key.rsa
is your key pair.
Train Cifar using SparkNet
SSH to the Spark master as
. -
to get the Cifar data -
Train Cifar on 5 workers using
/root/spark/bin/spark-submit --class apps.CifarApp /root/SparkNet/target/scala-2.10/sparknet-assembly-0.1-SNAPSHOT.jar 5
That's all! Information is logged on the master in
For now, you have to install the following. We have an AMI with these dependencies already installed (ami-c0dd7db3). Dependencies:
- sbt 0.13 - installation instructions
- cuda 7.0 - installation instructions
- lmdb -
apt-get install liblmdb-dev
(optional, only if you want to use LMDB) - leveldb -
apt-get install libleveldb-dev
(optional, only if you want to use LevelDB)
On EC2:
- For each worker node, create one volume (e.g., 100GB) and attach it to the worker (e.g., for instance, at
On the master:
Clone the SparkNet repository.
Set the
environment variable to the SparkNet directory. -
Build Caffe by running the following:
cd $SPARKNET_HOME mkdir build cd build cmake ../libccaffe make -j 30
Increase the Java heap space with
export _JAVA_OPTIONS="-Xmx8g"
. -
mkdir /tmp/spark-events
(Spark does some logging there). -
Build SparkNet by doing:
cd $SPARKNET_HOME sbt assembly
On each worker:
- Clone the SparkNet repository.
- Set the
environment variable to the SparkNet directory. - Build Caffe as on the master.
- Run
mount /dev/xvdf /mnt2/spark
to mount the volume you created earlier (assuming you attached the volume at/dev/sdf
). Spark will spill data to disk here. If everything fits in memory, then this may not be necessary.
To run CifarApp, do the following:
First get the Cifar data with
Set the correct value of
. -
Then submit the job with
$SPARK_HOME/bin/spark-submit --class apps.CifarApp SparkNetPreview/target/scala-2.10/sparknetpreview-assembly-0.1-SNAPSHOT.jar 5
To run ImageNet, do the following:
Obtain the ImageNet data by following the instructions here. This involves creating an account and submitting a request.
Put the training tar files on S3 at
Tar the validation files by running
and put them on S3 at s3://sparknet/ILSVRC2012_val
4. Set the correct value of sparkNetHome
in src/main/scala/apps/ImageNetApp.scala
5. Submit a job on the master with
spark-submit --class apps.ImageNetApp $SPARKNET_HOME/target/scala-2.10/sparknet-assembly-0.1-SNAPSHOT.jar n
where n
is the number of worker nodes in your Spark cluster.
SparkNet is a deep learning library for Spark. Here we describe a bit of the design.
We use Java Native Access to call C code from Java.
Since Caffe is written in C++, we first create a C wrapper for Caffe in libccaffe/ccaffe.cpp
and libccaffe/ccaffe.h
We then create a Java interface to the C wrapper in src/main/java/libs/CaffeLibrary.java
This library could be called directly, but the easiest way to use it is through the CaffeNet
class in src/main/scala/libs/Net.scala
To enable Caffe to read data from Spark RDDs, we define a JavaDataLayer
in caffe/include/caffe/data_layers.hpp
and caffe/src/caffe/layers/java_data_layer.cpp
A model is specified in a NetParameter
object, and a solver is specified in a SolverParameter
These can be specified directly in Scala, for example:
val netParam = NetParam ("LeNet",
RDDLayer("data", shape=List(batchsize, 1, 28, 28), None),
RDDLayer("label", shape=List(batchsize, 1), None),
ConvolutionLayer("conv1", List("data"), kernel=(5,5), numOutput=20),
PoolingLayer("pool1", List("conv1"), pooling=Pooling.Max, kernel=(2,2), stride=(2,2)),
ConvolutionLayer("conv2", List("pool1"), kernel=(5,5), numOutput=50),
PoolingLayer("pool2", List("conv2"), pooling=Pooling.Max, kernel=(2,2), stride=(2,2)),
InnerProductLayer("ip1", List("pool2"), numOutput=500),
ReLULayer("relu1", List("ip1")),
InnerProductLayer("ip2", List("relu1"), numOutput=10),
SoftmaxWithLoss("loss", List("ip2", "label"))
Conveniently, they can be loaded from Caffe prototxt files:
val sparkNetHome = sys.env("SPARKNET_HOME")
var netParameter = ProtoLoader.loadNetPrototxt(sparkNetHome + "/caffe/models/bvlc_reference_caffenet/train_val.prototxt")
netParameter = ProtoLoader.replaceDataLayers(netParameter, trainBatchSize, testBatchSize, channels, croppedHeight, croppedWidth)
val solverParameter = ProtoLoader.loadSolverPrototxtWithNet(sparkNetHome + "/caffe/models/bvlc_reference_caffenet/solver.prototxt", netParameter, None)
The third line modifies the NetParameter
object to read data from a JavaDataLayer
A CaffeNet
object can then be created from a SolverParameter
val net = CaffeNet(solverParameter)