Skip to content

Commit

Permalink
SPARK-1329: Create pid2vid with correct number of partitions
Browse files Browse the repository at this point in the history
Each vertex partition is co-located with a pid2vid array created in RoutingTable.scala. This array maps edge partition IDs to the list of vertices in the current vertex partition that are mentioned by edges in that partition. Therefore the pid2vid array should have one entry per edge partition.

GraphX currently creates one entry per *vertex* partition, which is a bug that leads to an ArrayIndexOutOfBoundsException when there are more edge partitions than vertex partitions. This commit fixes the bug and adds a test for this case.

Resolves SPARK-1329. Thanks to Daniel Darabos for reporting this bug.

Author: Ankur Dave <[email protected]>

Closes apache#368 from ankurdave/fix-pid2vid-size and squashes the following commits:

5a5c52a [Ankur Dave] SPARK-1329: Create pid2vid with correct number of partitions
  • Loading branch information
ankurdave authored and rxin committed Apr 17, 2014
1 parent 235a47c commit 17d3234
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
vSet.iterator.map { vid => (vid, pid) }
}

val numPartitions = vertices.partitions.size
val numEdgePartitions = edges.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
Expand Down
12 changes: 12 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}

test("more edge partitions than vertex partitions") {
withSpark { sc =>
val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
val graph = Graph(verts, edges)
val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
.collect.toSet
assert(triplets ===
Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
}
}

}

0 comments on commit 17d3234

Please sign in to comment.