This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a830dd251979 [SPARK-54128][CONNECT][SQL] Convert IllegalArgumentException to SparkException with proper error classes in Spark Connect server
a830dd251979 is described below
commit a830dd251979513308b5bf3a99e086588ebfacc3
Author: Martin Grund <[email protected]>
AuthorDate: Sun Nov 2 20:29:22 2025 -0800
[SPARK-54128][CONNECT][SQL] Convert IllegalArgumentException to SparkException with proper error classes in Spark Connect server
### What changes were proposed in this pull request?
This PR converts several IllegalArgumentException and
UnsupportedOperationException instances in the Spark Connect server to proper
SparkException with structured error classes, ensuring consistent error
propagation to clients.
Changes include:
1. Added error class definitions in error-conditions.json:
   - INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG
   - INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID
   - INVALID_PARAMETER_VALUE.STREAMING_LISTENER_COMMAND_MISSING
   - INVALID_ARTIFACT_PATH
   - UNSUPPORTED_FEATURE.INTERRUPT_TYPE
2. Updated service handlers, all following the before/after sketch shown after this list:
   - SparkConnectInterruptHandler: Converted generic exceptions to SparkSQLException
   - SparkConnectAddArtifactsHandler: Converted to SparkRuntimeException for invalid paths
   - SparkConnectStreamingQueryListenerHandler: Converted to SparkSQLException
3. Added test coverage in SparkConnectServiceE2ESuite
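Each handler change follows the same pattern: replace the generic JVM exception with a SparkThrowable subclass that carries an error class plus message parameters. A minimal sketch of that pattern, condensed from the SparkConnectInterruptHandler change in the diff below (the `requireOperationTag` wrapper is illustrative, not part of the patch):

```scala
import org.apache.spark.SparkSQLException

// Illustrative wrapper; the real check lives inline in SparkConnectInterruptHandler.
def requireOperationTag(hasOperationTag: Boolean): Unit = {
  if (!hasOperationTag) {
    // Before: throw new IllegalArgumentException(
    //   "INTERRUPT_TYPE_TAG requested, but no operation_tag provided.")
    // After: a SparkThrowable carrying a structured error class, which
    // the server-side error handling can propagate to the client intact.
    throw new SparkSQLException(
      errorClass = "INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG",
      messageParameters = Map("parameter" -> "operation_tag", "functionName" -> "interrupt"))
  }
}
```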
### Why are the changes needed?
Previously, the Spark Connect server threw generic Java exceptions that:
- Did not include structured error classes
- Could not be properly categorized by clients
- Provided less actionable error information
- Were inconsistent with Spark's error handling standards
The error handling infrastructure (ErrorUtils.handleError) can only
propagate error classes from SparkThrowable instances. Generic Java exceptions
are converted to generic UNKNOWN errors, losing important context.
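Conceptually, the boundary behaves like this simplified sketch (the `summarize` helper is illustrative only, not the actual ErrorUtils code):

```scala
import org.apache.spark.SparkThrowable

// Illustrative: only SparkThrowable exposes a structured error condition;
// any other exception collapses into an unclassified bucket at the gRPC boundary.
def summarize(t: Throwable): String = t match {
  case st: SparkThrowable => s"[${st.getCondition}] ${t.getMessage}"
  case _ => s"UNKNOWN: ${t.getMessage}"
}
```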
### Does this PR introduce _any_ user-facing change?
Yes, but only to improve error messages.
Before:
    java.lang.IllegalArgumentException: INTERRUPT_TYPE_TAG requested, but no operation_tag provided.

After:
    [INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG] The value of parameter(s) operation_tag in interrupt is invalid: INTERRUPT_TYPE_TAG requested, but no operation_tag provided.
Clients can now parse structured error classes for better error handling
and recovery.
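As a hypothetical illustration, a Scala client could recover the bracketed error class from the gRPC failure message like this (real clients should prefer the structured error details attached by the server over string matching):

```scala
import io.grpc.StatusRuntimeException

// Hypothetical helper: pull the "[ERROR_CLASS.SUB_CLASS]" prefix out of the message text.
def errorClassOf(e: StatusRuntimeException): Option[String] =
  """\[([A-Z_][A-Z_.]*)\]""".r
    .findFirstMatchIn(Option(e.getMessage).getOrElse(""))
    .map(_.group(1))

// e.g. errorClassOf(error) == Some("INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG")
```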
### How was this patch tested?
1. Added new test cases in SparkConnectServiceE2ESuite:
- test("Interrupt with TAG type without operation_tag throws proper
error class")
- test("Interrupt with OPERATION_ID type without operation_id throws
proper error class")
2. Manual verification of error class propagation through the gRPC layer
3. All modified files pass Scala style checks
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.5
Closes #52818 from grundprinzip/SPARK-error-messages.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 26 ++++++++++++
...SparkConnectStreamingQueryListenerHandler.scala | 6 ++-
.../service/SparkConnectAddArtifactsHandler.scala | 6 +--
.../service/SparkConnectInterruptHandler.scala | 16 +++++---
.../connect/service/AddArtifactsHandlerSuite.scala | 12 +-----
.../service/SparkConnectServiceE2ESuite.scala | 48 ++++++++++++++++++++++
6 files changed, 95 insertions(+), 19 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 34b72975cc07..c19e192d80ee 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2478,6 +2478,12 @@
     ],
     "sqlState" : "22003"
   },
+  "INVALID_ARTIFACT_PATH" : {
+    "message" : [
+      "Artifact with name <name> is invalid. The name must be a relative path and cannot reference parent/sibling/nephew directories."
+    ],
+    "sqlState" : "22023"
+  },
   "INVALID_ATTRIBUTE_NAME_SYNTAX" : {
     "message" : [
       "Syntax error in the attribute name: <name>. Check that backticks appear in pairs, a quoted string is a complete name part and use a backtick only inside quoted name parts."
@@ -3466,6 +3472,16 @@
           "expects an integer literal, but got <invalidValue>."
         ]
       },
+      "INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID" : {
+        "message" : [
+          "INTERRUPT_TYPE_OPERATION_ID requested, but no operation_id provided."
+        ]
+      },
+      "INTERRUPT_TYPE_TAG_REQUIRES_TAG" : {
+        "message" : [
+          "INTERRUPT_TYPE_TAG requested, but no operation_tag provided."
+        ]
+      },
       "LENGTH" : {
         "message" : [
           "Expects `length` greater than or equal to 0, but got <length>."
@@ -3496,6 +3512,11 @@
           "Expects a positive or a negative value for `start`, but got 0."
         ]
       },
+      "STREAMING_LISTENER_COMMAND_MISSING" : {
+        "message" : [
+          "Missing command in StreamingQueryListenerBusCommand."
+        ]
+      },
       "STRING" : {
         "message" : [
           "expects a string literal, but got <invalidValue>."
@@ -6352,6 +6373,11 @@
           "INSERT INTO <tableName> with IF NOT EXISTS in the PARTITION spec."
         ]
       },
+      "INTERRUPT_TYPE" : {
+        "message" : [
+          "Unsupported interrupt type: <interruptType>."
+        ]
+      },
       "LAMBDA_FUNCTION_WITH_PYTHON_UDF" : {
         "message" : [
           "Lambda function with Python UDF <funcName> in a higher order function."
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
index 04312a35a3b4..7bee4539d01b 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
@@ -21,6 +21,7 @@ import scala.util.control.NonFatal
 
 import io.grpc.stub.StreamObserver
 
+import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
 import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
@@ -117,7 +118,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
             return
           }
       case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
-        throw new IllegalArgumentException("Missing command in StreamingQueryListenerBusCommand")
+        throw new SparkSQLException(
+          errorClass = "INVALID_PARAMETER_VALUE.STREAMING_LISTENER_COMMAND_MISSING",
+          messageParameters =
+            Map("parameter" -> "command", "functionName" -> "StreamingQueryListenerBusCommand"))
     }
     executeHolder.eventsManager.postFinished()
   }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
index becd7d855133..477d5b974fac 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
@@ -219,9 +219,9 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
       ArtifactUtils.concatenatePaths(stagingDir, path)
     } catch {
       case _: IllegalArgumentException =>
-        throw new IllegalArgumentException(
-          s"Artifact with name: $name is invalid. The `name` " +
-            s"must be a relative path and cannot reference parent/sibling/nephew directories.")
+        throw new SparkRuntimeException(
+          errorClass = "INVALID_ARTIFACT_PATH",
+          messageParameters = Map("name" -> name))
       case NonFatal(e) => throw e
     }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala
index ae38e55d3c67..8f41257ccdfd 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala
@@ -21,6 +21,7 @@ import scala.jdk.CollectionConverters._
 
 import io.grpc.stub.StreamObserver
 
+import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
@@ -41,18 +42,23 @@ class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.Interr
         sessionHolder.interruptAll()
       case proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG =>
         if (!v.hasOperationTag) {
-          throw new IllegalArgumentException(
-            s"INTERRUPT_TYPE_TAG requested, but no operation_tag provided.")
+          throw new SparkSQLException(
+            errorClass = "INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG",
+            messageParameters =
+              Map("parameter" -> "operation_tag", "functionName" -> "interrupt"))
         }
         sessionHolder.interruptTag(v.getOperationTag)
       case proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID =>
         if (!v.hasOperationId) {
-          throw new IllegalArgumentException(
-            s"INTERRUPT_TYPE_OPERATION_ID requested, but no operation_id provided.")
+          throw new SparkSQLException(
+            errorClass = "INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID",
+            messageParameters = Map("parameter" -> "operation_id", "functionName" -> "interrupt"))
         }
         sessionHolder.interruptOperation(v.getOperationId)
       case other =>
-        throw new UnsupportedOperationException(s"Unknown InterruptType $other!")
+        throw new SparkSQLException(
+          errorClass = "UNSUPPORTED_FEATURE.INTERRUPT_TYPE",
+          messageParameters = Map("interruptType" -> other.toString))
     }
val response = proto.InterruptResponse
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
index 6cc5daadfddd..1df8ba46286c 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
@@ -399,11 +399,7 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
          handler.onNext(req)
        }
        assert(e.getStatus.getCode == Code.INTERNAL)
-       val statusProto = StatusProto.fromThrowable(e)
-       assert(statusProto.getDetailsCount == 1)
-       val details = statusProto.getDetails(0)
-       val info = details.unpack(classOf[ErrorInfo])
-       assert(info.getReason.contains("java.lang.IllegalArgumentException"))
+       assert(e.getMessage.contains("INVALID_ARTIFACT_PATH"))
      }
      handler.onCompleted()
    } finally {
@@ -422,11 +418,7 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
          handler.onNext(req)
        }
        assert(e.getStatus.getCode == Code.INTERNAL)
-       val statusProto = StatusProto.fromThrowable(e)
-       assert(statusProto.getDetailsCount == 1)
-       val details = statusProto.getDetails(0)
-       val info = details.unpack(classOf[ErrorInfo])
-       assert(info.getReason.contains("java.lang.IllegalArgumentException"))
+       assert(e.getMessage.contains("INVALID_ARTIFACT_PATH"))
      }
      handler.onCompleted()
    } finally {
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index 0e18ff711c4c..6eee71db5709 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -317,4 +317,52 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
       assert(error.getMessage.contains(fixedOperationId))
     }
   }
+
+  test("Interrupt with TAG type without operation_tag throws proper error class") {
+    withRawBlockingStub { stub =>
+      // Create an interrupt request with INTERRUPT_TYPE_TAG but no operation_tag
+      val request = org.apache.spark.connect.proto.InterruptRequest
+        .newBuilder()
+        .setSessionId(UUID.randomUUID().toString)
+        .setUserContext(org.apache.spark.connect.proto.UserContext
+          .newBuilder()
+          .setUserId(defaultUserId))
+        .setInterruptType(
+          org.apache.spark.connect.proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG)
+        .build()
+
+      val error = intercept[io.grpc.StatusRuntimeException] {
+        stub.interrupt(request)
+      }
+
+      // Verify the error is INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG
+      assert(error.getMessage.contains("INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG"))
+      assert(error.getMessage.contains("operation_tag"))
+    }
+  }
+
+  test("Interrupt with OPERATION_ID type without operation_id throws proper error class") {
+    withRawBlockingStub { stub =>
+      // Create an interrupt request with INTERRUPT_TYPE_OPERATION_ID but no operation_id
+      val request = org.apache.spark.connect.proto.InterruptRequest
+        .newBuilder()
+        .setSessionId(UUID.randomUUID().toString)
+        .setUserContext(org.apache.spark.connect.proto.UserContext
+          .newBuilder()
+          .setUserId(defaultUserId))
+        .setInterruptType(
+          org.apache.spark.connect.proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID)
+        .build()
+
+      val error = intercept[io.grpc.StatusRuntimeException] {
+        stub.interrupt(request)
+      }
+
+      // Verify the error is INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID
+      assert(
+        error.getMessage.contains(
+          "INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID"))
+      assert(error.getMessage.contains("operation_id"))
+    }
+  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]