Repository: spark
Updated Branches:
  refs/heads/master cb7fe5034 -> 56c771cb2


[SPARK-1931] Reconstruct routing tables in Graph.partitionBy

905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in partitionBy where, 
after repartitioning the edges, it reused the VertexRDD without updating its 
routing tables to reflect the new edge layout. As a result, subsequent accesses 
of the triplets view returned null for many vertex properties.

This commit adds a regression test for this bug and fixes it by introducing 
`VertexRDD#withEdges`, which rebuilds the routing tables against a given 
EdgeRDD, and calling it in `partitionBy`.

Author: Ankur Dave <[email protected]>

Closes #885 from ankurdave/SPARK-1931 and squashes the following commits:

3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins
9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in 
Graph.partitionBy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56c771cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56c771cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56c771cb

Branch: refs/heads/master
Commit: 56c771cb2d00a5843c391ae6561536ee46e535d4
Parents: cb7fe50
Author: Ankur Dave <[email protected]>
Authored: Mon May 26 16:10:22 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon May 26 16:10:22 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/graphx/VertexRDD.scala | 12 ++++++++++++
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 13 +++++++++----
 .../scala/org/apache/spark/graphx/GraphSuite.scala     | 10 ++++++++++
 3 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56c771cb/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8c62897..8b910fb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag](
   def reverseRoutingTables(): VertexRDD[VD] =
     this.mapVertexPartitions(vPart => 
vPart.withRoutingTable(vPart.routingTable.reverse))
 
+  /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
+  def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+    val routingTables = VertexRDD.createRoutingTables(edges, 
this.partitioner.get)
+    val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
+      (partIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else 
RoutingTablePartition.empty
+        partIter.map(_.withRoutingTable(routingTable))
+    }
+    new VertexRDD(vertexPartitions)
+  }
+
   /** Generates an RDD of vertex attributes suitable for shipping to the edge 
partitions. */
   private[graphx] def shipVertexAttributes(
       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, 
VertexAttributeBlock[VD])] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/56c771cb/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 2f2d0e0..1649b24 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true))
-    GraphImpl.fromExistingRDDs(vertices, newEdges)
+      }, preservesPartitioning = true)).cache()
+    GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
@@ -277,7 +277,11 @@ object GraphImpl {
     GraphImpl(vertexRDD, edgeRDD)
   }
 
-  /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated 
vertices. */
+  /**
+   * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated 
vertices. The
+   * VertexRDD must already be set up for efficient joins with the EdgeRDD by 
calling
+   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
+   */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
       edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
@@ -290,7 +294,8 @@ object GraphImpl {
 
   /**
    * Create a graph from a VertexRDD and an EdgeRDD with the same replicated 
vertex type as the
-   * vertices.
+   * vertices. The VertexRDD must already be set up for efficient joins with 
the EdgeRDD by calling
+   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
    */
   def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],

http://git-wip-us.apache.org/repos/asf/spark/blob/56c771cb/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 7b9bac5..abc25d0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         Iterator((part.srcIds ++ part.dstIds).toSet)
       }.collect
       assert(verts.exists(id => 
partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+
+      // Forming triplets view
+      val g = Graph(
+        sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
+        sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
+      assert(g.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
+      val gPart = g.partitionBy(EdgePartition2D)
+      assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
     }
   }
 

Reply via email to