Repository: spark
Updated Branches:
refs/heads/master 1f43562da -> d5012c274
[SPARK-15495][SQL] Improve the explain output for Aggregation operator
## What changes were proposed in this pull request?
This PR improves the explain output of the aggregation operators.
SQL:
```
Seq((1,2,3)).toDF("a", "b", "c").createTempView("df1")
spark.sql("cache table df1")
spark.sql("select count(a), count(c), b from df1 group by b").explain()
```
**Before change:**
```
*TungstenAggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
+- Exchange hashpartitioning(b#8, 200), None
   +- *TungstenAggregate(key=[b#8], functions=[partial_count(1),partial_count(1)], output=[b#8,count#98L,count#99L])
      +- InMemoryTableScan [b#8], InMemoryRelation [a#7,b#8,c#9], true, 10000, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), LocalTableScan [a#7,b#8,c#9], [[1,2,3]], Some(df1)
```
**After change:**
```
*Aggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
+- Exchange hashpartitioning(b#8, 200), None
   +- *Aggregate(key=[b#8], functions=[partial_count(1),partial_count(1)], output=[b#8,count#98L,count#99L])
      +- InMemoryTableScan [b#8], InMemoryRelation [a#7,b#8,c#9], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), LocalTableScan [a#7,b#8,c#9], [[1,2,3]], Some(df1)
```
## How was this patch tested?
Manual testing and existing unit tests.
Author: Sean Zhong <[email protected]>
Closes #13363 from clockfly/verbose3.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5012c27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5012c27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5012c27
Branch: refs/heads/master
Commit: d5012c274036463c47a751cfe9977ade3a68b668
Parents: 1f43562
Author: Sean Zhong <[email protected]>
Authored: Wed Jun 1 09:58:01 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jun 1 09:58:01 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/storage/StorageLevel.scala | 10 ++++++++--
.../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
.../sql/execution/aggregate/SortBasedAggregateExec.scala | 2 +-
.../spark/sql/execution/aggregate/TungstenAggregate.scala | 4 ++--
4 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d5012c27/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 216ec07..fad0404 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -120,8 +120,14 @@ class StorageLevel private(
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

   override def toString: String = {
-    s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
-      s"deserialized=$deserialized, replication=$replication)"
+    val disk = if (useDisk) "disk" else ""
+    val memory = if (useMemory) "memory" else ""
+    val heap = if (useOffHeap) "offheap" else ""
+    val deserialize = if (deserialized) "deserialized" else ""
+
+    val output =
+      Seq(disk, memory, heap, deserialize, s"$replication replicas").filter(_.nonEmpty)
+    s"StorageLevel(${output.mkString(", ")})"
   }

   override def hashCode(): Int = toInt * 41 + replication
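For illustration only (not part of the patch), a minimal spark-shell sketch of the new, more compact `StorageLevel.toString` output; the built-in levels shown are assumed to format as the diff suggests:
```
import org.apache.spark.storage.StorageLevel

// Only the enabled flags are printed after this change.
println(StorageLevel.MEMORY_AND_DISK)   // StorageLevel(disk, memory, deserialized, 1 replicas)
println(StorageLevel.MEMORY_ONLY_SER_2) // StorageLevel(memory, 2 replicas)
```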
http://git-wip-us.apache.org/repos/asf/spark/blob/d5012c27/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index e8e2a7b..d87e6c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -434,7 +434,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case other => other :: Nil
   }.mkString(", ")

-  /** String representation of this node without any children. */
+  /** ONE line description of this node. */
  def simpleString: String = s"$nodeName $argString".trim

  override def toString: String = treeString
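As a sketch of the documented contract (assuming a spark-shell session with `spark` in scope): `simpleString` renders a single plan node on one line, while `treeString`, which backs `toString`, renders the whole subtree:
```
val plan = spark.range(100).groupBy("id").count().queryExecution.executedPlan

println(plan.simpleString) // one line: just this node
println(plan.treeString)   // the full operator tree, one node per line
```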
http://git-wip-us.apache.org/repos/asf/spark/blob/d5012c27/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
index 2e74d59..af1fb4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
@@ -106,6 +106,6 @@ case class SortBasedAggregateExec(
     val keyString = groupingExpressions.mkString("[", ",", "]")
     val functionString = allAggregateExpressions.mkString("[", ",", "]")
     val outputString = output.mkString("[", ",", "]")
-    s"SortBasedAggregate(key=$keyString, functions=$functionString, output=$outputString)"
+    s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
   }
 }
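A hedged way to observe the renamed operator: aggregation buffers holding non-mutable types (for example `max` over a string column) are expected to fall back to sort-based aggregation, so the plan should now print `SortAggregate(...)`. Assuming a spark-shell session:
```
import org.apache.spark.sql.functions.max

// The string-typed buffer rules out hash aggregation here, so the
// physical plan is expected to show SortAggregate.
spark.range(10)
  .selectExpr("id % 3 AS g", "cast(id AS string) AS s")
  .groupBy("g")
  .agg(max("s"))
  .explain()
```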
http://git-wip-us.apache.org/repos/asf/spark/blob/d5012c27/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 905e93c..0911779 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -769,9 +769,9 @@ case class TungstenAggregate(
         val keyString = groupingExpressions.mkString("[", ",", "]")
         val functionString = allAggregateExpressions.mkString("[", ",", "]")
         val outputString = output.mkString("[", ",", "]")
-        s"TungstenAggregate(key=$keyString, functions=$functionString, output=$outputString)"
+        s"Aggregate(key=$keyString, functions=$functionString, output=$outputString)"
       case Some(fallbackStartsAt) =>
-        s"TungstenAggregateWithControlledFallback $groupingExpressions " +
+        s"AggregateWithControlledFallback $groupingExpressions " +
           s"$allAggregateExpressions $resultExpressions fallbackStartsAt=$fallbackStartsAt"
     }
   }
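And the common hash-aggregation path, as a spark-shell sketch: a plain group-by should now render as `*Aggregate(...)` (the `*` marks whole-stage codegen) instead of `*TungstenAggregate(...)`:
```
// After this change the hash aggregate prints as *Aggregate(...).
spark.range(100)
  .selectExpr("id % 10 AS b")
  .groupBy("b")
  .count()
  .explain()
```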