Merge branch 'master' into streaming
Conflicts:
	.gitignore
tdas committed Jun 25, 2013
2 parents 48c7e37 + 78ffe16 commit c89af0a
Showing 289 changed files with 13,250 additions and 3,293 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -37,3 +37,4 @@ dependency-reduced-pom.xml
.ensime
.ensime_lucene
checkpoint
derby.log
9 changes: 7 additions & 2 deletions README.md
@@ -12,11 +12,16 @@ This README file only contains basic setup instructions.

## Building

Spark requires Scala 2.9.2. The project is built using Simple Build Tool (SBT),
which is packaged with it. To build Spark and its example programs, run:
Spark requires Scala 2.9.2 (Scala 2.10 is not yet supported). The project is
built using Simple Build Tool (SBT), which is packaged with it. To build
Spark and its example programs, run:

sbt/sbt package

Spark also supports building with Maven. If you would like to do so, see the
[instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
in the Spark documentation.

To run Spark, you will need to have Scala's bin directory in your `PATH`, or
you will need to set the `SCALA_HOME` environment variable to point to where
you've installed Scala. Scala must be accessible through one of these
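
To sanity-check a build produced by `sbt/sbt package`, a short throwaway driver such as the sketch below can be compiled and run against the built jars. This is illustrative only and not part of the commit: the object name, job name, and the "local" master string are assumptions; only spark.SparkContext, parallelize, and reduce come from the Spark API that appears elsewhere in this diff.

import spark.SparkContext

object BuildCheck {
  def main(args: Array[String]) {
    // "local" runs Spark in-process, so no cluster is needed for the check.
    val sc = new SparkContext("local", "BuildCheck")
    // Sum 1..100 to confirm the freshly built jars are usable.
    val sum = sc.parallelize(1 to 100).reduce(_ + _)
    println("Sum of 1..100 = " + sum)
    sc.stop()
  }
}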
41 changes: 39 additions & 2 deletions bagel/pom.xml
@@ -3,8 +3,8 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
<artifactId>parent</artifactId>
<version>0.7.0-SNAPSHOT</version>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -102,5 +102,42 @@
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
97 changes: 89 additions & 8 deletions bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -4,8 +4,37 @@ import spark._
import spark.SparkContext._

import scala.collection.mutable.ArrayBuffer
import storage.StorageLevel

object Bagel extends Logging {
val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK

/**
* Runs a Bagel program.
* @param sc [[spark.SparkContext]] to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
* the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
* empty array, i.e. sc.parallelize(Array[(K, Message)]()).
* @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
* message before sending (which often involves network I/O).
* @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
* and provides the result to each vertex in the next superstep.
* @param partitioner [[spark.Partitioner]] partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
* @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
* Defaults to caching in memory and on disk (StorageLevel.MEMORY_AND_DISK).
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
* optional Aggregator and the current superstep,
* and returns a set of (Vertex, outgoing Messages) pairs
* @tparam K key type
* @tparam V vertex type
* @tparam M message type
* @tparam C combined message type
* @tparam A aggregated value type
* @return an RDD of (K, V) pairs representing the graph after completion of the program
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C: Manifest, A: Manifest](
sc: SparkContext,
@@ -14,7 +43,8 @@ object Bagel extends Logging {
combiner: Combiner[M, C],
aggregator: Option[Aggregator[V, A]],
partitioner: Partitioner,
numPartitions: Int
numPartitions: Int,
storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
)(
compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)] = {
@@ -32,8 +62,9 @@ object Bagel extends Logging {
val combinedMsgs = msgs.combineByKey(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)

val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
@@ -50,6 +81,7 @@ object Bagel extends Logging {
verts
}

/** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -59,12 +91,29 @@
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
partitioner: Partitioner,
numPartitions: Int,
storageLevel: StorageLevel
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
run[K, V, M, C, Nothing](
sc, vertices, messages, combiner, None, partitioner, numPartitions)(
sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, C](compute))
}

/**
* Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]]
* and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -73,24 +122,54 @@
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
numPartitions: Int,
storageLevel: StorageLevel
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
val part = new HashPartitioner(numPartitions)
run[K, V, M, C, Nothing](
sc, vertices, messages, combiner, None, part, numPartitions)(
sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, C](compute))
}

/**
* Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
* [[spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
numPartitions: Int
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)] = {
): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/**
* Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
* and [[spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
numPartitions: Int,
storageLevel: StorageLevel
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)] = {
val part = new HashPartitioner(numPartitions)
run[K, V, M, Array[M], Nothing](
sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, Array[M]](compute))
}

@@ -117,7 +196,8 @@ object Bagel extends Logging {
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
compute: (V, Option[C]) => (V, Array[M])
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
@@ -135,7 +215,7 @@ object Bagel extends Logging {
numActiveVerts += 1

Some((newVert, newMsgs))
}.cache
}.persist(storageLevel)

// Force evaluation of processed RDD for accurate performance measurements
processed.foreach(x => {})
@@ -166,6 +246,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}

/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M] =
Array(msg)
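
The test added to BagelSuite below exercises the simplest of these overloads with the new storage-level argument. As a rough illustration, and not part of the commit, the sketch below shows how the fully documented overload might be called with an explicit combiner and partitioner, no aggregator, and intermediate RDDs persisted on disk only; the vertex and message classes mirror the TestVertex/TestMessage shapes used in BagelSuite, and the names are invented for the example.

import spark.{SparkContext, HashPartitioner}
import spark.storage.StorageLevel
import spark.bagel._

// Same shape as the vertex/message classes used in BagelSuite further down in this diff.
class CountVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class CountMessage(val targetId: String) extends Message[String] with Serializable

object BagelStorageLevelExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "bagelStorageLevelExample")
    val verts = sc.parallelize((1 to 4).map(id => (id.toString, new CountVertex(true, 0))))
    val msgs = sc.parallelize(Array[(String, CountMessage)]())
    val numSupersteps = 10

    // Full overload: explicit DefaultCombiner, no aggregator, explicit HashPartitioner,
    // and DISK_ONLY persistence instead of the MEMORY_AND_DISK default.
    val result = Bagel.run[String, CountVertex, CountMessage, Array[CountMessage], Nothing](
      sc, verts, msgs, new DefaultCombiner[CountMessage](), None,
      new HashPartitioner(sc.defaultParallelism), sc.defaultParallelism,
      StorageLevel.DISK_ONLY) {
      (self: CountVertex, msgs: Option[Array[CountMessage]], agg: Option[Nothing], superstep: Int) =>
        // Each vertex just counts supersteps, then votes to halt.
        (new CountVertex(superstep < numSupersteps - 1, self.age + 1), Array[CountMessage]())
    }

    result.collect.foreach { case (id, vert) => println(id + " aged to " + vert.age) }
    sc.stop()
  }
}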
19 changes: 19 additions & 0 deletions bagel/src/test/scala/bagel/BagelSuite.scala
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
import scala.collection.mutable.ArrayBuffer

import spark._
import storage.StorageLevel

class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
@@ -22,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
}

test("halting by voting") {
@@ -79,4 +81,21 @@
}
}
}

test("using non-default persistence level") {
failAfter(10 seconds) {
sc = new SparkContext("local", "test")
val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 50
val result =
Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
(new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}
for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
}
}
}
}
10 changes: 6 additions & 4 deletions bin/spark-daemon.sh
@@ -30,7 +30,7 @@
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
##

usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <args...>"
usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"

# if no args specified, show usage
if [ $# -le 1 ]; then
@@ -48,6 +48,8 @@ startStop=$1
shift
command=$1
shift
instance=$1
shift

spark_rotate_log ()
{
@@ -92,10 +94,10 @@ if [ "$SPARK_PID_DIR" = "" ]; then
fi

# some variables
export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log
export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log
export SPARK_ROOT_LOGGER="INFO,DRFA"
log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out
pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid
log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out
pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid

# Set default scheduling priority
if [ "$SPARK_NICENESS" = "" ]; then
2 changes: 1 addition & 1 deletion bin/spark-daemons.sh
@@ -2,7 +2,7 @@

# Run a Spark command on all slave hosts.

usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..."
usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."

# if no args specified, show usage
if [ $# -le 1 ]; then
2 changes: 1 addition & 1 deletion bin/start-master.sh
@@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
fi
fi

"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
5 changes: 3 additions & 2 deletions bin/start-slave.sh
@@ -6,9 +6,10 @@ bin=`cd "$bin"; pwd`
# Set SPARK_PUBLIC_DNS so slaves can be linked in master web UI
if [ "$SPARK_PUBLIC_DNS" = "" ]; then
# If we appear to be running on EC2, use the public address by default:
if [[ `hostname` == *ec2.internal ]]; then
# NOTE: ec2-metadata is installed on the Amazon Linux AMI, so check for it as well as the hostname
if command -v ec2-metadata > /dev/null || [[ `hostname` == *ec2.internal ]]; then
export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
fi
fi

"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@"
11 changes: 10 additions & 1 deletion bin/start-slaves.sh
@@ -21,4 +21,13 @@ fi
echo "Master IP: $SPARK_MASTER_IP"

# Launch the slaves
exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
else
if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
SPARK_WORKER_WEBUI_PORT=8081
fi
for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
"$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 )) spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
done
fi
2 changes: 1 addition & 1 deletion bin/stop-master.sh
@@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd`

. "$bin/spark-config.sh"

"$bin"/spark-daemon.sh stop spark.deploy.master.Master
"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1
12 changes: 11 additions & 1 deletion bin/stop-slaves.sh
@@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd`

. "$bin/spark-config.sh"

"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker
if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
. "${SPARK_CONF_DIR}/spark-env.sh"
fi

if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1
else
for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 ))
done
fi