This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 18c578d4f7b [SPARK-43206][SS][CONNECT] StreamingQuery exception() include stack trace
18c578d4f7b is described below
commit 18c578d4f7be1a0f542b48c0741daf04aba8f054
Author: Wei Liu <[email protected]>
AuthorDate: Mon May 1 14:06:51 2023 -0400
[SPARK-43206][SS][CONNECT] StreamingQuery exception() include stack trace
### What changes were proposed in this pull request?
Add the stack trace to StreamingQuery's `exception()` method, following
https://github.com/apache/spark/commit/a5c8a3c976889f33595ac18f82e73e6b9fd29b57#diff-98baf452f0352c75a39f39351c5f9e656675810b6d4cfd178f1b0bae9751495b.
The change is made in both the Python client and the Scala client.
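With this change, the exception returned by `exception()` carries the server-side stack trace in its message. A rough usage sketch from the client side (not part of this patch; it assumes a running Spark Connect session bound to `spark`, and the input path and query name simply mirror the manual test below):
```python
import time
from pyspark.sql.functions import col, udf

# A streaming query whose UDF fails at runtime (ZeroDivisionError).
sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
bad_udf = udf(lambda x: 1 / 0)
sq = (
    sdf.select(bad_udf(col("value")))
    .writeStream.format("memory")
    .queryName("this_query")
    .start()
)

# Wait for the query to terminate with the failure, then inspect the exception.
while sq.isActive:
    time.sleep(1)

err = sq.exception()
if err is not None:
    # The message now also contains the JVM stack trace of the server-side failure.
    print(str(err))
```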
### Why are the changes needed?
Including the stack trace is helpful for debugging.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test:
1. Python:
```
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.0.dev0
/_/
Using Python version 3.9.16 (main, Dec 7 2022 01:11:58)
Client connected to the Spark Connect server at localhost
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import col, udf
>>> from pyspark.errors import StreamingQueryException
>>> sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
>>> bad_udf = udf(lambda x: 1 / 0)
>>> sq = sdf.select(bad_udf(col("value"))).writeStream.format("memory").queryName("this_query").start()
>>> sq.exception()
StreamingQueryException('Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (ip-10-110-19-234.us-west-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File "<stdin>", line 1, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)\n\tat
[...]
```
2. Scala:
```
Spark session available as 'spark'.
_____ __ ______ __
/ ___/____ ____ ______/ /__ / ____/___ ____ ____ ___ _____/ /_
\__ \/ __ \/ __ `/ ___/ //_/ / / / __ \/ __ \/ __ \/ _ \/ ___/ __/
___/ / /_/ / /_/ / / / ,< / /___/ /_/ / / / / / / / __/ /__/ /_
/____/ .___/\__,_/_/ /_/|_| \____/\____/_/ /_/_/ /_/\___/\___/\__/
/_/
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
val sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
sdf: org.apache.spark.sql.package.DataFrame = [value: string]
val badUdf = udf((x: String) => 1 / 0)
badUdf: org.apache.spark.sql.expressions.UserDefinedFunction =
ScalarUserDefinedFunction(
ammonite.$sess.cmd2$Helper$$Lambda$1913/745186412239d9cb7,
ArrayBuffer(StringEncoder),
PrimitiveIntEncoder,
None,
true,
true
)
val sq = sdf.select(badUdf(col("value"))).writeStream.format("memory").queryName("this_query").start()
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery13866865
sq.isActive
res4: Boolean = false
sq.exception.get.toString
res5: String = """org.apache.spark.sql.streaming.StreamingQueryException:
Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 2.0 (TID 2)
(ip-10-110-19-234.us-west-2.compute.internal executor driver):
java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in
instance of org.apache.spark.sql.catalyst.expre [...]
at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2411)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.jav...
JVM stacktrace: org.apache.spark.sql.streaming.StreamingQueryException: Job
aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent
failure: Lost task 0.0 in stage 2.0 (TID 2)
(ip-10-110-19-234.us-west-2.compute.internal executor driver):
java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in
instance of org.apache.spark.sql.catalyst.express [...]
at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
...
```
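For reference, the new behavior could also be checked programmatically rather than by eyeballing the REPL output. A rough sketch (not part of this patch), reusing the failed query `sq` from the Python example above:
```python
# The client appends the server-side trace under a "JVM stacktrace" header
# (see the query.py and StreamingQueryException.scala changes below), so its
# presence in the message is a simple proxy for "stack trace included".
err = sq.exception()
assert err is not None
assert "JVM stacktrace" in str(err)
assert "org.apache.spark" in str(err)  # frames from the server-side stack trace
```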
Closes #40966 from WweiL/SPARK-43206-exception-stk-trace-new-2.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/sql/streaming/StreamingQuery.scala | 4 ++--
.../sql/streaming/StreamingQueryException.scala | 12 ++++++++++--
.../src/main/protobuf/spark/connect/commands.proto | 3 ++-
.../sql/connect/planner/SparkConnectPlanner.scala | 19 +++++++++++++++----
.../sql/connect/service/SparkConnectService.scala | 4 +---
python/pyspark/sql/connect/proto/commands_pb2.py | 22 +++++++++++-----------
python/pyspark/sql/connect/proto/commands_pb2.pyi | 20 +++++++++++++++++---
python/pyspark/sql/connect/streaming/query.py | 10 ++++++----
.../sql/streaming/StreamingQueryManager.scala | 4 ++--
9 files changed, 66 insertions(+), 32 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 7f6e3721841..69cefbd442e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -243,12 +243,12 @@ class RemoteStreamingQuery(
override def exception: Option[StreamingQueryException] = {
val exception = executeQueryCmd(_.setException(true)).getException
if (exception.hasExceptionMessage) {
- // TODO(SPARK-43206): Add more information to StreamingQueryException.
Some(
new StreamingQueryException(
// message maps to the return value of original StreamingQueryException's toString method
message = exception.getExceptionMessage,
- errorClass = exception.getErrorClass))
+ errorClass = exception.getErrorClass,
+ stackTrace = exception.getStackTrace))
} else {
None
}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 875c216a3e7..d5e9982dfbf 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -27,12 +27,20 @@ import org.apache.spark.annotation.Evolving
* Maps to return value of original StreamingQueryException's toString method
* @param errorClass
* Error class of this exception
+ * @param stackTrace
+ * Stack trace of this exception
* @since 3.5.0
*/
@Evolving
-class StreamingQueryException private[sql] (message: String, errorClass: String)
+class StreamingQueryException private[sql] (
+ message: String,
+ errorClass: String,
+ stackTrace: String)
extends SparkThrowable {
- // TODO(SPARK-43206): Add stack trace
override def getErrorClass: String = errorClass
+
+ override def toString: String = s"""$message
+ |JVM stacktrace: $stackTrace
+ |""".stripMargin
}
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 0d6c29da9f8..3c977a02503 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -313,7 +313,8 @@ message StreamingQueryCommandResult {
optional string exception_message = 1;
// (Optional) Exception error class as string
optional string error_class = 2;
- // TODO(SPARK-43206): Add stack trace
+ // (Optional) Exception stack trace as string
+ optional string stack_trace = 3;
}
message AwaitTerminationResult {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3b4dd13cc5d..22229ba98b2 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
import com.google.common.collect.{Lists, Maps}
import com.google.protobuf.{Any => ProtoAny, ByteString}
import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
@@ -2348,10 +2349,20 @@ class SparkConnectPlanner(val session: SparkSession) {
case StreamingQueryCommand.CommandCase.EXCEPTION =>
val result = query.exception
- result.foreach(e =>
- respBuilder.getExceptionBuilder
- .setExceptionMessage(SparkConnectService.abbreviateErrorMessage(e.toString))
- .setErrorClass(e.getErrorClass))
+ if (result.isDefined) {
+ val e = result.get
+ val exception_builder = StreamingQueryCommandResult.ExceptionResult
+ .newBuilder()
+ exception_builder
+ .setExceptionMessage(e.toString)
+ .setErrorClass(e.getErrorClass)
+
+ val stackTrace = Option(ExceptionUtils.getStackTrace(e))
+ stackTrace.foreach { s =>
+ exception_builder.setStackTrace(s)
+ }
+ respBuilder.setException(exception_builder.build())
+ }
case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
if (command.getAwaitTermination.hasTimeoutMs) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index df3c1fd7b05..a051eef5e40 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -340,10 +340,8 @@ object SparkConnectService {
}
}
- def abbreviateErrorMessage(msg: String): String = StringUtils.abbreviate(msg, 2048)
-
def extractErrorMessage(st: Throwable): String = {
- val message = abbreviateErrorMessage(st.getMessage)
+ val message = StringUtils.abbreviate(st.getMessage, 2048)
if (message != null) {
message
} else {
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index 73575fbed85..cb40a54e344 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_rel
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x
[...]
+
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x
[...]
)
@@ -435,7 +435,7 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 4539
_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 4615
_STREAMINGQUERYCOMMANDRESULT._serialized_start = 4629
- _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5716
+ _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5770
_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 5212
_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 5382
_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 5384
@@ -443,13 +443,13 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 5458
_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 5497
_STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5500
- _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5643
- _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5645
- _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5701
- _GETRESOURCESCOMMAND._serialized_start = 5718
- _GETRESOURCESCOMMAND._serialized_end = 5739
- _GETRESOURCESCOMMANDRESULT._serialized_start = 5742
- _GETRESOURCESCOMMANDRESULT._serialized_end = 5954
- _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5858
- _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5954
+ _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5697
+ _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5699
+ _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5755
+ _GETRESOURCESCOMMAND._serialized_start = 5772
+ _GETRESOURCESCOMMAND._serialized_end = 5793
+ _GETRESOURCESCOMMANDRESULT._serialized_start = 5796
+ _GETRESOURCESCOMMANDRESULT._serialized_end = 6008
+ _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5912
+ _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 6008
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 81856352167..697ac14f16f 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -1088,19 +1088,21 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
EXCEPTION_MESSAGE_FIELD_NUMBER: builtins.int
ERROR_CLASS_FIELD_NUMBER: builtins.int
+ STACK_TRACE_FIELD_NUMBER: builtins.int
exception_message: builtins.str
"""(Optional) Exception message as string, maps to the return value of
original
StreamingQueryException's toString method
"""
error_class: builtins.str
- """(Optional) Exception error class as string
- TODO(SPARK-43206): Add stack trace
- """
+ """(Optional) Exception error class as string"""
+ stack_trace: builtins.str
+ """(Optional) Exception stack trace as string"""
def __init__(
self,
*,
exception_message: builtins.str | None = ...,
error_class: builtins.str | None = ...,
+ stack_trace: builtins.str | None = ...,
) -> None: ...
def HasField(
self,
@@ -1109,10 +1111,14 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
b"_error_class",
"_exception_message",
b"_exception_message",
+ "_stack_trace",
+ b"_stack_trace",
"error_class",
b"error_class",
"exception_message",
b"exception_message",
+ "stack_trace",
+ b"stack_trace",
],
) -> builtins.bool: ...
def ClearField(
@@ -1122,10 +1128,14 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
b"_error_class",
"_exception_message",
b"_exception_message",
+ "_stack_trace",
+ b"_stack_trace",
"error_class",
b"error_class",
"exception_message",
b"exception_message",
+ "stack_trace",
+ b"stack_trace",
],
) -> None: ...
@typing.overload
@@ -1137,6 +1147,10 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
self,
oneof_group: typing_extensions.Literal["_exception_message", b"_exception_message"],
) -> typing_extensions.Literal["exception_message"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_stack_trace", b"_stack_trace"]
+ ) -> typing_extensions.Literal["stack_trace"] | None: ...
class AwaitTerminationResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index fc207243ff3..4cdef79a7d4 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -147,14 +147,16 @@ class StreamingQuery:
cmd = pb2.StreamingQueryCommand()
cmd.exception = True
exception = self._execute_streaming_query_cmd(cmd).exception
- if exception.HasField("exception_message"):
+ if not exception.HasField("exception_message"):
+ return None
+ else:
# Drop the Java StreamingQueryException type info
# exception_message maps to the return value of original
# StreamingQueryException's toString method
msg = exception.exception_message.split(": ", 1)[1]
- return CapturedStreamingQueryException(msg)
- else:
- return None
+ if exception.HasField("stack_trace"):
+ msg += f"\n\nJVM stacktrace:\n{exception.stack_trace}"
+ return CapturedStreamingQueryException(msg, reason=exception.error_class)
exception.__doc__ = PySparkStreamingQuery.exception.__doc__
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 20254dec3d8..23d05a0a210 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -408,7 +408,7 @@ class StreamingQueryManager private[sql] (
query.streamingQuery.start()
} catch {
case e: Throwable =>
- unregisterTerminatedStream(query)
+// unregisterTerminatedStream(query)
throw e
}
query
@@ -416,7 +416,7 @@ class StreamingQueryManager private[sql] (
/** Notify (by the StreamingQuery) that the query has been terminated */
private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
- unregisterTerminatedStream(terminatedQuery)
+// unregisterTerminatedStream(terminatedQuery)
awaitTerminationLock.synchronized {
if (lastTerminatedQueryException == null || terminatedQuery.exception.nonEmpty) {
lastTerminatedQueryException = terminatedQuery.exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]