Repository: spark
Updated Branches:
refs/heads/master e1b85f310 -> fba8ec39c
Add caching information to rdd.toDebugString
I find it useful to see where in an RDD's DAG data is cached, so I figured
others might too.
I've added both the caching level and the actual memory state of the RDD.
Some of this is redundant with the web UI (notably the actual memory state),
but (a) that is temporary, and (b) putting it in the DAG tree shows some
context that can help a lot.
For example:
```
(4) ShuffledRDD[3] at reduceByKey at <console>:14
 +-(4) MappedRDD[2] at map at <console>:14
    |  MapPartitionsRDD[1] at mapPartitions at <console>:12
    |  ParallelCollectionRDD[0] at parallelize at <console>:12
```
should change to
```
(4) ShuffledRDD[3] at reduceByKey at <console>:14 [Memory Deserialized 1x Replicated]
 |    CachedPartitions: 4; MemorySize: 50.8 MB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 +-(4) MappedRDD[2] at map at <console>:14 [Memory Deserialized 1x Replicated]
    |  MapPartitionsRDD[1] at mapPartitions at <console>:12 [Memory Deserialized 1x Replicated]
    |   CachedPartitions: 4; MemorySize: 109.1 MB; TachyonSize: 0.0 B; DiskSize: 0.0 B
    |  ParallelCollectionRDD[0] at parallelize at <console>:12 [Memory Deserialized 1x Replicated]
```
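The indentation in these listings follows a few prefix rules in `toDebugString`: the root prints `(N)` and its description, each shuffle dependency opens a new `+-(N)` branch with a `|` rail under the middle of its partition marker, and a narrow dependency reuses the current prefix. The sketch below models only those rules, mirroring the `firstDebugString`, `shuffleDebugString` and `debugString` helpers in the diff. It uses a hypothetical `Node` type in place of `RDD` and has no Spark dependency. For the four-RDD lineage above, it should reproduce the layout of the first listing.

```scala
// Hypothetical stand-in for an RDD (not Spark's class): a description,
// a partition count, and its dependencies as (child, isShuffle) pairs.
final case class Node(desc: String, partitions: Int, deps: List[(Node, Boolean)] = Nil)

object DebugTree {
  private def partitionStr(n: Node): String = s"(${n.partitions})"

  // A "|" under the middle character of "(N)", padded to that marker's width + 1.
  private def barUnder(p: String): String = {
    val leftOffset = (p.length - 1) / 2
    (" " * leftOffset) + "|" + (" " * (p.length - leftOffset))
  }

  // Render every dependency; only the last one is flagged isLastChild.
  private def children(n: Node, prefix: String): Seq[String] = {
    val last = n.deps.length - 1
    n.deps.zipWithIndex.flatMap { case ((child, isShuffle), i) =>
      render(child, prefix, isShuffle, isLastChild = i == last)
    }
  }

  private def render(n: Node, prefix: String, isShuffle: Boolean, isLastChild: Boolean): Seq[String] =
    if (isShuffle) {
      // A shuffle opens a new "+-(N)" branch: drop the trailing "|  " from the
      // parent prefix, keep a "| " rail if more siblings follow, then add a bar
      // under this node's own "(N)".
      val p = partitionStr(n)
      val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
      val nextPrefix = thisPrefix + (if (isLastChild) "  " else "| ") + barUnder(p)
      s"$thisPrefix+-$p ${n.desc}" +: children(n, nextPrefix)
    } else {
      // A narrow dependency stays in the same stage and reuses the prefix.
      s"$prefix${n.desc}" +: children(n, prefix)
    }

  def toDebugString(root: Node): String = {
    val p = partitionStr(root)
    (s"$p ${root.desc}" +: children(root, barUnder(p))).mkString("\n")
  }
}

object DebugTreeDemo {
  def main(args: Array[String]): Unit = {
    // The lineage from the example: parallelize -> mapPartitions -> map -> reduceByKey.
    val pc = Node("ParallelCollectionRDD[0] at parallelize at <console>:12", 4)
    val mp = Node("MapPartitionsRDD[1] at mapPartitions at <console>:12", 4, List(pc -> false))
    val mr = Node("MappedRDD[2] at map at <console>:14", 4, List(mp -> false))
    val sh = Node("ShuffledRDD[3] at reduceByKey at <console>:14", 4, List(mr -> true))
    println(DebugTree.toDebugString(sh))
  }
}
```

The `isLastChild` distinction only matters for an RDD with several dependencies, such as a `union` or `cogroup`. Earlier siblings keep a `| ` rail, so later siblings still read as children of the same parent.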
Author: Nathan Kronenfeld <[email protected]>
Closes #1535 from nkronenfeld/feature/debug-caching2 and squashes the following commits:
40490bc [Nathan Kronenfeld] Back out DeveloperAPI and arguments to RDD.toDebugString, reinstate memory output
794e6a3 [Nathan Kronenfeld] Attempt to merge mima changes from master
6fe9e80 [Nathan Kronenfeld] Add exclusions to allow for signature change in toDebugString (will back out if necessary)
31d6769 [Nathan Kronenfeld] Attempt to get rid of style errors. Add comments for the new memory usage parameter.
a0f6f76 [Nathan Kronenfeld] Add parameter to RDD.toDebugString to allow detailed memory info to be shown or not. Default is for it not to be shown.
f8f565a [Nathan Kronenfeld] Fix code style error
8f54287 [Nathan Kronenfeld] Changed string addition to string interpolation as per PR comments
2a0cd4d [Nathan Kronenfeld] Fixed a small formatting issue I forgot to copy over from the old branch
8fbecb6 [Nathan Kronenfeld] Add caching information to rdd.toDebugString
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fba8ec39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fba8ec39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fba8ec39
Branch: refs/heads/master
Commit: fba8ec39ccf455a4a03504445bad9af420915b4f
Parents: e1b85f3
Author: Nathan Kronenfeld <[email protected]>
Authored: Thu Aug 14 22:15:33 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Thu Aug 14 22:15:33 2014 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 30 ++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fba8ec39/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 19e10bd..daea261 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1299,6 +1299,19 @@ abstract class RDD[T: ClassTag](

   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString: String = {
+    // Get a debug description of an rdd without its children
+    def debugSelf (rdd: RDD[_]): Seq[String] = {
+      import Utils.bytesToString
+
+      val persistence = storageLevel.description
+      val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
+        " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
+          info.numCachedPartitions, bytesToString(info.memSize),
+          bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
+
+      s"$rdd [$persistence]" +: storageInfo
+    }
+
     // Apply a different rule to the last child
     def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
       val len = rdd.dependencies.length
@@ -1324,7 +1337,11 @@ abstract class RDD[T: ClassTag](
       val partitionStr = "(" + rdd.partitions.size + ")"
       val leftOffset = (partitionStr.length - 1) / 2
       val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
-      Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+
+      debugSelf(rdd).zipWithIndex.map{
+        case (desc: String, 0) => s"$partitionStr $desc"
+        case (desc: String, _) => s"$nextPrefix $desc"
+      } ++ debugChildren(rdd, nextPrefix)
     }
     def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
       val partitionStr = "(" + rdd.partitions.size + ")"
@@ -1334,7 +1351,11 @@ abstract class RDD[T: ClassTag](
         thisPrefix
         + (if (isLastChild) "  " else "| ")
         + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
-      Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+
+      debugSelf(rdd).zipWithIndex.map{
+        case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
+        case (desc: String, _) => s"$nextPrefix$desc"
+      } ++ debugChildren(rdd, nextPrefix)
     }
     def debugString(rdd: RDD[_],
                     prefix: String = "",
@@ -1342,9 +1363,8 @@ abstract class RDD[T: ClassTag](
                     isLastChild: Boolean = false): Seq[String] = {
       if (isShuffle) {
         shuffleDebugString(rdd, prefix, isLastChild)
-      }
-      else {
-        Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
+      } else {
+        debugSelf(rdd).map(prefix + _) ++ debugChildren(rdd, prefix)
       }
     }
     firstDebugString(this).mkString("\n")
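The patch has `debugSelf` return one description line plus at most one storage line per RDD. Each caller then splices those lines into the tree with `zipWithIndex`: the first line takes the node's marker and any further line takes the continuation prefix. Below is a minimal, Spark-free sketch of that step. `CachedNode`, `StorageInfo` and the simplified `bytesToString` are hypothetical stand-ins, not Spark classes.

One difference is deliberate. As committed, the unqualified `storageLevel` inside `debugSelf` resolves to the RDD on which `toDebugString` was called, not to the `rdd` argument. That would account for uncached nodes such as `MappedRDD[2]` showing `[Memory Deserialized 1x Replicated]` in the sample output. The sketch tags each node with its own level instead.

```scala
import java.util.Locale

// Hypothetical stand-ins for per-RDD storage metadata (not Spark classes).
final case class StorageInfo(numCachedPartitions: Int, memSize: Long, tachyonSize: Long, diskSize: Long)
final case class CachedNode(desc: String, level: String, info: Option[StorageInfo])

object DebugSelfSketch {
  // Simplified formatter modeled on the sample output: uses the largest unit
  // the size is at least twice of, otherwise prints plain bytes.
  def bytesToString(size: Long): String = {
    val units = List("TB" -> (1L << 40), "GB" -> (1L << 30), "MB" -> (1L << 20), "KB" -> (1L << 10))
    units.find { case (_, factor) => size >= 2 * factor } match {
      case Some((unit, factor)) => "%.1f %s".formatLocal(Locale.US, size.toDouble / factor, unit)
      case None                 => "%.1f B".formatLocal(Locale.US, size.toDouble)
    }
  }

  // Description line tagged with this node's OWN storage level (assumption:
  // differs from the patch), plus a storage line only if block info exists.
  def debugSelf(n: CachedNode): Seq[String] = {
    val storageLines = n.info.toList.map { i =>
      " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
        i.numCachedPartitions, bytesToString(i.memSize),
        bytesToString(i.tachyonSize), bytesToString(i.diskSize))
    }
    s"${n.desc} [${n.level}]" +: storageLines
  }

  // Same splice as the patch's zipWithIndex match: the first line gets the
  // node's marker (`head`), every later line gets the continuation prefix.
  def splice(lines: Seq[String], head: String, cont: String): Seq[String] =
    lines.zipWithIndex.map {
      case (line, 0) => head + line
      case (line, _) => cont + line
    }

  def main(args: Array[String]): Unit = {
    val shuffled = CachedNode("ShuffledRDD[3] at reduceByKey at <console>:14",
      "Memory Deserialized 1x Replicated", Some(StorageInfo(4, 53267660L, 0L, 0L)))
    splice(debugSelf(shuffled), "(4) ", " |   ").foreach(println)
  }
}
```

Prepending with `+:` keeps the description first, so an uncached node yields a single line. In that case the splice degenerates to the old one-line-per-RDD output.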