
[SPARK-12995][GRAPHX] Remove deprecated APIs from Pregel
Author: Takeshi YAMAMURO <[email protected]>

Closes apache#10918 from maropu/RemoveDeprecateInPregel.
maropu authored and srowen committed Feb 15, 2016
1 parent a8bbc4f commit 56d4939
Showing 7 changed files with 42 additions and 134 deletions.
49 changes: 0 additions & 49 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -340,55 +340,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

/**
* Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
* `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
* "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
* the map phase destined to each vertex.
*
* This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
*
* @tparam A the type of "message" to be sent to each vertex
*
* @param mapFunc the user defined map function which returns 0 or
* more messages to neighboring vertices
*
* @param reduceFunc the user defined reduce function which should
* be commutative and associative and is used to combine the output
* of the map phase
*
* @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
* desired. This is done by specifying a set of "active" vertices and an edge direction. The
* `sendMsg` function will then run only on edges connected to active vertices by edges in the
* specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
* destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
* originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
* run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
* will be run on edges with *both* vertices in the active set. The active set must have the
* same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
* val inDeg: RDD[(VertexId, Int)] =
* mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
* }}}
*
* @note By expressing computation at the edge level we achieve
* maximum parallelism. This is one of the core functions in the
* Graph API in that enables neighborhood level computation. For
* example this function can be used to count neighbors satisfying a
* predicate or implement PageRank.
*
*/
@deprecated("use aggregateMessages", "1.2.0")
def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]

/**
* Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
* `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
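For migration, the in-degree example from the removed scaladoc maps onto the public aggregateMessages API that superseded mapReduceTriplets in 1.2.0. A minimal sketch, assuming an already-built graph; the helper name inDegreeOf is illustrative and not part of this commit (Graph already ships an equivalent inDegrees member):

    import org.apache.spark.graphx._

    // Send 1 to each edge's destination, then sum the messages per vertex.
    def inDegreeOf[VD, ED](graph: Graph[VD, ED]): VertexRDD[Int] =
      graph.aggregateMessages[Int](
        ctx => ctx.sendToDst(1),  // one message per in-edge
        _ + _,                    // combine messages arriving at a vertex
        TripletFields.None)       // the send function reads no vertex attributes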
27 changes: 27 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
@@ -17,13 +17,16 @@

package org.apache.spark.graphx

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.{BitSet, OpenHashSet}

object GraphXUtils {

/**
* Registers classes that GraphX uses with Kryo.
*/
@@ -42,4 +45,28 @@ object GraphXUtils {
classOf[OpenHashSet[Int]],
classOf[OpenHashSet[Long]]))
}

/**
* A proxy method to map the obsolete API to the new one.
*/
private[graphx] def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag](
g: Graph[VD, ED],
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
def sendMsg(ctx: EdgeContext[VD, ED, A]) {
mapFunc(ctx.toEdgeTriplet).foreach { kv =>
val id = kv._1
val msg = kv._2
if (id == ctx.srcId) {
ctx.sendToSrc(msg)
} else {
assert(id == ctx.dstId)
ctx.sendToDst(msg)
}
}
}
g.aggregateMessagesWithActiveSet(
sendMsg, reduceFunc, TripletFields.All, activeSetOpt)
}
}
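The proxy is private[graphx], so only internal callers such as Pregel (below) and the GraphX test suite can reach it; the call shape matches the removed public method with the graph passed as an extra first argument. A sketch under that assumption, with an illustrative Graph[Int, Int] named g:

    // Old (removed): g.mapReduceTriplets(et => Iterator((et.dstId, et.srcAttr)), _ + _)
    // New, via the internal proxy:
    val sums: VertexRDD[Int] =
      GraphXUtils.mapReduceTriplets[Int, Int, Int](
        g, et => Iterator((et.dstId, et.srcAttr)), _ + _)

Note the proxy always passes TripletFields.All, so both vertex attributes are shipped even when mapFunc reads only one of them; external code should call aggregateMessages and narrow the fields itself (see the note after the GraphImpl.scala hunk).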
6 changes: 3 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -121,7 +121,7 @@ object Pregel extends Logging {
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
@@ -135,8 +135,8 @@
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = g.mapReduceTriplets(
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
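Pregel's public API is untouched by this change; only the internal message computation now routes through the GraphXUtils proxy. For orientation, a minimal single-source shortest-paths sketch against the unchanged API, assuming a Graph[Long, Double] named graph and an illustrative source vertex id:

    val sourceId: VertexId = 0L  // assumed source vertex
    val init = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = Pregel(init, Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),  // vprog: keep the shorter distance
      triplet =>                                       // sendMsg: relax each edge
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else Iterator.empty,
      (a, b) => math.min(a, b))                        // mergeMsg: keep the shorter distance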
25 changes: 0 additions & 25 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -187,31 +187,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
// Lower level transformation methods
// ///////////////////////////////////////////////////////////////////////////////////////////////

override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {

def sendMsg(ctx: EdgeContext[VD, ED, A]) {
mapFunc(ctx.toEdgeTriplet).foreach { kv =>
val id = kv._1
val msg = kv._2
if (id == ctx.srcId) {
ctx.sendToSrc(msg)
} else {
assert(id == ctx.dstId)
ctx.sendToDst(msg)
}
}
}

val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true)

aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt)
}

override def aggregateMessagesWithActiveSet[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
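The override removed here did one thing the new GraphXUtils proxy does not: it inspected mapFunc's byte code (accessesVertexAttr) to ship only the vertex attributes actually read. The proxy settles for TripletFields.All, so callers who care about communication volume should migrate to aggregateMessages and declare the fields explicitly. A sketch with an illustrative Graph[Int, Int] named graph:

    val srcSums: VertexRDD[Int] = graph.aggregateMessages[Int](
      ctx => ctx.sendToDst(ctx.srcAttr),  // reads only the source attribute
      _ + _,
      TripletFields.Src)                  // ship srcAttr only; dstAttr is never serialized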
11 changes: 0 additions & 11 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -39,17 +39,6 @@ object SVDPlusPlus {
var gamma7: Double)
extends Serializable

/**
* This method is now replaced by the updated version of `run()` and returns exactly
* the same result.
*/
@deprecated("Call run()", "1.4.0")
def runSVDPlusPlus(edges: RDD[Edge[Double]], conf: Conf)
: (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
{
run(edges, conf)
}

/**
* Implement SVD++ based on "Factorization Meets the Neighborhood:
* a Multifaceted Collaborative Filtering Model",
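Migration here is a plain rename: the removed wrapper delegated to run() with identical arguments and an identical result. A sketch, assuming edges: RDD[Edge[Double]] and conf: SVDPlusPlus.Conf are in scope:

    // Before (removed in this commit):
    //   val (graph, mean) = SVDPlusPlus.runSVDPlusPlus(edges, conf)
    // After:
    val (graph, mean) = SVDPlusPlus.run(edges, conf)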
52 changes: 6 additions & 46 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -221,7 +221,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect().toSet === Set((1L, 2)))
}
}
@@ -281,49 +282,6 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("mapReduceTriplets") {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache()
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
(a: Int, b: Int) => a + b)
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)

// activeSetOpt
val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
val numEvenNeighbors = vids.mapReduceTriplets(et => {
// Map function should only run on edges with destination in the active set
if (et.dstId % 2 != 0) {
throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)

// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
newOpt.getOrElse(old)
}
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
if (et.srcId % 2 != 1) {
throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)

}
}

test("aggregateMessages") {
withSpark { sc =>
val n = 5
@@ -347,7 +305,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
(vid, a, bOpt) => bOpt.getOrElse(0)
}
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
val neighborDegreeSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
reverseStarDegrees,
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
@@ -420,7 +379,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
numEdgePartitions)
val graph = Graph.fromEdgeTuples(edges, 1)
val neighborAttrSums = graph.mapReduceTriplets[Int](
val neighborAttrSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
graph,
et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {
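The deleted "mapReduceTriplets" test largely duplicated the "aggregateMessages" coverage that remains; its star-graph assertion reads as follows when re-expressed with the public API (a sketch reusing the old test's n and starDeg bindings):

    val neighborDegreeSums = starDeg.aggregateMessages[Int](
      ctx => { ctx.sendToSrc(ctx.dstAttr); ctx.sendToDst(ctx.srcAttr) },
      (a: Int, b: Int) => a + b)
    assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x: VertexId, n)).toSet)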
6 changes: 6 additions & 0 deletions project/MimaExcludes.scala
@@ -242,6 +242,12 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedPythonFunction"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction$")
) ++ Seq(
// SPARK-12995 Remove deprecated APIs in graphx
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.lib.SVDPlusPlus.runSVDPlusPlus"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets")
)
case v if v.startsWith("1.6") =>
Seq(
