Skip to content

Commit

Permalink
[SPARK-3666] Extract interfaces for EdgeRDD and VertexRDD
Browse files Browse the repository at this point in the history
This discourages users from calling the VertexRDD and EdgeRDD constructor and makes it easier for future changes to ensure backward compatibility.

Author: Ankur Dave <[email protected]>

Closes apache#2530 from ankurdave/SPARK-3666 and squashes the following commits:

d681f45 [Ankur Dave] Define getPartitions and compute in abstract class for MIMA
1472390 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666
24201d4 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666
cbe15f2 [Ankur Dave] Remove specialized annotation from VertexRDD and EdgeRDD
931b587 [Ankur Dave] Use abstract class instead of trait for binary compatibility
9ba4ec4 [Ankur Dave] Mark (Vertex|Edge)RDDImpl constructors package-private
620e603 [Ankur Dave] Extract VertexRDD interface and move implementation to VertexRDDImpl
55b6398 [Ankur Dave] Extract EdgeRDD interface and move implementation to EdgeRDDImpl
  • Loading branch information
ankurdave authored and rxin committed Nov 12, 2014
1 parent c3afd32 commit a5ef581
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 244 deletions.
111 changes: 22 additions & 89 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,32 @@

package org.apache.spark.graphx

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.Dependency
import org.apache.spark.Partition
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.graphx.impl.EdgeRDDImpl

/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
* partition for performance. It may additionally store the vertex attributes associated with each
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

override def setName(_name: String): this.type = {
if (partitionsRDD.name != null) {
partitionsRDD.setName(partitionsRDD.name + ", " + _name)
} else {
partitionsRDD.setName(_name)
}
this
}
setName("EdgeRDD")
abstract class EdgeRDD[ED, VD](
@transient sc: SparkContext,
@transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]

/**
* If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
* [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
override val partitioner =
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
Expand All @@ -66,68 +53,26 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
}
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

/**
* Persists the edge partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
}

override def unpersist(blocking: Boolean = true): this.type = {
partitionsRDD.unpersist(blocking)
this
}

/** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

/** The number of edges in the RDD. */
override def count(): Long = {
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
}

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
} else {
Iterator.empty
}
}, preservesPartitioning = true))
}

/**
* Map the values in an edge partitioning preserving the structure but changing the values.
*
* @tparam ED2 the new edge value type
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
mapEdgePartitions((pid, part) => part.map(f))
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]

/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
def reverse: EdgeRDD[ED, VD]

/** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]

/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
Expand All @@ -140,22 +85,14 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]

/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
/** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
new EdgeRDD(partitionsRDD, this.targetStorageLevel)
}
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]

/**
* Changes the target storage level while preserving all other properties of the
Expand All @@ -164,11 +101,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
new EdgeRDD(this.partitionsRDD, targetStorageLevel)
}

private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
}

object EdgeRDD {
Expand Down Expand Up @@ -197,6 +130,6 @@ object EdgeRDD {
*/
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
new EdgeRDD(edgePartitions)
new EdgeRDDImpl(edgePartitions)
}
}
Loading

0 comments on commit a5ef581

Please sign in to comment.