This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 18c578d4f7b [SPARK-43206][SS][CONNECT] StreamingQuery exception() include stack trace
18c578d4f7b is described below

commit 18c578d4f7be1a0f542b48c0741daf04aba8f054
Author: Wei Liu <[email protected]>
AuthorDate: Mon May 1 14:06:51 2023 -0400

    [SPARK-43206][SS][CONNECT] StreamingQuery exception() include stack trace
    
    ### What changes were proposed in this pull request?
    
    Add a stack trace to StreamingQuery's `exception()` method, following https://github.com/apache/spark/commit/a5c8a3c976889f33595ac18f82e73e6b9fd29b57#diff-98baf452f0352c75a39f39351c5f9e656675810b6d4cfd178f1b0bae9751495b
    
    The change is added to both the Python client and the Scala client.
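
    As a rough illustration (not part of this commit), a user of the Spark Connect Python client could check a query for a failure and print the enriched exception as sketched below; the rate source and the query name "demo" are hypothetical:
    ```
    # Hedged sketch: check whether a streaming query has failed and, if so,
    # print its exception on the Spark Connect Python client. Assumes an
    # active Connect session `spark`; the rate source and the query name
    # "demo" are illustrative only.
    sq = (
        spark.readStream.format("rate").load()
        .writeStream.format("memory").queryName("demo").start()
    )
    sq.awaitTermination(5)   # wait briefly; returns False if still running
    err = sq.exception()     # None while the query has not failed
    if err is not None:
        # With this change the returned message also carries the JVM stack
        # trace, appended after a "JVM stacktrace:" marker.
        print(str(err))
    ```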
    
    ### Why are the changes needed?
    
    Including the stack trace is helpful for debugging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manual test:
    1. Python:
    ```
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.5.0.dev0
          /_/
    
    Using Python version 3.9.16 (main, Dec  7 2022 01:11:58)
    Client connected to the Spark Connect server at localhost
    SparkSession available as 'spark'.
    >>> from pyspark.sql.functions import col, udf
    >>> from pyspark.errors import StreamingQueryException
    >>> sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
    >>> bad_udf = udf(lambda x: 1 / 0)
    >>> sq = sdf.select(bad_udf(col("value"))).writeStream.format("memory").queryName("this_query").start()
    >>> sq.exception()
    StreamingQueryException('Job aborted due to stage failure: Task 0 in stage 
0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) 
(ip-10-110-19-234.us-west-2.compute.internal executor driver): 
org.apache.spark.api.python.PythonException: Traceback (most recent call 
last):\n  File "<stdin>", line 1, in <lambda>\nZeroDivisionError: division by 
zero\n\n\tat 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)\n\tat
 [...]
    ```
    2. Scala:
    ```
    Spark session available as 'spark'.
       _____                  __      ______                            __
      / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
      \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
     ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
    /____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
        /_/
    
     import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    
     val sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
    sdf: org.apache.spark.sql.package.DataFrame = [value: string]
    
     val badUdf = udf((x: String) => 1 / 0)
    badUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
      ammonite.$sess.cmd2$Helper$$Lambda$1913/745186412239d9cb7,
      ArrayBuffer(StringEncoder),
      PrimitiveIntEncoder,
      None,
      true,
      true
    )
    
     val sq = sdf.select(badUdf(col("value"))).writeStream.format("memory").queryName("this_query").start()
    sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery13866865
    
     sq.isActive
    res4: Boolean = false
    
     sq.exception.get.toString
    res5: String = """org.apache.spark.sql.streaming.StreamingQueryException: 
Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 2.0 (TID 2) 
(ip-10-110-19-234.us-west-2.compute.internal executor driver): 
java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in 
instance of org.apache.spark.sql.catalyst.expre [...]
            at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
            at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
            at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2411)
            at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
            at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
            at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
            at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
            at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
            at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
            at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
            at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
            at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
            at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
            at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
            at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
            at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
            at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
            at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
            at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
            at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
            at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.jav...
    JVM stacktrace: org.apache.spark.sql.streaming.StreamingQueryException: Job 
aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent 
failure: Lost task 0.0 in stage 2.0 (TID 2) 
(ip-10-110-19-234.us-west-2.compute.internal executor driver): 
java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in 
instance of org.apache.spark.sql.catalyst.express [...]
            at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
            at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    ...
    ```
    
    Closes #40966 from WweiL/SPARK-43206-exception-stk-trace-new-2.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../spark/sql/streaming/StreamingQuery.scala       |  4 ++--
 .../sql/streaming/StreamingQueryException.scala    | 12 ++++++++++--
 .../src/main/protobuf/spark/connect/commands.proto |  3 ++-
 .../sql/connect/planner/SparkConnectPlanner.scala  | 19 +++++++++++++++----
 .../sql/connect/service/SparkConnectService.scala  |  4 +---
 python/pyspark/sql/connect/proto/commands_pb2.py   | 22 +++++++++++-----------
 python/pyspark/sql/connect/proto/commands_pb2.pyi  | 20 +++++++++++++++++---
 python/pyspark/sql/connect/streaming/query.py      | 10 ++++++----
 .../sql/streaming/StreamingQueryManager.scala      |  4 ++--
 9 files changed, 66 insertions(+), 32 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 7f6e3721841..69cefbd442e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -243,12 +243,12 @@ class RemoteStreamingQuery(
   override def exception: Option[StreamingQueryException] = {
     val exception = executeQueryCmd(_.setException(true)).getException
     if (exception.hasExceptionMessage) {
-      // TODO(SPARK-43206): Add more information to StreamingQueryException.
       Some(
         new StreamingQueryException(
           // message maps to the return value of original StreamingQueryException's toString method
           message = exception.getExceptionMessage,
-          errorClass = exception.getErrorClass))
+          errorClass = exception.getErrorClass,
+          stackTrace = exception.getStackTrace))
     } else {
       None
     }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 875c216a3e7..d5e9982dfbf 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -27,12 +27,20 @@ import org.apache.spark.annotation.Evolving
  *   Maps to return value of original StreamingQueryException's toString method
  * @param errorClass
  *   Error class of this exception
+ * @param stackTrace
+ *   Stack trace of this exception
  * @since 3.5.0
  */
 @Evolving
-class StreamingQueryException private[sql] (message: String, errorClass: String)
+class StreamingQueryException private[sql] (
+    message: String,
+    errorClass: String,
+    stackTrace: String)
     extends SparkThrowable {
 
-  // TODO(SPARK-43206): Add stack trace
   override def getErrorClass: String = errorClass
+
+  override def toString: String = s"""$message
+    |JVM stacktrace: $stackTrace
+    |""".stripMargin
 }
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 0d6c29da9f8..3c977a02503 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -313,7 +313,8 @@ message StreamingQueryCommandResult {
     optional string exception_message = 1;
     // (Optional) Exception error class as string
     optional string error_class = 2;
-    // TODO(SPARK-43206): Add stack trace
+    // (Optional) Exception stack trace as string
+    optional string stack_trace = 3;
   }
 
   message AwaitTerminationResult {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3b4dd13cc5d..22229ba98b2 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
 import com.google.common.collect.{Lists, Maps}
 import com.google.protobuf.{Any => ProtoAny, ByteString}
 import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
@@ -2348,10 +2349,20 @@ class SparkConnectPlanner(val session: SparkSession) {
 
       case StreamingQueryCommand.CommandCase.EXCEPTION =>
         val result = query.exception
-        result.foreach(e =>
-          respBuilder.getExceptionBuilder
-            .setExceptionMessage(SparkConnectService.abbreviateErrorMessage(e.toString))
-            .setErrorClass(e.getErrorClass))
+        if (result.isDefined) {
+          val e = result.get
+          val exception_builder = StreamingQueryCommandResult.ExceptionResult
+            .newBuilder()
+          exception_builder
+            .setExceptionMessage(e.toString)
+            .setErrorClass(e.getErrorClass)
+
+          val stackTrace = Option(ExceptionUtils.getStackTrace(e))
+          stackTrace.foreach { s =>
+            exception_builder.setStackTrace(s)
+          }
+          respBuilder.setException(exception_builder.build())
+        }
 
       case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
         if (command.getAwaitTermination.hasTimeoutMs) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index df3c1fd7b05..a051eef5e40 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -340,10 +340,8 @@ object SparkConnectService {
     }
   }
 
-  def abbreviateErrorMessage(msg: String): String = StringUtils.abbreviate(msg, 2048)
-
   def extractErrorMessage(st: Throwable): String = {
-    val message = abbreviateErrorMessage(st.getMessage)
+    val message = StringUtils.abbreviate(st.getMessage, 2048)
     if (message != null) {
       message
     } else {
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index 73575fbed85..cb40a54e344 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_rel
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
 
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
 
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x
 [...]
+    
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
 
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
 
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x
 [...]
 )
 
 
@@ -435,7 +435,7 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 4539
     _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 4615
     _STREAMINGQUERYCOMMANDRESULT._serialized_start = 4629
-    _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5716
+    _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5770
     _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 5212
     _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 5382
     _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 5384
@@ -443,13 +443,13 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 5458
     _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 5497
     _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5500
-    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5643
-    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5645
-    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5701
-    _GETRESOURCESCOMMAND._serialized_start = 5718
-    _GETRESOURCESCOMMAND._serialized_end = 5739
-    _GETRESOURCESCOMMANDRESULT._serialized_start = 5742
-    _GETRESOURCESCOMMANDRESULT._serialized_end = 5954
-    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5858
-    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5954
+    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5697
+    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5699
+    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5755
+    _GETRESOURCESCOMMAND._serialized_start = 5772
+    _GETRESOURCESCOMMAND._serialized_end = 5793
+    _GETRESOURCESCOMMANDRESULT._serialized_start = 5796
+    _GETRESOURCESCOMMANDRESULT._serialized_end = 6008
+    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5912
+    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 6008
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 81856352167..697ac14f16f 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -1088,19 +1088,21 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
 
         EXCEPTION_MESSAGE_FIELD_NUMBER: builtins.int
         ERROR_CLASS_FIELD_NUMBER: builtins.int
+        STACK_TRACE_FIELD_NUMBER: builtins.int
         exception_message: builtins.str
         """(Optional) Exception message as string, maps to the return value of 
original
         StreamingQueryException's toString method
         """
         error_class: builtins.str
-        """(Optional) Exception error class as string
-        TODO(SPARK-43206): Add stack trace
-        """
+        """(Optional) Exception error class as string"""
+        stack_trace: builtins.str
+        """(Optional) Exception stack trace as string"""
         def __init__(
             self,
             *,
             exception_message: builtins.str | None = ...,
             error_class: builtins.str | None = ...,
+            stack_trace: builtins.str | None = ...,
         ) -> None: ...
         def HasField(
             self,
@@ -1109,10 +1111,14 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
                 b"_error_class",
                 "_exception_message",
                 b"_exception_message",
+                "_stack_trace",
+                b"_stack_trace",
                 "error_class",
                 b"error_class",
                 "exception_message",
                 b"exception_message",
+                "stack_trace",
+                b"stack_trace",
             ],
         ) -> builtins.bool: ...
         def ClearField(
@@ -1122,10 +1128,14 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
                 b"_error_class",
                 "_exception_message",
                 b"_exception_message",
+                "_stack_trace",
+                b"_stack_trace",
                 "error_class",
                 b"error_class",
                 "exception_message",
                 b"exception_message",
+                "stack_trace",
+                b"stack_trace",
             ],
         ) -> None: ...
         @typing.overload
@@ -1137,6 +1147,10 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
             self,
            oneof_group: typing_extensions.Literal["_exception_message", b"_exception_message"],
         ) -> typing_extensions.Literal["exception_message"] | None: ...
+        @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_stack_trace", b"_stack_trace"]
+        ) -> typing_extensions.Literal["stack_trace"] | None: ...
 
     class AwaitTerminationResult(google.protobuf.message.Message):
         DESCRIPTOR: google.protobuf.descriptor.Descriptor
diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index fc207243ff3..4cdef79a7d4 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -147,14 +147,16 @@ class StreamingQuery:
         cmd = pb2.StreamingQueryCommand()
         cmd.exception = True
         exception = self._execute_streaming_query_cmd(cmd).exception
-        if exception.HasField("exception_message"):
+        if not exception.HasField("exception_message"):
+            return None
+        else:
             # Drop the Java StreamingQueryException type info
             # exception_message maps to the return value of original
             # StreamingQueryException's toString method
             msg = exception.exception_message.split(": ", 1)[1]
-            return CapturedStreamingQueryException(msg)
-        else:
-            return None
+            if exception.HasField("stack_trace"):
+                msg += f"\n\nJVM stacktrace:\n{exception.stack_trace}"
+            return CapturedStreamingQueryException(msg, reason=exception.error_class)
 
     exception.__doc__ = PySparkStreamingQuery.exception.__doc__
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 20254dec3d8..23d05a0a210 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -408,7 +408,7 @@ class StreamingQueryManager private[sql] (
       query.streamingQuery.start()
     } catch {
       case e: Throwable =>
-        unregisterTerminatedStream(query)
+//        unregisterTerminatedStream(query)
         throw e
     }
     query
@@ -416,7 +416,7 @@ class StreamingQueryManager private[sql] (
 
   /** Notify (by the StreamingQuery) that the query has been terminated */
   private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
-    unregisterTerminatedStream(terminatedQuery)
+//    unregisterTerminatedStream(terminatedQuery)
     awaitTerminationLock.synchronized {
       if (lastTerminatedQueryException == null || terminatedQuery.exception.nonEmpty) {
         lastTerminatedQueryException = terminatedQuery.exception


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
