Repository: spark
Updated Branches:
refs/heads/master eba6a1af4 -> 587c315b2
[SPARK-9109] [GRAPHX] Keep the cached edge in the graph
The change here is to keep the cached RDDs in the graph object so that when the
graph.unpersist() is called these RDDs are correctly unpersisted.
```java
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges
graph.unpersist()
sc.getPersistentRDDs.foreach( r => println( r._2.toString))
```
Author: tien-dungle <[email protected]>
Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the
following commits:
8d87997 [tien-dungle] Keep the cached edge in the graph
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/587c315b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/587c315b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/587c315b
Branch: refs/heads/master
Commit: 587c315b204f1439f696620543c38166d95f8a3d
Parents: eba6a1a
Author: tien-dungle <[email protected]>
Authored: Fri Jul 17 12:11:32 2015 -0700
Committer: Ankur Dave <[email protected]>
Committed: Fri Jul 17 12:11:32 2015 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/587c315b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d2..da95314 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
- .withTargetStorageLevel(edgeStorageLevel).cache()
+ .withTargetStorageLevel(edgeStorageLevel)
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
- .withTargetStorageLevel(vertexStorageLevel).cache()
+ .withTargetStorageLevel(vertexStorageLevel)
GraphImpl(vertexRDD, edgeRDD)
}
@@ -346,9 +346,14 @@ object GraphImpl {
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+ vertices.cache()
+
// Convert the vertex partitions in edges to the correct type
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+ .cache()
+
GraphImpl.fromExistingRDDs(vertices, newEdges)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]