This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 545cf87dc72 [SPARK-41882][CORE][SQL][UI] Add tests for 
`SQLAppStatusStore` with RocksDB backend and fix some bugs
545cf87dc72 is described below

commit 545cf87dc723342fd0f7f1a222c1a94d4b4c91a0
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 5 10:12:41 2023 -0800

    [SPARK-41882][CORE][SQL][UI] Add tests for `SQLAppStatusStore` with RocksDB 
backend and fix some bugs
    
    ### What changes were proposed in this pull request?
    This PR adds the following new test suites for `SQLAppStatusStore` with a 
RocksDB backend:
    
    - `SQLAppStatusListenerWithRocksDBBackendSuite`, based on 
`SQLAppStatusListenerSuite`
    - `AllExecutionsPageWithRocksDBBackendSuite`, based on 
`AllExecutionsPageSuite`
    
    It also fixes bugs in `SQLExecutionUIDataSerializer` and 
`SparkPlanGraphWrapperSerializer` to make the new tests pass.
    
    In addition, it adds protection to 
`SparkPlanGraphWrapperSerializer#serializeSparkPlanGraphNodeWrapper` to avoid 
throwing an NPE.
    
    ### Why are the changes needed?
    Add more tests for `SQLAppStatusStore` with a RocksDB backend and fix bugs.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Pass GitHub Actions
    - Add new tests
    
    Closes #39385 from LuciferYang/SPARK-41432-FOLLOWUP.
    
    Lead-authored-by: yangjie01 <[email protected]>
    Co-authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../apache/spark/status/protobuf/store_types.proto |  3 +-
 .../sql/SQLExecutionUIDataSerializer.scala         | 15 ++++++--
 .../sql/SparkPlanGraphWrapperSerializer.scala      |  5 ++-
 .../sql/execution/ui/AllExecutionsPageSuite.scala  | 41 +++++++++++++++++----
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 43 +++++++++++++++++-----
 .../sql/KVStoreProtobufSerializerSuite.scala       |  4 +-
 6 files changed, 84 insertions(+), 27 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 2a45b5da1d8..1c3e5bfc49a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -395,7 +395,8 @@ message SQLExecutionUIData {
   optional string error_message = 9;
   map<int64, JobExecutionStatus> jobs = 10;
   repeated int64 stages = 11;
-  map<int64, string> metric_values = 12;
+  bool metric_values_is_null = 12;
+  map<int64, string> metric_values = 13;
 }
 
 message SparkPlanGraphNode {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 7a4a3e2a55d..09cef9663c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -49,7 +49,10 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
     }
     ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
     val metricValues = ui.metricValues
-    if (metricValues != null) {
+    if (metricValues == null) {
+      builder.setMetricValuesIsNull(true)
+    } else {
+      builder.setMetricValuesIsNull(false)
       metricValues.foreach {
         case (k, v) => builder.putMetricValues(k, v)
       }
@@ -67,9 +70,13 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
     val jobs = ui.getJobsMap.asScala.map {
       case (jobId, status) => jobId.toInt -> 
JobExecutionStatusSerializer.deserialize(status)
     }.toMap
-    val metricValues = ui.getMetricValuesMap.asScala.map {
-      case (k, v) => k.toLong -> v
-    }.toMap
+    val metricValues = if (ui.getMetricValuesIsNull) {
+      null
+    } else {
+      ui.getMetricValuesMap.asScala.map {
+        case (k, v) => k.toLong -> v
+      }.toMap
+    }
 
     new SQLExecutionUIData(
       executionId = ui.getExecutionId,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index 49debedbb68..a8f715564fc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -53,8 +53,9 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
     StoreTypes.SparkPlanGraphNodeWrapper = {
 
     val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
-    builder.setNode(serializeSparkPlanGraphNode(input.node))
-    builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+    Option(input.node).foreach(node => 
builder.setNode(serializeSparkPlanGraphNode(node)))
+    Option(input.cluster)
+      .foreach(cluster => 
builder.setCluster(serializeSparkPlanGraphClusterWrapper(cluster)))
     builder.build()
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 495a473a014..a5368711260 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -27,14 +27,16 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, 
SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.status.{AppStatusStore, ElementTrackingStore}
+import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.InMemoryStore
 
-class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
+abstract class AllExecutionsPageSuite extends SharedSparkSession with 
BeforeAndAfter {
 
   override def sparkConf: SparkConf = {
     // Disable async kv store write in the UI, to make tests more stable here.
@@ -120,12 +122,7 @@ class AllExecutionsPageSuite extends SharedSparkSession 
with BeforeAndAfter {
   }
 
 
-  private def createStatusStore: SQLAppStatusStore = {
-    val conf = sparkContext.conf
-    kvstore = new ElementTrackingStore(new InMemoryStore, conf)
-    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
-    new SQLAppStatusStore(kvstore, Some(listener))
-  }
+  protected def createStatusStore: SQLAppStatusStore
 
   private def createTestDataFrame: DataFrame = {
     Seq(
@@ -178,3 +175,31 @@ class AllExecutionsPageSuite extends SharedSparkSession 
with BeforeAndAfter {
   }
 }
 
+class AllExecutionsPageWithInMemoryStoreSuite extends AllExecutionsPageSuite {
+  override protected def createStatusStore: SQLAppStatusStore = {
+    val conf = sparkContext.conf
+    kvstore = new ElementTrackingStore(new InMemoryStore, conf)
+    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+    new SQLAppStatusStore(kvstore, Some(listener))
+  }
+}
+
+class AllExecutionsPageWithRocksDBBackendSuite extends AllExecutionsPageSuite {
+  private val storePath = Utils.createTempDir()
+  override protected def createStatusStore(): SQLAppStatusStore = {
+    val conf = sparkContext.conf
+    conf.set(LIVE_UI_LOCAL_STORE_DIR, storePath.getCanonicalPath)
+    val appStatusStore = AppStatusStore.createLiveStore(conf)
+    kvstore = appStatusStore.store.asInstanceOf[ElementTrackingStore]
+    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+    new SQLAppStatusStore(kvstore, Some(listener))
+  }
+
+  protected override def afterAll(): Unit = {
+    if (storePath.exists()) {
+      Utils.deleteRecursively(storePath)
+    }
+    super.afterAll()
+  }
+}
+
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 1cecb2a6ba9..3d80935fdda 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -56,12 +56,12 @@ import 
org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, 
LongAccumulator, SerializableConfiguration}
+import org.apache.spark.status.{AppStatusStore, ElementTrackingStore}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, 
LongAccumulator, SerializableConfiguration, Utils}
 import org.apache.spark.util.kvstore.InMemoryStore
 
 
-class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
+abstract class SQLAppStatusListenerSuite extends SharedSparkSession with 
JsonTestUtils
   with BeforeAndAfter {
 
   import testImplicits._
@@ -70,7 +70,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession 
with JsonTestUtils
     super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 
0L).set(ASYNC_TRACKING_ENABLED, false)
   }
 
-  private var kvstore: ElementTrackingStore = _
+  protected var kvstore: ElementTrackingStore = _
 
   after {
     if (kvstore != null) {
@@ -155,12 +155,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession 
with JsonTestUtils
     assert(actualFailed.sorted === failed)
   }
 
-  private def createStatusStore(): SQLAppStatusStore = {
-    val conf = sparkContext.conf
-    kvstore = new ElementTrackingStore(new InMemoryStore, conf)
-    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
-    new SQLAppStatusStore(kvstore, Some(listener))
-  }
+  protected def createStatusStore(): SQLAppStatusStore
 
   test("basic") {
     def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): 
Unit = {
@@ -1007,6 +1002,34 @@ class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTestUtils
   }
 }
 
+class SQLAppStatusListenerWithInMemoryStoreSuite extends 
SQLAppStatusListenerSuite {
+  override protected def createStatusStore(): SQLAppStatusStore = {
+    val conf = sparkContext.conf
+    kvstore = new ElementTrackingStore(new InMemoryStore, conf)
+    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+    new SQLAppStatusStore(kvstore, Some(listener))
+  }
+}
+
+class SQLAppStatusListenerWithRocksDBBackendSuite extends 
SQLAppStatusListenerSuite {
+  private val storePath = Utils.createTempDir()
+
+  override protected def createStatusStore(): SQLAppStatusStore = {
+    val conf = sparkContext.conf
+    conf.set(LIVE_UI_LOCAL_STORE_DIR, storePath.getCanonicalPath)
+    val appStatusStore = AppStatusStore.createLiveStore(conf)
+    kvstore = appStatusStore.store.asInstanceOf[ElementTrackingStore]
+    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+    new SQLAppStatusStore(kvstore, Some(listener))
+  }
+
+  protected override def afterAll(): Unit = {
+    if (storePath.exists()) {
+      Utils.deleteRecursively(storePath)
+    }
+    super.afterAll()
+  }
+}
 
 /**
  * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a 
[[SQLMetrics]]
diff --git 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index a031eb69bf2..b590f6dd42c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -92,8 +92,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     )
     val bytes2 = serializer.serialize(input2)
     val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
-    // input.metricValues is null, result.metricValues is also empty map.
-    assert(result2.metricValues.isEmpty)
+    // input.metricValues is null, result.metricValues is null.
+    assert(result2.metricValues == null)
   }
 
   test("Spark Plan Graph") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to