This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2aa172dba11 [SPARK-45822][CONNECT] SparkConnectSessionManager may look
up a stopped sparkcontext
2aa172dba11 is described below
commit 2aa172dba1176de76719021a45a017759379abe5
Author: Kent Yao <[email protected]>
AuthorDate: Tue Nov 7 23:32:47 2023 +0800
[SPARK-45822][CONNECT] SparkConnectSessionManager may look up a stopped
sparkcontext
### What changes were proposed in this pull request?
This PR checks whether the active session's SparkContext has been stopped
before cloning a new isolated session from it. If it has, the new session is
cloned from the default session instead.
### Why are the changes needed?
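SparkSession.active is a thread-local value and is not updated by other
threads, so it can still refer to a session whose SparkContext has already
been stopped.
This causes the test failure seen in
https://github.com/LuciferYang/spark/actions/runs/6767960232/job/18426049162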
```java
- ReleaseSession: session with different session_id or user_id allowed
after release *** FAILED *** (9 milliseconds)
[info] org.apache.spark.SparkException:
com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
[info] This stopped SparkContext was created at:
[info]
[info]
org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite.beforeAll(SparkConnectSessionHolderSuite.scala:37)
[info]
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
```
In tests that share Spark sessions, the sessions are created, stopped,
and retrieved in different threads.
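The staleness described above can be reproduced without Spark. The following is a minimal sketch, not the Spark implementation: `FakeContext`, `Sessions`, and `Demo` are hypothetical stand-ins, and the only assumption carried over from this patch is the shape of the fix (prefer the thread-local value, fall back to the process-wide default when the thread-local one is stopped).

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a context that can be stopped; not a Spark class.
final class FakeContext { @volatile var stopped: Boolean = false }

// Hypothetical registry: a per-thread "active" value plus a process-wide default.
object Sessions {
  private val active = new ThreadLocal[FakeContext]
  private val defaultRef = new AtomicReference[FakeContext]()

  def setActive(c: FakeContext): Unit = active.set(c)
  def setDefault(c: FakeContext): Unit = defaultRef.set(c)

  // What the calling thread currently sees as "active" (may be stale).
  def rawActive: FakeContext = active.get()

  // Same shape as the patched logic: use the thread-local value unless it is
  // missing or stopped, in which case fall back to the shared default.
  def usable(): FakeContext = {
    val a = active.get()
    if (a == null || a.stopped) {
      val d = defaultRef.get()
      assert(d != null, "no default context registered")
      d
    } else {
      a
    }
  }
}

object Demo {
  // Returns (stale active is stopped, context chosen by usable() is stopped).
  def run(): (Boolean, Boolean) = {
    val old = new FakeContext
    Sessions.setActive(old)
    Sessions.setDefault(old)

    // Another thread stops the old context and registers a fresh one.
    // Its setActive call only affects that thread's own thread-local slot.
    val other = new Thread(() => {
      old.stopped = true
      val fresh = new FakeContext
      Sessions.setActive(fresh)
      Sessions.setDefault(fresh)
    })
    other.start()
    other.join()

    // This thread's thread-local slot still points at the stopped context.
    (Sessions.rawActive.stopped, Sessions.usable().stopped)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch the calling thread's thread-local slot keeps pointing at the stopped context after the other thread finishes, while `usable()` returns the fresh default, which is the behavior the patch relies on.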
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
I ran `build/sbt "connect/testOnly *SparkConnect*"` locally. Without this
patch the test failed consistently; with it, the test passed.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43701 from yaooqinn/SPARK-45822.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../spark/sql/connect/service/SparkConnectSessionManager.scala | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
index 5c8e3c61158..ba402a90a71 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
@@ -139,7 +139,13 @@ class SparkConnectSessionManager extends Logging {
}
private def newIsolatedSession(): SparkSession = {
- SparkSession.active.newSession()
+ val active = SparkSession.active
+ if (active.sparkContext.isStopped) {
+ assert(SparkSession.getDefaultSession.nonEmpty)
+ SparkSession.getDefaultSession.get.newSession()
+ } else {
+ active.newSession()
+ }
}
private def validateSessionCreate(key: SessionKey): Unit = {