Repository: spark Updated Branches: refs/heads/branch-1.6 6a9f19dd5 -> 5830828ef
[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs Some VertexRDD and EdgeRDD are created during the intermediate step of g.connectedComponents() but unnecessarily left cached after the method is done. The fix is to unpersist these RDDs once they are no longer in use. A test case is added to confirm the fix for the reported bug. Author: Jason Lee <[email protected]> Closes #10713 from jasoncl/SPARK-12655. (cherry picked from commit d0a5c32bd05841f411a342a80c5da9f73f30d69a) Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5830828e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5830828e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5830828e Branch: refs/heads/branch-1.6 Commit: 5830828efbf863df510a2b5b17d76214863ff48f Parents: 6a9f19d Author: Jason Lee <[email protected]> Authored: Fri Jan 15 12:04:05 2016 +0000 Committer: Sean Owen <[email protected]> Committed: Tue Jun 7 09:25:04 2016 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/graphx/Pregel.scala | 2 +- .../spark/graphx/lib/ConnectedComponents.scala | 4 +++- .../scala/org/apache/spark/graphx/GraphSuite.scala | 17 +++++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 2ca60d5..8a89295 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -151,7 +151,7 @@ object Pregel extends Logging { // count the iteration i += 1 } - + messages.unpersist(blocking = false) g } // end of apply http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 859f896..f72cbb1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -47,9 +47,11 @@ object ConnectedComponents { } } val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( + val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) + ccGraph.unpersist() + pregelGraph } // end of connectedComponents } http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 9acbd79..a46c5da 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -428,6 +428,23 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { } } + test("unpersist graph RDD") { + withSpark { sc => + val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1) + val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1) + val g0 = Graph(vert, edges) + val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2) + val cc = g.connectedComponents() + assert(sc.getPersistentRDDs.nonEmpty) + cc.unpersist() + g.unpersist() + g0.unpersist() + vert.unpersist() + edges.unpersist() + assert(sc.getPersistentRDDs.isEmpty) + } + } + test("SPARK-14219: pickRandomVertex") { withSpark { sc => val vert = sc.parallelize(List((1L, "a")), 1) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
