This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a309a0570279 [SPARK-54390][SS] Fix JSON Deserialize in StreamingQueryListenerBus
a309a0570279 is described below
commit a309a05702795df575c1e86639ccb8d6948f6c5f
Author: Dylan Wong <[email protected]>
AuthorDate: Fri Jan 2 11:53:33 2026 -0800
[SPARK-54390][SS] Fix JSON Deserialize in StreamingQueryListenerBus
### What changes were proposed in this pull request?
The PR changes a few things:
1. Convert JSON objects and arrays to strings when deserializing, using
`ObjectToStringDeserializer`. (main fix; see the sketch after this list)
2. Set the default values for `inputRowsPerSecond` and
`processedRowsPerSecond` to `Double.NaN`. This fixes another silent
deserialization issue. (makes testing easier)
3. When getting `jsonValue` for `"observedMetrics"` in
`StreamingQueryProgress`, return `JNothing` if parsing fails. (makes
testing easier, but this is another issue that needs to be addressed in the
future)
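To make the main fix concrete, here is a minimal sketch of the approach (illustration only, not the patch itself): a Jackson `JsonDeserializer[String]` that stringifies object/array nodes, wired up through `@JsonDeserialize` on a hypothetical `OffsetHolder` field. The real change annotates the offset fields of `SourceProgress` the same way; the sketch assumes jackson-module-scala honors the annotation on a case-class constructor parameter.

```scala
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Turns whatever JSON value backs the annotated field (object, array, or
// plain string) into a String instead of failing to bind it.
class ObjectToStringDeserializer extends JsonDeserializer[String] {
  override def deserialize(parser: JsonParser, context: DeserializationContext): String = {
    val node: JsonNode = parser.readValueAsTree()
    if (node.isTextual) node.asText() else node.toString
  }
}

// Hypothetical holder used only for illustration; the patch annotates
// SourceProgress.startOffset/endOffset/latestOffset the same way.
case class OffsetHolder(
    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
    startOffset: String)

object OffsetHolderDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // An object-shaped offset, as the Kafka source reports it.
    val json = """{"startOffset": {"topic": {"0": 123}}}"""
    val holder = mapper.readValue(json, classOf[OffsetHolder])
    println(holder.startOffset) // {"topic":{"0":123}}
  }
}
```

Without such a deserializer, Jackson cannot bind an object-shaped offset to the `String` field, which is the mis-deserialization seen with Spark Connect.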
### Why are the changes needed?
When using Spark Connect, JSON-based offsets are not deserialized properly.
### Does this PR introduce _any_ user-facing change?
Yes, this corrects the StreamingQueryListener issues when using Spark Connect.
### How was this patch tested?
Added new test progresses to the unit tests, including one that contains
object- and array-based source offsets (a sketch of the round trip they
exercise appears below).
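As a sketch of the kind of round trip these tests exercise (written against the suite's scope: `testProgress5` is the new progress with object/array offsets defined in this diff, and the JSON comparison here is illustrative rather than the suite's exact assertions):

```scala
import org.json4s.jackson.JsonMethods.{compact, parse}

// Round-trip a progress whose sources carry object- and array-shaped offsets,
// using the same jsonString/fromJson path StreamingQueryListenerBus relies on.
val jsonString = StreamingQueryProgress.jsonString(testProgress5)
val result = StreamingQueryProgress.fromJson(jsonString)

// With ObjectToStringDeserializer in place the offsets survive the round trip;
// compare them as parsed JSON to ignore whitespace differences.
assert(compact(parse(result.sources(0).startOffset)) ==
  compact(parse(testProgress5.sources(0).startOffset)))
```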
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53102 from dylanwong250/SPARK-54390.
Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../org/apache/spark/sql/streaming/progress.scala | 41 +++-
.../streaming/StreamingQueryListenerSuite.scala | 22 ++
.../StreamingQueryStatusAndProgressSuite.scala | 253 ++++++++++++++++++++-
3 files changed, 302 insertions(+), 14 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 2f5b9475a750..5652545ea567 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
import scala.math.BigDecimal.RoundingMode
import scala.util.control.NonFatal
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, DeserializationFeature, JsonDeserializer, JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.json4s._
@@ -191,7 +192,19 @@ class StreamingQueryProgress private[spark] (
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
("sink" -> sink.jsonValue) ~
- ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue))
+ ("observedMetrics" -> {
+ // TODO: SPARK-54391
+ // In Spark Connect, observedMetrics is serialized but not deserialized properly when sent
+ // back to the client, so the schema is null and calling row.jsonValue throws an exception;
+ // we catch it and return JNothing. This is because Row.jsonValue is a one-way method and
+ // there is no reverse method to convert the JSON back to a Row.
+ try {
+ safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue)
+ } catch {
+ case NonFatal(e) => JNothing
+ }
+ })
}
}
@@ -210,6 +223,19 @@ private[spark] object StreamingQueryProgress {
mapper.readValue[StreamingQueryProgress](json)
}
+// SPARK-54390: Custom deserializer that converts JSON objects and arrays to strings for offset fields
+private class ObjectToStringDeserializer extends JsonDeserializer[String] {
+ override def deserialize(parser: JsonParser, context: DeserializationContext): String = {
+ val node: JsonNode = parser.readValueAsTree()
+ if (node.isTextual) {
+ node.asText()
+ } else {
+ // Convert JSON object/array to string representation
+ node.toString
+ }
+ }
+}
+
/**
* Information about progress made for a source in the execution of a [[StreamingQuery]] during a
* trigger. See [[StreamingQueryProgress]] for more information.
@@ -233,12 +259,19 @@ private[spark] object StreamingQueryProgress {
@Evolving
class SourceProgress protected[spark] (
val description: String,
+ // SPARK-54390: Use a custom deserializer to convert the JSON object to a string.
+ @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
val startOffset: String,
+ @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
val endOffset: String,
+ @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
val latestOffset: String,
val numInputRows: Long,
- val inputRowsPerSecond: Double,
- val processedRowsPerSecond: Double,
+ // NaN is used during deserialization to indicate that the value was not set,
+ // and a NaN value causes the field to be omitted from the JSON output.
+ // In Spark Connect, we need to ensure that the default value is Double.NaN instead of 0.0.
+ val inputRowsPerSecond: Double = Double.NaN,
+ val processedRowsPerSecond: Double = Double.NaN,
val metrics: ju.Map[String, String] = Map[String, String]().asJava)
extends Serializable {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4eabc82281e1..645dbef7abfd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -21,6 +21,7 @@ import java.util.UUID
import scala.collection.mutable
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.scalactic.{Equality, TolerantNumerics}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -286,6 +287,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
)
}
+ private def removeFieldFromJson(jsonString: String, fieldName: String): String = {
+ val jv = parse(jsonString, useBigDecimalForDouble = true)
+ val removed = jv.removeField { case (name, _) => name == fieldName }
+ compact(render(removed))
+ }
+
test("QueryProgressEvent serialization") {
def testSerialization(event: QueryProgressEvent): Unit = {
import scala.jdk.CollectionConverters._
@@ -294,9 +301,24 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
+
+ // Verify we can get the event back from the JSON string; this is important for Spark Connect
+ // and the StreamingQueryListenerBus. This is the method used to deserialize the event in
+ // StreamingQueryListenerBus.queryEventHandler.
+ val eventFromNewEvent = QueryProgressEvent.fromJson(newEvent.json)
+ // TODO: Remove after SC-206585 is fixed
+ // We remove the observedMetrics field because it is not serialized properly when being
+ // removed from the listener bus, so this test verifies that everything except the
+ // observedMetrics field is equal in the JSON string.
+ val eventWithoutObservedMetrics = removeFieldFromJson(event.progress.json, "observedMetrics")
+ assert(eventFromNewEvent.progress.json === eventWithoutObservedMetrics)
}
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
+ testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress3))
+ testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress4))
+ testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress5))
+ testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress6))
}
test("QueryTerminatedEvent serialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 9c1e16460879..5b692e4c42c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -169,21 +169,148 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
|}
""".stripMargin.trim)
assert(compact(parse(json2)) === testProgress2.json)
+
+ val json5 = testProgress5.prettyJson
+ assertJson(
+ json5,
+ s"""
+ |{
+ | "id" : "${testProgress5.id.toString}",
+ | "runId" : "${testProgress5.runId.toString}",
+ | "name" : null,
+ | "timestamp" : "2025-08-22T00:00:00.111Z",
+ | "batchId" : 97,
+ | "batchDuration" : 12,
+ | "numInputRows" : 201,
+ | "inputRowsPerSecond" : 20.1,
+ | "processedRowsPerSecond" : 20.1,
+ | "stateOperators" : [ ],
+ | "sources" : [ {
+ | "description" : "kafka",
+ | "startOffset" : {
+ | "topic" : {
+ | "0" : 123
+ | }
+ | },
+ | "endOffset" : {
+ | "topic" : {
+ | "0" : 456
+ | }
+ | },
+ | "latestOffset" : {
+ | "topic" : {
+ | "0" : 789
+ | }
+ | },
+ | "numInputRows" : 100,
+ | "inputRowsPerSecond" : 10.0,
+ | "processedRowsPerSecond" : 10.0
+ | }, {
+ | "description" : "kinesis",
+ | "startOffset" : [ {
+ | "shard" : {
+ | "stream" : "stream1",
+ | "shardId" : "shard1"
+ | },
+ | "firstSeqNum" : null,
+ | "lastSeqNum" : "123",
+ | "closed" : false,
+ | "msBehindLatest" : null,
+ | "lastRecordSeqNum" : null
+ | } ],
+ | "endOffset" : [ {
+ | "shard" : {
+ | "stream" : "stream1",
+ | "shardId" : "shard1"
+ | },
+ | "firstSeqNum" : null,
+ | "lastSeqNum" : "456",
+ | "closed" : false,
+ | "msBehindLatest" : null,
+ | "lastRecordSeqNum" : null
+ | } ],
+ | "latestOffset" : [ {
+ | "shard" : {
+ | "stream" : "stream1",
+ | "shardId" : "shard1"
+ | },
+ | "firstSeqNum" : null,
+ | "lastSeqNum" : "789",
+ | "closed" : false,
+ | "msBehindLatest" : null,
+ | "lastRecordSeqNum" : null
+ | } ],
+ | "numInputRows" : 101,
+ | "inputRowsPerSecond" : 10.1,
+ | "processedRowsPerSecond" : 10.1
+ | } ],
+ | "sink" : {
+ | "description" : "sink",
+ | "numOutputRows" : -1
+ | }
+ |}
+ """.stripMargin.trim)
+ assert(compact(parse(json5)) === testProgress5.json)
+
+ val json6 = testProgress6.prettyJson
+ assertJson(
+ json6,
+ s"""
+ |{
+ | "id" : "${testProgress6.id.toString}",
+ | "runId" : "${testProgress6.runId.toString}",
+ | "name" : "myName",
+ | "timestamp" : "2025-09-19T00:00:00.111Z",
+ | "batchId" : 97,
+ | "batchDuration" : 12,
+ | "numInputRows" : 1001,
+ | "stateOperators" : [ ],
+ | "sources" : [ {
+ | "description" : "kafka",
+ | "startOffset" : 1000,
+ | "endOffset" : 2000,
+ | "latestOffset" : 3000,
+ | "numInputRows" : 1001
+ | } ],
+ | "sink" : {
+ | "description" : "sink",
+ | "numOutputRows" : -1
+ | }
+ |}
+ """.stripMargin.trim)
+ assert(compact(parse(json6)) === testProgress6.json)
}
test("StreamingQueryProgress - json") {
assert(compact(parse(testProgress1.json)) === testProgress1.json)
assert(compact(parse(testProgress2.json)) === testProgress2.json)
assert(compact(parse(testProgress3.json)) === testProgress3.json)
+ assert(compact(parse(testProgress4.json, useBigDecimalForDouble = true)) === testProgress4.json)
+ assert(compact(parse(testProgress5.json)) === testProgress5.json)
+ assert(compact(parse(testProgress6.json)) === testProgress6.json)
+ assert(compact(parse(testProgress7.json)) === testProgress7.json)
}
test("StreamingQueryProgress - toString") {
assert(testProgress1.toString === testProgress1.prettyJson)
assert(testProgress2.toString === testProgress2.prettyJson)
+ assert(testProgress3.toString === testProgress3.prettyJson)
+ assert(testProgress4.toString === testProgress4.prettyJson)
+ assert(testProgress5.toString === testProgress5.prettyJson)
+ assert(testProgress6.toString === testProgress6.prettyJson)
+ assert(testProgress7.toString === testProgress7.prettyJson)
}
test("StreamingQueryProgress - jsonString and fromJson") {
- Seq(testProgress1, testProgress2).foreach { input =>
+ Seq(
+ testProgress1,
+ testProgress2,
+ testProgress3,
+ testProgress4,
+ testProgress5,
+ testProgress6,
+ testProgress7
+ ).foreach { input =>
val jsonString = StreamingQueryProgress.jsonString(input)
val result = StreamingQueryProgress.fromJson(jsonString)
assert(input.id == result.id)
@@ -221,7 +348,11 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
} else {
assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
}
- assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+ if (s1.processedRowsPerSecond.isNaN) {
+ assert(s2.processedRowsPerSecond.isNaN)
+ } else {
+ assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+ }
assert(s1.metrics == s2.metrics)
}
@@ -232,10 +363,14 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
}
val resultObservedMetrics = result.observedMetrics
- assert(input.observedMetrics.size() == resultObservedMetrics.size())
- assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
- input.observedMetrics.entrySet().forEach { e =>
- assert(e.getValue == resultObservedMetrics.get(e.getKey))
+ if (resultObservedMetrics != null) {
+ assert(input.observedMetrics.size() == resultObservedMetrics.size())
+ assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
+ input.observedMetrics.entrySet().forEach { e =>
+ assert(e.getValue == resultObservedMetrics.get(e.getKey))
+ }
+ } else {
+ assert(input.observedMetrics == null)
}
}
}
@@ -437,8 +572,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
}
test("SPARK-53690: avgOffsetsBehindLatest should never be in scientific notation") {
- val progress = testProgress5.jsonValue
- val progressPretty = testProgress5.prettyJson
+ val progress = testProgress7.jsonValue
+ val progressPretty = testProgress7.prettyJson
// Actual values
val avgOffsetsBehindLatest: Double = 2.70941269E8
@@ -465,8 +600,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
progressPretty shouldBe
s"""
|{
- | "id" : "${testProgress5.id.toString}",
- | "runId" : "${testProgress5.runId.toString}",
+ | "id" : "${testProgress7.id.toString}",
+ | "runId" : "${testProgress7.runId.toString}",
| "name" : "KafkaMetricsTest",
| "timestamp" : "2025-09-23T06:00:00.000Z",
| "batchId" : 1250,
@@ -680,6 +815,104 @@ object StreamingQueryStatusAndProgressSuite {
)
val testProgress5 = new StreamingQueryProgress(
+ id = UUID.randomUUID,
+ runId = UUID.randomUUID,
+ name = null, // should not be present in the json
+ timestamp = "2025-08-22T00:00:00.111Z",
+ batchId = 97L,
+ batchDuration = 12L,
+ durationMs = null,
+ // empty maps should be handled correctly
+ eventTime = null,
+ stateOperators = Array(),
+ sources = Array(
+ new SourceProgress(
+ description = "kafka",
+ startOffset = """{"topic":{"0":123}}""",
+ endOffset = """{"topic":{"0":456}}""",
+ latestOffset = """{"topic":{"0":789}}""",
+ numInputRows = 100,
+ inputRowsPerSecond = 10.0,
+ processedRowsPerSecond = 10.0
+ ),
+ new SourceProgress(
+ description = "kinesis",
+ startOffset =
+ """
+ |[{
+ | "shard": {
+ | "stream": "stream1",
+ | "shardId": "shard1"
+ | },
+ | "firstSeqNum": null,
+ | "lastSeqNum": "123",
+ | "closed": false,
+ | "msBehindLatest": null,
+ | "lastRecordSeqNum": null
+ |}]
+ """.stripMargin,
+ endOffset =
+ """
+ |[{
+ | "shard": {
+ | "stream": "stream1",
+ | "shardId": "shard1"
+ | },
+ | "firstSeqNum": null,
+ | "lastSeqNum": "456",
+ | "closed": false,
+ | "msBehindLatest": null,
+ | "lastRecordSeqNum": null
+ |}]
+ """.stripMargin,
+ latestOffset =
+ """
+ |[{
+ | "shard": {
+ | "stream": "stream1",
+ | "shardId": "shard1"
+ | },
+ | "firstSeqNum": null,
+ | "lastSeqNum": "789",
+ | "closed": false,
+ | "msBehindLatest": null,
+ | "lastRecordSeqNum": null
+ |}]
+ """.stripMargin,
+ numInputRows = 101,
+ inputRowsPerSecond = 10.1,
+ processedRowsPerSecond = 10.1
+ )
+ ),
+ sink = SinkProgress("sink", None),
+ observedMetrics = new java.util.HashMap(Map().asJava)
+ )
+
+ val testProgress6 = new StreamingQueryProgress(
+ id = UUID.randomUUID,
+ runId = UUID.randomUUID,
+ name = "myName",
+ timestamp = "2025-09-19T00:00:00.111Z",
+ batchId = 97L,
+ batchDuration = 12L,
+ durationMs = null,
+ eventTime = null,
+ stateOperators = Array(),
+ sources = Array(new SourceProgress(
+ description = "kafka",
+ startOffset = "1000",
+ endOffset = "2000",
+ latestOffset = "3000",
+ numInputRows = 1001
+ // inputRowsPerSecond and processedRowsPerSecond should be Double.NaN
+ // and not present in the json
+ )
+ ),
+ sink = SinkProgress("sink", None),
+ observedMetrics = new java.util.HashMap(Map().asJava)
+ )
+
+ val testProgress7 = new StreamingQueryProgress(
id = UUID.randomUUID,
runId = UUID.randomUUID,
name = "KafkaMetricsTest",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]