This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ede35c8  [SPARK-26622][SQL] Revise SQL Metrics labels
ede35c8 is described below

commit ede35c88e02323c69a5a6feb7e8ff4d29ffb0456
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Thu Jan 17 10:49:42 2019 -0800

    [SPARK-26622][SQL] Revise SQL Metrics labels

    ## What changes were proposed in this pull request?

    Try to make the labels more obvious:

    "avg hash probe" -> avg hash probe bucket iterations
    "partition pruning time (ms)" -> dynamic partition pruning time
    "total number of files in the table" -> file count
    "number of files that would be returned by partition pruning alone" -> file count after partition pruning
    "total size of files in the table" -> file size
    "size of files that would be returned by partition pruning alone" -> file size after partition pruning
    "metadata time (ms)" -> metadata time
    "aggregate time" -> time in aggregation build
    "time to construct rdd bc" -> time to build
    "total time to remove rows" -> time to remove
    "total time to update rows" -> time to update

    Add the proper metric type to some metrics:

    "bytes of written output" -> written output - createSizeMetric
    "metadata time" - createTimingMetric
    "dataSize" - createSizeMetric
    "collectTime" - createTimingMetric
    "buildTime" - createTimingMetric
    "broadcastTime" - createTimingMetric

    ## How was this patch tested?

    Existing tests.

    Author: Stacy Kerkela <stacy.kerkela@databricks.com>
    Signed-off-by: Juliusz Sompolski <julek@databricks.com>

    Closes #23551 from juliuszsompolski/SPARK-26622.

    Lead-authored-by: Juliusz Sompolski <ju...@databricks.com>
    Co-authored-by: Stacy Kerkela <stacy.kerk...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java    |  2 +-
 .../spark/sql/execution/UnsafeFixedWidthAggregationMap.java  |  6 +++---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala  |  4 ++--
 .../spark/sql/execution/aggregate/HashAggregateExec.scala    |  7 ++++---
 .../sql/execution/aggregate/ObjectHashAggregateExec.scala    |  2 +-
 .../execution/aggregate/TungstenAggregationIterator.scala    |  2 +-
 .../apache/spark/sql/execution/basicPhysicalOperators.scala  |  4 ++--
 .../sql/execution/datasources/BasicWriteStatsTracker.scala   |  2 +-
 .../spark/sql/execution/exchange/BroadcastExchangeExec.scala |  8 ++++----
 .../spark/sql/execution/streaming/statefulOperators.scala    |  4 ++--
 .../apache/spark/sql/execution/metric/SQLMetricsSuite.scala  | 12 ++++++------
 .../spark/sql/execution/metric/SQLMetricsTestUtils.scala     |  6 ++++--
 12 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 2ff98a6..13ca7fb 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -854,7 +854,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
 
   /**
    * Returns the average number of probes per key lookup.
    */
-  public double getAverageProbesPerLookup() {
+  public double getAvgHashProbeBucketListIterations() {
     return (1.0 * numProbes) / numKeyLookups;
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 7e76a65..117e98f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -226,10 +226,10 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Gets the average hash map probe per looking up for the underlying `BytesToBytesMap`.
+   * Gets the average bucket list iterations per lookup in the underlying `BytesToBytesMap`.
    */
-  public double getAverageProbesPerLookup() {
-    return map.getAverageProbesPerLookup();
+  public double getAvgHashProbeBucketListIterations() {
+    return map.getAvgHashProbeBucketListIterations();
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f6f3fb1..f852a52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -319,8 +319,8 @@ case class FileSourceScanExec(
 
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
+      "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 220a4b0..19a47ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -63,8 +63,9 @@ case class HashAggregateExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
-    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
+    "avgHashProbe" ->
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
@@ -362,7 +363,7 @@ case class HashAggregateExec(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Update average hashmap probe
-    avgHashProbe.set(hashMap.getAverageProbesPerLookup())
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
 
     if (sorter == null) {
       // not spilled
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index bd52c63..5b340ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -77,7 +77,7 @@ case class ObjectHashAggregateExec(
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time")
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build")
   )
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 6d849869..6dc6465 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -387,7 +387,7 @@ class TungstenAggregationIterator(
       metrics.incPeakExecutionMemory(maxMemory)
 
       // Updating average hashmap probe
-      avgHashProbe.set(hashMap.getAverageProbesPerLookup())
+      avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
     })
 
   ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 318dca0..4352721 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -663,8 +663,8 @@ object CoalesceExec {
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
-    "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"))
 
   override def output: Seq[Attribute] = child.output
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index ba7d2b7..b71c2d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -173,7 +173,7 @@ object BasicWriteJobStatsTracker {
     val sparkContext = SparkContext.getActive.get
     Map(
       NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
-      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
      NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
     )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 703d351..d55d4fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -44,10 +44,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends Exchange {
 
   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
-    "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"),
-    "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"),
-    "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
+    "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"))
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index c11af34..d689a6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -79,8 +79,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to update rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to remove rows"),
+    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
+    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
     "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
     "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state")
   ) ++ stateStoreCustomMetrics
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 6174ec4..98a8ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -96,9 +96,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 1L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
       "local blocks read" -> 2L,
@@ -114,9 +114,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 3L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected2 = Map(
       "records read" -> 4L,
       "local blocks read" -> 4L,
@@ -162,7 +162,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
+        val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max)")
         probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
           assert(probe.toDouble > 1.0)
         }
@@ -570,7 +570,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       testSparkPlanMetrics(df, 1, Map(
         0L -> (("Scan parquet default.testdataforscan", Map(
           "number of output rows" -> 3L,
-          "number of files" -> 2L))))
+          "number of files read" -> 2L))))
       )
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0e13f7d..f12eeaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -84,8 +84,10 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       assert(metricValue == expected)
     }
 
-    val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
-    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
+    val totalNumBytesMetric = executedNode.metrics.find(
+      _.name == "written output total (min, med, max)").get
+    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
+      .split(" ").head.trim.toDouble
     assert(totalNumBytes > 0)
   }
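
For readers skimming the patch, the sketch below is a minimal, illustrative example (not part of the commit; the object name ExampleMetrics is hypothetical, while the metric keys, labels, and SQLMetrics helpers are taken from the diff above) of how an operator-style metrics map uses the metric-type constructors this change standardizes on, so the SQL UI renders sizes as byte counts, timings as durations, and averages as (min, med, max) summaries rather than raw longs:

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative only; in Spark itself such a map lives inside a SparkPlan node,
// which provides `sparkContext`.
object ExampleMetrics {
  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map(
    // plain counter, displayed as a raw number
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    // size metric, displayed as a byte size with a (min, med, max) task summary
    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
    // timing metric, displayed as a duration with a (min, med, max) task summary
    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
    // average metric, displayed as a (min, med, max) summary of per-task averages
    "avgHashProbe" ->
      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
}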