Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4a10df0f6 -> df45ddb9d


[SPARK-24104] SQLAppStatusListener overwrites metrics onDriverAccumUpdates 
instead of updating them

## What changes were proposed in this pull request?

A `SparkListenerDriverAccumUpdates` event may be posted multiple times within a single query - 
e.g. every `FileSourceScanExec` and `BroadcastExchangeExec` calls 
`postDriverMetricUpdates`.
In Spark 2.2, `SQLListener` merged each new batch of values into the existing map; 
`SQLAppStatusListener` instead overwrites the whole map on every event.
As a result, unless `update` happened to persist an earlier snapshot to the KV store 
(which depends on `exec.lastWriteTime`), only the metrics from the last operator that 
calls `postDriverMetricUpdates` survive.
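
For illustration only, a minimal standalone sketch of the two behaviours (the object name, 
accumulator IDs and values below are made up; `firstUpdate`/`secondUpdate` stand in for two 
`SparkListenerDriverAccumUpdates` events, not the actual listener state):

```scala
object MergeVsOverwriteSketch extends App {
  // Two driver-side accumulator batches for the same execution, e.g. one
  // posted by FileSourceScanExec and one by BroadcastExchangeExec.
  val firstUpdate: Seq[(Long, Long)] = Seq(1L -> 100L)  // accumulator id -> value
  val secondUpdate: Seq[(Long, Long)] = Seq(2L -> 200L)

  // Overwriting (the buggy behaviour): the second event replaces the map,
  // so accumulator 1 is lost.
  var overwritten: Map[Long, Long] = firstUpdate.toMap
  overwritten = secondUpdate.toMap
  println(overwritten)  // Map(2 -> 200)

  // Merging (the behaviour restored by this patch): both accumulators survive,
  // and a re-posted id would simply take its latest value.
  var merged: Map[Long, Long] = Map.empty
  merged = merged ++ firstUpdate
  merged = merged ++ secondUpdate
  println(merged)  // Map(1 -> 100, 2 -> 200)
}
```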

## How was this patch tested?

A unit test was added to `SQLAppStatusListenerSuite`: the dummy `MyPlan` operator now posts 
two separate driver metric updates, and the test verifies that both metrics are reported.

Author: Juliusz Sompolski <[email protected]>

Closes #21171 from juliuszsompolski/SPARK-24104.

(cherry picked from commit 8614edd445264007144caa6743a8c2ca2b5082e0)
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df45ddb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df45ddb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df45ddb9

Branch: refs/heads/branch-2.3
Commit: df45ddb9dea9bf42d18c1164cf35067c7cac5d6f
Parents: 4a10df0
Author: Juliusz Sompolski <[email protected]>
Authored: Fri Apr 27 14:14:28 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Apr 27 14:14:38 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/ui/SQLAppStatusListener.scala |  2 +-
 .../ui/SQLAppStatusListenerSuite.scala          | 24 ++++++++++++++++----
 2 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df45ddb9/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 2b6bb48..d254af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -289,7 +289,7 @@ class SQLAppStatusListener(
   private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
     val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
-      exec.driverAccumUpdates = accumUpdates.toMap
+      exec.driverAccumUpdates = exec.driverAccumUpdates ++ accumUpdates
       update(exec)
     }
   }

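The one-line fix above relies on plain Scala `Map` semantics: `++` adds the new key/value 
pairs to the existing map, and on a duplicate key the right-hand (newer) value wins, so a 
re-posted accumulator takes its latest value while accumulators from earlier events are kept. 
A tiny sketch with made-up accumulator IDs:

```scala
val existing = Map(1L -> 100L, 2L -> 200L)    // accumulator id -> value already recorded
val incoming = Seq(2L -> 250L, 3L -> 300L)    // a later batch of driver accumulator updates

val merged = existing ++ incoming
// merged == Map(1L -> 100L, 2L -> 250L, 3L -> 300L):
// accumulator 1 is kept, accumulator 2 takes its newest value, accumulator 3 is added.
```
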
http://git-wip-us.apache.org/repos/asf/spark/blob/df45ddb9/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index f3f0883..02df45d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -443,7 +443,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     val oldCount = statusStore.executionsList().size
 
     val expectedAccumValue = 12345
-    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
+    val expectedAccumValue2 = 54321
+    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue, expectedAccumValue2)
     val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
       override lazy val sparkPlan = physicalPlan
       override lazy val executedPlan = physicalPlan
@@ -466,10 +467,14 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     val execId = statusStore.executionsList().last.executionId
     val metrics = statusStore.executionMetrics(execId)
     val driverMetric = physicalPlan.metrics("dummy")
+    val driverMetric2 = physicalPlan.metrics("dummy2")
     val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
+    val expectedValue2 = SQLMetrics.stringValue(driverMetric2.metricType, Seq(expectedAccumValue2))
 
     assert(metrics.contains(driverMetric.id))
     assert(metrics(driverMetric.id) === expectedValue)
+    assert(metrics.contains(driverMetric2.id))
+    assert(metrics(driverMetric2.id) === expectedValue2)
   }
 
   test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol 
(SPARK-18462)") {
@@ -562,20 +567,31 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
  * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]]
  * on the driver.
  */
-private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
+private case class MyPlan(sc: SparkContext, expectedValue: Long, expectedValue2: Long)
+  extends LeafExecNode {
+
   override def sparkContext: SparkContext = sc
   override def output: Seq[Attribute] = Seq()
 
   override val metrics: Map[String, SQLMetric] = Map(
-    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
+    "dummy" -> SQLMetrics.createMetric(sc, "dummy"),
+    "dummy2" -> SQLMetrics.createMetric(sc, "dummy2"))
 
   override def doExecute(): RDD[InternalRow] = {
     longMetric("dummy") += expectedValue
+    longMetric("dummy2") += expectedValue2
+
+    // postDriverMetricUpdates may happen multiple times in a query.
+    // (normally from different operators, but for the sake of testing, from one operator)
+    SQLMetrics.postDriverMetricUpdates(
+      sc,
+      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
+      Seq(metrics("dummy")))
 
     SQLMetrics.postDriverMetricUpdates(
       sc,
       sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
-      metrics.values.toSeq)
+      Seq(metrics("dummy2")))
     sc.emptyRDD
   }
 }

