This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 545cf87dc72 [SPARK-41882][CORE][SQL][UI] Add tests for
`SQLAppStatusStore` with RocksDB backend and fix some bugs
545cf87dc72 is described below
commit 545cf87dc723342fd0f7f1a222c1a94d4b4c91a0
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 5 10:12:41 2023 -0800
[SPARK-41882][CORE][SQL][UI] Add tests for `SQLAppStatusStore` with RocksDB
backend and fix some bugs
### What changes were proposed in this pull request?
This PR adds the following new test suites for `SQLAppStatusStore` with
RocksDB backend:
- `SQLAppStatusListenerWithRocksDBBackendSuite`, based on
`SQLAppStatusListenerSuite`
- `AllExecutionsPageWithRocksDBBackendSuite`, based on
`AllExecutionsPageSuite`
It also fixes bugs in `SQLExecutionUIDataSerializer` and
`SparkPlanGraphWrapperSerializer` to make the new tests pass, and adds
protection to `SparkPlanGraphWrapperSerializer#serializeSparkPlanGraphNodeWrapper`
to avoid throwing an NPE.
### Why are the changes needed?
Add more tests for `SQLAppStatusStore` with the RocksDB backend and fix bugs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Add new tests
Closes #39385 from LuciferYang/SPARK-41432-FOLLOWUP.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 3 +-
.../sql/SQLExecutionUIDataSerializer.scala | 15 ++++++--
.../sql/SparkPlanGraphWrapperSerializer.scala | 5 ++-
.../sql/execution/ui/AllExecutionsPageSuite.scala | 41 +++++++++++++++++----
.../execution/ui/SQLAppStatusListenerSuite.scala | 43 +++++++++++++++++-----
.../sql/KVStoreProtobufSerializerSuite.scala | 4 +-
6 files changed, 84 insertions(+), 27 deletions(-)
diff --git
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 2a45b5da1d8..1c3e5bfc49a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -395,7 +395,8 @@ message SQLExecutionUIData {
optional string error_message = 9;
map<int64, JobExecutionStatus> jobs = 10;
repeated int64 stages = 11;
- map<int64, string> metric_values = 12;
+ bool metric_values_is_null = 12;
+ map<int64, string> metric_values = 13;
}
message SparkPlanGraphNode {
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 7a4a3e2a55d..09cef9663c0 100644
---
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -49,7 +49,10 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
}
ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
val metricValues = ui.metricValues
- if (metricValues != null) {
+ if (metricValues == null) {
+ builder.setMetricValuesIsNull(true)
+ } else {
+ builder.setMetricValuesIsNull(false)
metricValues.foreach {
case (k, v) => builder.putMetricValues(k, v)
}
@@ -67,9 +70,13 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
val jobs = ui.getJobsMap.asScala.map {
case (jobId, status) => jobId.toInt ->
JobExecutionStatusSerializer.deserialize(status)
}.toMap
- val metricValues = ui.getMetricValuesMap.asScala.map {
- case (k, v) => k.toLong -> v
- }.toMap
+ val metricValues = if (ui.getMetricValuesIsNull) {
+ null
+ } else {
+ ui.getMetricValuesMap.asScala.map {
+ case (k, v) => k.toLong -> v
+ }.toMap
+ }
new SQLExecutionUIData(
executionId = ui.getExecutionId,
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index 49debedbb68..a8f715564fc 100644
---
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -53,8 +53,9 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
StoreTypes.SparkPlanGraphNodeWrapper = {
val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
- builder.setNode(serializeSparkPlanGraphNode(input.node))
- builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+ Option(input.node).foreach(node =>
builder.setNode(serializeSparkPlanGraphNode(node)))
+ Option(input.cluster)
+ .foreach(cluster =>
builder.setCluster(serializeSparkPlanGraphClusterWrapper(cluster)))
builder.build()
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 495a473a014..a5368711260 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -27,14 +27,16 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd,
SparkListenerJobStart}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.status.{AppStatusStore, ElementTrackingStore}
+import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.InMemoryStore
-class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
+abstract class AllExecutionsPageSuite extends SharedSparkSession with
BeforeAndAfter {
override def sparkConf: SparkConf = {
// Disable async kv store write in the UI, to make tests more stable here.
@@ -120,12 +122,7 @@ class AllExecutionsPageSuite extends SharedSparkSession
with BeforeAndAfter {
}
- private def createStatusStore: SQLAppStatusStore = {
- val conf = sparkContext.conf
- kvstore = new ElementTrackingStore(new InMemoryStore, conf)
- val listener = new SQLAppStatusListener(conf, kvstore, live = true)
- new SQLAppStatusStore(kvstore, Some(listener))
- }
+ protected def createStatusStore: SQLAppStatusStore
private def createTestDataFrame: DataFrame = {
Seq(
@@ -178,3 +175,31 @@ class AllExecutionsPageSuite extends SharedSparkSession
with BeforeAndAfter {
}
}
+class AllExecutionsPageWithInMemoryStoreSuite extends AllExecutionsPageSuite {
+ override protected def createStatusStore: SQLAppStatusStore = {
+ val conf = sparkContext.conf
+ kvstore = new ElementTrackingStore(new InMemoryStore, conf)
+ val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+ new SQLAppStatusStore(kvstore, Some(listener))
+ }
+}
+
+class AllExecutionsPageWithRocksDBBackendSuite extends AllExecutionsPageSuite {
+ private val storePath = Utils.createTempDir()
+ override protected def createStatusStore(): SQLAppStatusStore = {
+ val conf = sparkContext.conf
+ conf.set(LIVE_UI_LOCAL_STORE_DIR, storePath.getCanonicalPath)
+ val appStatusStore = AppStatusStore.createLiveStore(conf)
+ kvstore = appStatusStore.store.asInstanceOf[ElementTrackingStore]
+ val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+ new SQLAppStatusStore(kvstore, Some(listener))
+ }
+
+ protected override def afterAll(): Unit = {
+ if (storePath.exists()) {
+ Utils.deleteRecursively(storePath)
+ }
+ super.afterAll()
+ }
+}
+
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 1cecb2a6ba9..3d80935fdda 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -56,12 +56,12 @@ import
org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol,
LongAccumulator, SerializableConfiguration}
+import org.apache.spark.status.{AppStatusStore, ElementTrackingStore}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol,
LongAccumulator, SerializableConfiguration, Utils}
import org.apache.spark.util.kvstore.InMemoryStore
-class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
+abstract class SQLAppStatusListenerSuite extends SharedSparkSession with
JsonTestUtils
with BeforeAndAfter {
import testImplicits._
@@ -70,7 +70,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession
with JsonTestUtils
super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD,
0L).set(ASYNC_TRACKING_ENABLED, false)
}
- private var kvstore: ElementTrackingStore = _
+ protected var kvstore: ElementTrackingStore = _
after {
if (kvstore != null) {
@@ -155,12 +155,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession
with JsonTestUtils
assert(actualFailed.sorted === failed)
}
- private def createStatusStore(): SQLAppStatusStore = {
- val conf = sparkContext.conf
- kvstore = new ElementTrackingStore(new InMemoryStore, conf)
- val listener = new SQLAppStatusListener(conf, kvstore, live = true)
- new SQLAppStatusStore(kvstore, Some(listener))
- }
+ protected def createStatusStore(): SQLAppStatusStore
test("basic") {
def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]):
Unit = {
@@ -1007,6 +1002,34 @@ class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTestUtils
}
}
+class SQLAppStatusListenerWithInMemoryStoreSuite extends
SQLAppStatusListenerSuite {
+ override protected def createStatusStore(): SQLAppStatusStore = {
+ val conf = sparkContext.conf
+ kvstore = new ElementTrackingStore(new InMemoryStore, conf)
+ val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+ new SQLAppStatusStore(kvstore, Some(listener))
+ }
+}
+
+class SQLAppStatusListenerWithRocksDBBackendSuite extends
SQLAppStatusListenerSuite {
+ private val storePath = Utils.createTempDir()
+
+ override protected def createStatusStore(): SQLAppStatusStore = {
+ val conf = sparkContext.conf
+ conf.set(LIVE_UI_LOCAL_STORE_DIR, storePath.getCanonicalPath)
+ val appStatusStore = AppStatusStore.createLiveStore(conf)
+ kvstore = appStatusStore.store.asInstanceOf[ElementTrackingStore]
+ val listener = new SQLAppStatusListener(conf, kvstore, live = true)
+ new SQLAppStatusStore(kvstore, Some(listener))
+ }
+
+ protected override def afterAll(): Unit = {
+ if (storePath.exists()) {
+ Utils.deleteRecursively(storePath)
+ }
+ super.afterAll()
+ }
+}
/**
* A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a
[[SQLMetrics]]
diff --git
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index a031eb69bf2..b590f6dd42c 100644
---
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -92,8 +92,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
)
val bytes2 = serializer.serialize(input2)
val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
- // input.metricValues is null, result.metricValues is also empty map.
- assert(result2.metricValues.isEmpty)
+ // input.metricValues is null, result.metricValues is null.
+ assert(result2.metricValues == null)
}
test("Spark Plan Graph") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]