This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3df41f5f211e [SPARK-55275] SQL State Coverage: IllegalStateException
3df41f5f211e is described below

commit 3df41f5f211e495103a1b934872a25683637944f
Author: Garland Zhang <[email protected]>
AuthorDate: Tue Mar 3 09:30:52 2026 -0400

    [SPARK-55275] SQL State Coverage: IllegalStateException
    
    ### What changes were proposed in this pull request?
    Update IllegalStateException => SparkIllegalStateException in spark connect 
layer
    
    ### Why are the changes needed?
    This makes errors raised in the Spark Connect layer easier to trace.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Changes exception types to their Spark equivalents.
    
    ### How was this patch tested?
    Updated testing
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes
    
    Closes #54056 from garlandz-db/SPARK-55275.
    
    Authored-by: Garland Zhang <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  78 ++++++++
 .../src/main/resources/error/error-states.json     |   6 +
 .../spark/sql/connect/IllegalStateErrors.scala     | 123 +++++++++++++
 .../execution/ExecuteResponseObserver.scala        |  12 +-
 .../connect/pipelines/PipelineEventSender.scala    |   4 +-
 .../planner/StreamingForeachBatchHelper.scala      |  11 +-
 .../planner/StreamingQueryListenerHelper.scala     |   8 +-
 .../sql/connect/service/ExecuteEventsManager.scala |  18 +-
 .../sql/connect/service/SessionEventsManager.scala |  10 +-
 .../spark/sql/connect/service/SessionHolder.scala  |   9 +-
 .../service/SparkConnectExecutionManager.scala     |   3 +-
 .../sql/connect/service/SparkConnectService.scala  |   4 +-
 .../sql/connect/IllegalStateErrorsSuite.scala      | 203 +++++++++++++++++++++
 .../pipelines/PipelineEventSenderSuite.scala       |   4 +-
 14 files changed, 456 insertions(+), 37 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 63f57f0315c0..1c1f0a9eedf5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5516,6 +5516,84 @@
     ],
     "sqlState" : "42601"
   },
+  "SPARK_CONNECT_ILLEGAL_STATE" : {
+    "message" : [
+      "Spark Connect encountered an illegal state condition."
+    ],
+    "subClass" : {
+      "DATA_INTEGRITY_CURSOR_OUT_OF_BOUNDS" : {
+        "message" : [
+          "Cursor out of bounds: <cursor> exceeds batch size <batchSize>."
+        ]
+      },
+      "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS" : {
+        "message" : [
+          "ExecuteHolder with opId=<operationId> already exists!"
+        ]
+      },
+      "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH" : {
+        "message" : [
+          "Pipeline execution for graph ID <graphId> already exists. Stop the 
existing execution before starting a new one."
+        ]
+      },
+      "EXECUTION_STATE_OPERATION_ORPHANED" : {
+        "message" : [
+          "Operation <key> has been orphaned."
+        ]
+      },
+      "SESSION_MANAGEMENT_SERVICE_NOT_STARTED" : {
+        "message" : [
+          "Attempting to stop the Spark Connect service that has not been 
started."
+        ]
+      },
+      "SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED" : {
+        "message" : [
+          "Session <key> is already closed."
+        ]
+      },
+      "STATE_CONSISTENCY_CLEANER_ALREADY_SET" : {
+        "message" : [
+          "Cleaner for query <queryKey> has already been set for session 
<key>."
+        ]
+      },
+      
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH"
 : {
+        "message" : [
+          "operationId: <operationId> with status <currentStatus> is not 
within statuses <validStatuses> for event <eventStatus>"
+        ]
+      },
+      
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED" : {
+        "message" : [
+          "sessionId: <sessionId> with status <sessionStatus> is not Started 
for event <eventStatus>"
+        ]
+      },
+      "STATE_CONSISTENCY_NO_BATCHES_AVAILABLE" : {
+        "message" : [
+          "No batches available. Invalid response: <response>."
+        ]
+      },
+      "STATE_CONSISTENCY_SESSION_STATE_TRANSITION_INVALID" : {
+        "message" : [
+          "sessionId: <sessionId> with status <fromState> is not within 
statuses <validStates> for event <toState>"
+        ]
+      },
+      "STREAMING_QUERY_UNEXPECTED_RETURN_VALUE" : {
+        "message" : [
+          "Unexpected return value <value> from Python worker in <context> for 
session <key>."
+        ]
+      },
+      "STREAM_LIFECYCLE_ALREADY_COMPLETED" : {
+        "message" : [
+          "Stream <operation> can't be called after stream completed"
+        ]
+      },
+      "STREAM_LIFECYCLE_EVENT_SEND_AFTER_SHUTDOWN" : {
+        "message" : [
+          "Cannot send event after shutdown for session <key>."
+        ]
+      }
+    },
+    "sqlState" : "XXSC0"
+  },
   "SPARK_JOB_CANCELLED" : {
     "message" : [
       "Job <jobId> cancelled <reason>"
diff --git a/common/utils/src/main/resources/error/error-states.json 
b/common/utils/src/main/resources/error/error-states.json
index 7b3050bd2266..c2b2bb2ed463 100644
--- a/common/utils/src/main/resources/error/error-states.json
+++ b/common/utils/src/main/resources/error/error-states.json
@@ -7530,6 +7530,12 @@
         "standard": "N",
         "usedBy": ["Spark"]
     },
+    "XXSC0": {
+        "description": "Connect Server - Illegal State",
+        "origin": "Spark",
+        "standard": "N",
+        "usedBy": ["Spark"]
+    },
     "XXKD0": {
         "description": "Analysis - Bad plan",
         "origin": "Databricks",
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/IllegalStateErrors.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/IllegalStateErrors.scala
new file mode 100644
index 000000000000..475b703c8292
--- /dev/null
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/IllegalStateErrors.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.spark.SparkIllegalStateException
+import org.apache.spark.sql.connect.service.{ExecuteStatus, SessionStatus}
+
+object IllegalStateErrors {
+
+  def streamLifecycleAlreadyCompleted(operation: String): 
SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.STREAM_LIFECYCLE_ALREADY_COMPLETED",
+      messageParameters = Map("operation" -> operation))
+
+  def cursorOutOfBounds(cursor: Long, batchSize: Long): 
SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.DATA_INTEGRITY_CURSOR_OUT_OF_BOUNDS",
+      messageParameters = Map("cursor" -> cursor.toString, "batchSize" -> 
batchSize.toString))
+
+  def executionStateTransitionInvalidOperationStatus(
+      operationId: String,
+      currentStatus: ExecuteStatus,
+      validStatuses: List[ExecuteStatus],
+      eventStatus: ExecuteStatus): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
+        
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH",
+      messageParameters = Map(
+        "operationId" -> operationId,
+        "currentStatus" -> currentStatus.toString,
+        "validStatuses" -> validStatuses.map(_.toString).mkString(", "),
+        "eventStatus" -> eventStatus.toString))
+
+  def executionStateTransitionInvalidSessionNotStarted(
+      sessionId: String,
+      sessionStatus: SessionStatus,
+      eventStatus: ExecuteStatus): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
+        
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED",
+      messageParameters = Map(
+        "sessionId" -> sessionId,
+        "sessionStatus" -> sessionStatus.toString,
+        "eventStatus" -> eventStatus.toString))
+
+  def executeHolderAlreadyExists(operationId: String): 
SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS",
+      messageParameters = Map("operationId" -> operationId))
+
+  def executeHolderAlreadyExistsGraphId(graphId: String): 
SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
+        "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH",
+      messageParameters = Map("graphId" -> graphId))
+
+  def sessionAlreadyClosed(sessionKey: String): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED",
+      messageParameters = Map("key" -> sessionKey))
+
+  def operationOrphaned(executeKey: String): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.EXECUTION_STATE_OPERATION_ORPHANED",
+      messageParameters = Map("key" -> executeKey))
+
+  def sessionStateTransitionInvalid(
+      sessionId: String,
+      fromState: SessionStatus,
+      toState: SessionStatus,
+      validStates: List[SessionStatus]): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass =
+        
"SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_SESSION_STATE_TRANSITION_INVALID",
+      messageParameters = Map(
+        "sessionId" -> sessionId,
+        "fromState" -> fromState.toString,
+        "toState" -> toState.toString,
+        "validStates" -> validStates.map(_.toString).mkString(", ")))
+
+  def serviceNotStarted(): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.SESSION_MANAGEMENT_SERVICE_NOT_STARTED",
+      messageParameters = Map.empty)
+
+  def streamingQueryUnexpectedReturnValue(
+      key: String,
+      value: Int,
+      context: String): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.STREAMING_QUERY_UNEXPECTED_RETURN_VALUE",
+      messageParameters = Map("key" -> key, "value" -> value.toString, 
"context" -> context))
+
+  def cleanerAlreadySet(key: String, queryKey: String): 
SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_CLEANER_ALREADY_SET",
+      messageParameters = Map("key" -> key, "queryKey" -> queryKey))
+
+  def eventSendAfterShutdown(key: String): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.STREAM_LIFECYCLE_EVENT_SEND_AFTER_SHUTDOWN",
+      messageParameters = Map("key" -> key))
+
+  def noBatchesAvailable(response: String): SparkIllegalStateException =
+    new SparkIllegalStateException(
+      errorClass = 
"SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_NO_BATCHES_AVAILABLE",
+      messageParameters = Map("response" -> response))
+}
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 2473df0e53f1..9984b7c2ea5a 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys
+import org.apache.spark.sql.connect.IllegalStateErrors
 import 
org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE
 import org.apache.spark.sql.connect.service.ExecuteHolder
 
@@ -133,7 +134,7 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
 
   def onNext(r: T): Unit = {
     if (!tryOnNext(r)) {
-      throw new IllegalStateException("Stream onNext can't be called after 
stream completed")
+      throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onNext")
     }
   }
 
@@ -142,14 +143,14 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
    */
   def onNextComplete(r: T): Unit = responseLock.synchronized {
     if (!tryOnNext(r)) {
-      throw new IllegalStateException("Stream onNext can't be called after 
stream completed")
+      throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onNext")
     }
     onCompleted()
   }
 
   def onError(t: Throwable): Unit = responseLock.synchronized {
     if (finalProducedIndex.nonEmpty) {
-      throw new IllegalStateException("Stream onError can't be called after 
stream completed")
+      throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onError")
     }
     error = Some(t)
     finalProducedIndex = Some(lastProducedIndex) // no responses to be send 
after error.
@@ -161,7 +162,7 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
 
   def onCompleted(): Unit = responseLock.synchronized {
     if (finalProducedIndex.nonEmpty) {
-      throw new IllegalStateException("Stream onCompleted can't be called 
after stream completed")
+      throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onCompleted")
     }
     finalProducedIndex = Some(lastProducedIndex)
     logDebug(
@@ -203,8 +204,7 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
         messageParameters = Map("index" -> index.toString, "responseId" -> 
responseId))
     } else if (getLastResponseIndex().exists(index > _)) {
       // If index > lastIndex, it's out of bounds. This is an internal error.
-      throw new IllegalStateException(
-        s"Cursor position $index is beyond last index 
${getLastResponseIndex()}.")
+      throw IllegalStateErrors.cursorOutOfBounds(index, 
getLastResponseIndex().get)
     }
     ret
   }
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
index 5ea8c6f70312..79dd07b6a658 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
@@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.service.SessionHolder
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.common.FlowStatus
@@ -86,8 +87,7 @@ class PipelineEventSender(
         })
       }
     } else {
-      throw new IllegalStateException(
-        s"Cannot send event after shutdown for session 
${sessionHolder.sessionId}")
+      throw 
IllegalStateErrors.eventSendAfterShutdown(sessionHolder.key.toString)
     }
   }
 
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index 72662d2cb048..97da96894e2a 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, PYTHON_EXEC, QUERY_ID, 
RUN_ID_STRING, SESSION_ID, USER_ID}
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, 
AgnosticEncoders}
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.common.ForeachWriterPacket
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.service.SessionHolder
@@ -184,10 +185,10 @@ object StreamingForeachBatchHelper extends Logging {
               errorClass = "PYTHON_EXCEPTION",
               messageParameters = Map("msg" -> msg, "traceback" -> traceback))
           case otherValue =>
-            throw new IllegalStateException(
-              s"[session: ${sessionHolder.sessionId}] [userId: 
${sessionHolder.userId}] " +
-                s"Unexpected return value $otherValue from the " +
-                s"Python worker.")
+            throw IllegalStateErrors.streamingQueryUnexpectedReturnValue(
+              sessionHolder.key.toString,
+              otherValue,
+              "foreachBatch function")
         }
       } catch {
         // TODO: Better handling (e.g. retries) on exceptions like 
EOFException to avoid
@@ -233,7 +234,7 @@ object StreamingForeachBatchHelper extends Logging {
 
       Option(cleanerCache.putIfAbsent(key, cleaner)) match {
         case Some(_) =>
-          throw new IllegalStateException(s"Unexpected: a cleaner for query 
$key is already set")
+          throw 
IllegalStateErrors.cleanerAlreadySet(sessionHolder.key.toString, key.toString)
         case None => // Inserted. Normal.
       }
     }
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
index faab81778482..1a48cd094cbd 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, 
SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.FUNCTION_NAME
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.service.{SessionHolder, 
SparkConnectService}
 import org.apache.spark.sql.streaming.StreamingQueryListener
@@ -98,9 +99,10 @@ class PythonStreamingQueryListener(listener: 
SimplePythonFunction, sessionHolder
             errorClass = "PYTHON_EXCEPTION",
             messageParameters = Map("msg" -> msg, "traceback" -> traceback))
         case otherValue =>
-          throw new IllegalStateException(
-            s"Unexpected return value $otherValue from the " +
-              s"Python worker.")
+          throw IllegalStateErrors.streamingQueryUnexpectedReturnValue(
+            sessionHolder.key.toString,
+            otherValue,
+            s"streaming query listener function $functionName")
       }
     } catch {
       case eof: EOFException =>
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 351be8875ba1..fcf01d5d29ab 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -24,6 +24,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.sql.catalyst.{QueryPlanningTracker, 
QueryPlanningTrackerCallback}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.common.ProtoUtils
 import org.apache.spark.util.{Clock, Utils}
 
@@ -350,16 +351,17 @@ case class ExecuteEventsManager(executeHolder: 
ExecuteHolder, clock: Clock) {
       validStatuses: List[ExecuteStatus],
       eventStatus: ExecuteStatus): Unit = {
     if (validStatuses.find(s => s == status).isEmpty) {
-      throw new IllegalStateException(s"""
-        operationId: $operationId with status ${status}
-        is not within statuses $validStatuses for event $eventStatus
-        """)
+      throw IllegalStateErrors.executionStateTransitionInvalidOperationStatus(
+        executeHolder.operationId,
+        status,
+        validStatuses,
+        eventStatus)
     }
     if (sessionHolder.eventManager.status != SessionStatus.Started) {
-      throw new IllegalStateException(s"""
-        sessionId: $sessionId with status $sessionStatus
-        is not Started for event $eventStatus
-        """)
+      throw 
IllegalStateErrors.executionStateTransitionInvalidSessionNotStarted(
+        sessionHolder.sessionId,
+        sessionStatus,
+        eventStatus)
     }
     _status = eventStatus
   }
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
index 0a466594fad7..20abc2fbf335 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connect.service
 
 import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.util.{Clock}
 
 sealed abstract class SessionStatus(value: Int)
@@ -82,10 +83,11 @@ case class SessionEventsManager(sessionHolder: 
SessionHolder, clock: Clock) {
       validStatuses: List[SessionStatus],
       eventStatus: SessionStatus): Unit = {
     if (validStatuses.find(s => s == status).isEmpty) {
-      throw new IllegalStateException(s"""
-        sessionId: $sessionId with status ${status}
-        is not within statuses $validStatuses for event $eventStatus
-        """)
+      throw IllegalStateErrors.sessionStateTransitionInvalid(
+        sessionHolder.sessionId,
+        status,
+        eventStatus,
+        validStatuses)
     }
     _status = eventStatus
   }
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 912543ac13dd..e5f9b7fe85f1 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -35,6 +35,7 @@ import org.apache.spark.internal.{Logging, LogKeys}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.classic.SparkSession
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.ml.MLCache
@@ -175,7 +176,7 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
 
     activeOperationIds.synchronized {
       if (activeOperationIds.contains(operationId)) {
-        throw new IllegalStateException(s"ExecuteHolder with 
opId=${operationId} already exists!")
+        throw IllegalStateErrors.executeHolderAlreadyExists(operationId)
       }
       activeOperationIds.add(operationId)
     }
@@ -368,7 +369,7 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
     // called only once, since removing the session from 
SparkConnectSessionManager.sessionStore is
     // synchronized and guaranteed to happen only once.
     if (closedTimeMs.isDefined) {
-      throw new IllegalStateException(s"Session $key is already closed.")
+      throw IllegalStateErrors.sessionAlreadyClosed(key.toString)
     }
     logInfo(
       log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
@@ -532,9 +533,7 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
       graphId,
       (_, existing) => {
         if (Option(existing).isDefined) {
-          throw new IllegalStateException(
-            s"Pipeline execution for graph ID $graphId already exists. " +
-              s"Stop the existing execution before starting a new one.")
+          throw IllegalStateErrors.executeHolderAlreadyExistsGraphId(graphId)
         }
 
         pipelineUpdateContext
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 768c6a858188..bb51438ce90f 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.{Logging, LogKeys}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
+import org.apache.spark.sql.connect.IllegalStateErrors
 import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE,
 CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, 
CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
 import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
 import org.apache.spark.util.ThreadUtils
@@ -225,7 +226,7 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging {
     } else if (executeHolder.isOrphan()) {
       logWarning(log"Reattach to an orphan operation.")
       removeExecuteHolder(executeHolder.key)
-      throw new IllegalStateException("Operation was orphaned because of an 
internal error.")
+      throw IllegalStateErrors.operationOrphaned(executeHolder.key.toString)
     }
 
     val responseSender =
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index c14c21bd6ccb..f2dd9c1b1cef 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -40,6 +40,7 @@ import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerEvent}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.classic.ClassicConversions._
+import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.config.Connect.{getAuthenticateToken, 
CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, 
CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE, 
CONNECT_GRPC_PORT_MAX_RETRIES}
 import org.apache.spark.sql.connect.execution.ConnectProgressExecutionListener
 import org.apache.spark.sql.connect.ui.{SparkConnectServerAppStatusStore, 
SparkConnectServerListener, SparkConnectServerTab}
@@ -468,8 +469,7 @@ object SparkConnectService extends Logging {
     }
 
     if (!started) {
-      throw new IllegalStateException(
-        "Attempting to stop the Spark Connect service that has not been 
started.")
+      throw IllegalStateErrors.serviceNotStarted()
     }
 
     if (server != null) {
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/IllegalStateErrorsSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/IllegalStateErrorsSuite.scala
new file mode 100644
index 000000000000..2708a0a3fe2b
--- /dev/null
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/IllegalStateErrorsSuite.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connect.service.{ExecuteStatus, SessionStatus}
+
+class IllegalStateErrorsSuite extends SparkFunSuite {
+
+  test("streamLifecycleAlreadyCompleted should construct error correctly") {
+    val operation = "next"
+    val error = IllegalStateErrors.streamLifecycleAlreadyCompleted(operation)
+    assert(error.isInstanceOf[org.apache.spark.SparkIllegalStateException])
+    assert(error.getCondition.contains("STREAM_LIFECYCLE_ALREADY_COMPLETED"))
+    assert(error.getMessage.contains(operation))
+  }
+
+  test("cursorOutOfBounds should construct error correctly") {
+    val cursor = 15L
+    val batchSize = 10L
+    val error = IllegalStateErrors.cursorOutOfBounds(cursor, batchSize)
+    assert(error.getCondition.contains("DATA_INTEGRITY_CURSOR_OUT_OF_BOUNDS"))
+    assert(error.getMessage.contains(cursor.toString))
+    assert(error.getMessage.contains(batchSize.toString))
+  }
+
+  test("executionStateTransitionInvalidOperationStatus should construct error correctly") {
+    val operationId = "op-123"
+    val currentStatus = ExecuteStatus.Pending
+    val validStatuses = List(ExecuteStatus.Started, ExecuteStatus.Finished)
+    val eventStatus = ExecuteStatus.Analyzed
+    val error = IllegalStateErrors.executionStateTransitionInvalidOperationStatus(
+      operationId,
+      currentStatus,
+      validStatuses,
+      eventStatus)
+    val expectedCondition = "SPARK_CONNECT_ILLEGAL_STATE." +
+      "STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH"
+    assert(error.getCondition.contains(expectedCondition))
+    assert(error.getMessage.contains(operationId))
+    assert(error.getMessage.contains(currentStatus.toString))
+  }
+
+  test("executionStateTransitionInvalidSessionNotStarted should construct error correctly") {
+    val sessionId = "session-456"
+    val sessionStatus = SessionStatus.Pending
+    val eventStatus = ExecuteStatus.Started
+    val error = IllegalStateErrors.executionStateTransitionInvalidSessionNotStarted(
+      sessionId,
+      sessionStatus,
+      eventStatus)
+    val expectedCondition = "SPARK_CONNECT_ILLEGAL_STATE." +
+      "STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED"
+    assert(error.getCondition.contains(expectedCondition))
+    assert(error.getMessage.contains(sessionId))
+    assert(error.getMessage.contains(sessionStatus.toString))
+  }
+
+  test("executeHolderAlreadyExists should construct error correctly") {
+    val operationId = "op-789"
+    val error = IllegalStateErrors.executeHolderAlreadyExists(operationId)
+    assert(error.getCondition.contains("EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS"))
+    assert(error.getMessage.contains(operationId))
+  }
+
+  test("executeHolderAlreadyExistsGraphId should construct error correctly") {
+    val graphId = "graph-123"
+    val error = IllegalStateErrors.executeHolderAlreadyExistsGraphId(graphId)
+    val expectedCondition =
+      "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH"
+    assert(error.getCondition.contains(expectedCondition))
+    assert(error.getMessage.contains(graphId))
+  }
+
+  test("sessionAlreadyClosed should construct error correctly") {
+    val sessionKey = "session-key-456"
+    val error = IllegalStateErrors.sessionAlreadyClosed(sessionKey)
+    assert(error.getCondition.contains("SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED"))
+    assert(error.getMessage.contains(sessionKey))
+  }
+
+  test("operationOrphaned should construct error correctly") {
+    val executeKey = "execute-key-789"
+    val error = IllegalStateErrors.operationOrphaned(executeKey)
+    assert(error.getCondition.contains("EXECUTION_STATE_OPERATION_ORPHANED"))
+    assert(error.getMessage.contains(executeKey))
+  }
+
+  test("sessionStateTransitionInvalid should construct error correctly") {
+    val sessionId = "session-111"
+    val fromState = SessionStatus.Started
+    val toState = SessionStatus.Closed
+    val validStates = List(SessionStatus.Pending, SessionStatus.Started)
+    val error =
+      IllegalStateErrors.sessionStateTransitionInvalid(sessionId, fromState, toState, validStates)
+    val expectedCondition =
+      "STATE_CONSISTENCY_SESSION_STATE_TRANSITION_INVALID"
+    assert(error.getCondition.contains(expectedCondition))
+    assert(error.getMessage.contains(sessionId))
+    assert(error.getMessage.contains(fromState.toString))
+    assert(error.getMessage.contains(toState.toString))
+  }
+
+  test("serviceNotStarted should construct error correctly") {
+    val error = IllegalStateErrors.serviceNotStarted()
+    assert(error.getCondition.contains("SESSION_MANAGEMENT_SERVICE_NOT_STARTED"))
+  }
+
+  test("streamingQueryUnexpectedReturnValue should construct error correctly") {
+    val key = "query-key-123"
+    val value = 42
+    val context = "test-context"
+    val error = IllegalStateErrors.streamingQueryUnexpectedReturnValue(key, value, context)
+    assert(error.getCondition.contains("STREAMING_QUERY_UNEXPECTED_RETURN_VALUE"))
+    assert(error.getMessage.contains(key))
+    assert(error.getMessage.contains(value.toString))
+    assert(error.getMessage.contains(context))
+  }
+
+  test("cleanerAlreadySet should construct error correctly") {
+    val key = "session-key-222"
+    val queryKey = "query-key-333"
+    val error = IllegalStateErrors.cleanerAlreadySet(key, queryKey)
+    assert(error.getCondition.contains("STATE_CONSISTENCY_CLEANER_ALREADY_SET"))
+    assert(error.getMessage.contains(key))
+    assert(error.getMessage.contains(queryKey))
+  }
+
+  test("eventSendAfterShutdown should construct error correctly") {
+    val key = "session-key-444"
+    val error = IllegalStateErrors.eventSendAfterShutdown(key)
+    assert(error.getCondition.contains("STREAM_LIFECYCLE_EVENT_SEND_AFTER_SHUTDOWN"))
+    assert(error.getMessage.contains(key))
+  }
+
+  test("noBatchesAvailable should construct error correctly") {
+    val response = "empty-response"
+    val error = IllegalStateErrors.noBatchesAvailable(response)
+    assert(error.getCondition.contains("STATE_CONSISTENCY_NO_BATCHES_AVAILABLE"))
+    assert(error.getMessage.contains(response))
+  }
+
+  test("error messages should handle special characters correctly") {
+    val operation = "operation with spaces and special chars: <>&\""
+    val error = IllegalStateErrors.streamLifecycleAlreadyCompleted(operation)
+    assert(error.getMessage.contains(operation))
+  }
+
+  test("error messages should handle empty strings") {
+    val emptyString = ""
+    val error = IllegalStateErrors.streamLifecycleAlreadyCompleted(emptyString)
+    assert(error.isInstanceOf[org.apache.spark.SparkIllegalStateException])
+  }
+
+  test("all errors should be SparkIllegalStateException instances") {
+    val errors = Seq(
+      IllegalStateErrors.streamLifecycleAlreadyCompleted("op"),
+      IllegalStateErrors.cursorOutOfBounds(1L, 2L),
+      IllegalStateErrors.executionStateTransitionInvalidOperationStatus(
+        "op",
+        ExecuteStatus.Pending,
+        List(ExecuteStatus.Started),
+        ExecuteStatus.Analyzed),
+      IllegalStateErrors.executionStateTransitionInvalidSessionNotStarted(
+        "session",
+        SessionStatus.Pending,
+        ExecuteStatus.Started),
+      IllegalStateErrors.executeHolderAlreadyExists("op"),
+      IllegalStateErrors.executeHolderAlreadyExistsGraphId("graph"),
+      IllegalStateErrors.sessionAlreadyClosed("key"),
+      IllegalStateErrors.operationOrphaned("key"),
+      IllegalStateErrors.sessionStateTransitionInvalid(
+        "session",
+        SessionStatus.Started,
+        SessionStatus.Closed,
+        List(SessionStatus.Pending)),
+      IllegalStateErrors.serviceNotStarted(),
+      IllegalStateErrors.streamingQueryUnexpectedReturnValue("key", 123, "context"),
+      IllegalStateErrors.cleanerAlreadySet("key", "queryKey"),
+      IllegalStateErrors.eventSendAfterShutdown("key"),
+      IllegalStateErrors.noBatchesAvailable("response"))
+
+    errors.foreach { error =>
+      assert(error.isInstanceOf[org.apache.spark.SparkIllegalStateException])
+      assert(error.getCondition.contains("SPARK_CONNECT_ILLEGAL_STATE"))
+    }
+  }
+}
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
index fc3cce0f7459..d6942132b908 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
@@ -26,7 +26,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.sql.classic.{RuntimeConfig, SparkSession}
-import org.apache.spark.sql.connect.service.SessionHolder
+import org.apache.spark.sql.connect.service.{SessionHolder, SessionKey}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.common.FlowStatus
 import org.apache.spark.sql.pipelines.common.RunState.{COMPLETED, RUNNING}
@@ -39,7 +39,9 @@ class PipelineEventSenderSuite extends SparkDeclarativePipelinesServerTest with
     val mockObserver = mock[StreamObserver[ExecutePlanResponse]]
     val mockSessionHolder = mock[SessionHolder]
     when(mockSessionHolder.sessionId).thenReturn("test-session-id")
+    when(mockSessionHolder.userId).thenReturn("test-user-id")
    when(mockSessionHolder.serverSessionId).thenReturn("test-server-session-id")
+    when(mockSessionHolder.key).thenReturn(SessionKey("test-user-id", "test-session-id"))
     val mockSession = mock[SparkSession]
     val mockConf = mock[RuntimeConfig]
     when(mockSessionHolder.session).thenReturn(mockSession)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to