This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 1a3dce36e2a [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager
bug fix
1a3dce36e2a is described below
commit 1a3dce36e2ad8d0e0bb2e1123864764077320466
Author: Wei Liu <[email protected]>
AuthorDate: Fri Aug 11 10:23:56 2023 +0900
[SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager bug fix
When `spark.streams.get(q.id)` is called on a stopped query q, it should
return None in the Python client and null in the Scala client. But right now it
throws a null pointer exception. This PR fixes this issue.
Bug fix
No
Added unit tests
Closes #42437 from WweiL/streaming-query-manager-get-bug-fix.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 42eb4223628653db71950f161a745432d1b45502)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/streaming/ClientStreamingQuerySuite.scala | 2 ++
.../spark/sql/connect/planner/SparkConnectPlanner.scala | 5 +++--
python/pyspark/sql/tests/streaming/test_streaming.py | 13 +++++++++++++
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index f9e6e686495..ab92431bc11 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -268,6 +268,8 @@ class ClientStreamingQuerySuite extends QueryTest with
SQLHelper with Logging {
q.stop()
assert(!q1.isActive)
+
+ assert(spark.streams.get(q.id) == null)
}
test("streaming query listener") {
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index d59d01b4ce3..49bac17a4f4 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3059,8 +3059,9 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
.asJava)
case StreamingQueryManagerCommand.CommandCase.GET_QUERY =>
- val query = session.streams.get(command.getGetQuery)
- respBuilder.setQuery(buildStreamingQueryInstance(query))
+ Option(session.streams.get(command.getGetQuery)).foreach { q =>
+ respBuilder.setQuery(buildStreamingQueryInstance(q))
+ }
case StreamingQueryManagerCommand.CommandCase.AWAIT_ANY_TERMINATION =>
if (command.getAwaitAnyTermination.hasTimeoutMs) {
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py
b/python/pyspark/sql/tests/streaming/test_streaming.py
index 52fa19a8642..0eea86dc737 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -315,6 +315,19 @@ class StreamingTestsMixin:
contains = msg in e.desc
self.assertTrue(contains, "Exception tree doesn't contain the expected
message: %s" % msg)
+ def test_query_manager_get(self):
+ df = self.spark.readStream.format("rate").load()
+ for q in self.spark.streams.active:
+ q.stop()
+ q = df.writeStream.format("noop").start()
+
+ self.assertTrue(q.isActive)
+ self.assertTrue(q.id == self.spark.streams.get(q.id).id)
+
+ q.stop()
+
+ self.assertIsNone(self.spark.streams.get(q.id))
+
def test_query_manager_await_termination(self):
df =
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
for q in self.spark.streams.active:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]