This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ff26b602c92b [SPARK-45798][CONNECT] Followup: add serverSessionId to
SessionHolderInfo
ff26b602c92b is described below
commit ff26b602c92b920867fadf90eb6157ea291e14b8
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Wed Dec 13 12:16:21 2023 -0800
[SPARK-45798][CONNECT] Followup: add serverSessionId to SessionHolderInfo
### What changes were proposed in this pull request?
Small followup to https://github.com/apache/spark/pull/43664 - add
serverSessionId to SessionHolderInfo.
### Why are the changes needed?
SessionHolderInfo should contain this kind of information about the session.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
NA
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44334 from juliuszsompolski/SPARK-45798-fup.
Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/sql/connect/service/SessionHolder.scala | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 0fdf55ff42a0..f097f2db5889 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -37,7 +37,7 @@ import
org.apache.spark.sql.connect.planner.PythonStreamingQueryListener
import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper
import org.apache.spark.sql.connect.service.SessionHolder.{ERROR_CACHE_SIZE,
ERROR_CACHE_TIMEOUT_SEC}
import org.apache.spark.sql.streaming.StreamingQueryListener
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{SystemClock, Utils}
// Unique key identifying session by combination of user, and session id
case class SessionKey(userId: String, sessionId: String)
@@ -95,8 +95,14 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
// Returns the server side session ID and asserts that it must be different
from the client-side
// session ID.
def serverSessionId: String = {
- assert(session.sessionUUID != sessionId)
- session.sessionUUID
+ if (Utils.isTesting && session == null) {
+ // Testing-only: Some sessions created by SessionHolder.forTesting are
not fully initialized
+ // and don't have an underlying SparkSession.
+ ""
+ } else {
+ assert(session.sessionUUID != sessionId)
+ session.sessionUUID
+ }
}
/**
@@ -238,7 +244,7 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
closedTimeMs = Some(System.currentTimeMillis())
- if (eventManager.status == SessionStatus.Pending) {
+ if (Utils.isTesting && eventManager.status == SessionStatus.Pending) {
// Testing-only: Some sessions created by SessionHolder.forTesting are
not fully initialized
// and can't be closed.
return
@@ -288,6 +294,7 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
SessionHolderInfo(
userId = userId,
sessionId = sessionId,
+ serverSessionId = serverSessionId,
status = eventManager.status,
startTimeMs = startTimeMs,
lastAccessTimeMs = lastAccessTimeMs,
@@ -390,6 +397,7 @@ object SessionHolder {
case class SessionHolderInfo(
userId: String,
sessionId: String,
+ serverSessionId: String,
status: SessionStatus,
customInactiveTimeoutMs: Option[Long],
startTimeMs: Long,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]