This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e104300fe4ba [SPARK-50735][CONNECT] Failure in ExecuteResponseObserver
results in infinite reattaching requests
e104300fe4ba is described below
commit e104300fe4ba2a805a0fecf5a9cf76cf9294d1c5
Author: changgyoopark-db <[email protected]>
AuthorDate: Tue Jan 21 08:59:53 2025 +0900
[SPARK-50735][CONNECT] Failure in ExecuteResponseObserver results in
infinite reattaching requests
### What changes were proposed in this pull request?
The Spark Connect reattach request handler checks whether the associated
ExecuteThreadRunner has completed, and returns an error if the runner failed to
record the outcome.
### Why are the changes needed?
ExecuteResponseObserver.{onError, onComplete} are fallible, but they are
not retried; this leads to a situation where the ExecuteThreadRunner has
completed without succeeding in responding to the client, and thus the client
keeps retrying by reattaching to the execution.
To be specific, if an ExecuteThreadRunner fails to record the completion of
execution or an error on the observer and then just disappears, the client will
endlessly reattach, hoping that "someone" will eventually record "some data,"
but since the ExecuteThreadRunner is gone, this becomes a deadlock situation.
The fix is that when the client reattaches, the handler checks the status of
the ExecuteThreadRunner, and if it finds that the execution cannot make any
further progress, it returns an error to the client.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
testOnly org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49370 from changgyoopark-db/SPARK-50735.
Lead-authored-by: changgyoopark-db <[email protected]>
Co-authored-by: Changgyoo Park
<[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 07aa4ff08aad380e3e30e0724ff649f35a4770d7)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../execution/ExecuteResponseObserver.scala | 5 +++++
.../connect/execution/ExecuteThreadRunner.scala | 13 ++++++++++++-
.../spark/sql/connect/service/ExecuteHolder.scala | 20 ++++++++++++++++++++
.../SparkConnectReattachExecuteHandler.scala | 4 ++++
.../service/SparkConnectServiceE2ESuite.scala | 22 ++++++++++++++++++++++
5 files changed, 63 insertions(+), 1 deletion(-)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 9d83d93083dc..9d0cc2128dd4 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <:
Message](val executeHolder:
finalProducedIndex.isDefined
}
+ // For testing.
+ private[connect] def undoCompletion(): Unit = responseLock.synchronized {
+ finalProducedIndex = None
+ }
+
/**
* Remove cached responses after response with lastReturnedIndex is returned
from getResponse.
* Remove according to caching policy:
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 05e3395a5316..13857e066a8f 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -63,6 +63,16 @@ private[connect] class ExecuteThreadRunner(executeHolder:
ExecuteHolder) extends
}
}
+ /**
+ * Checks if the thread is alive.
+ *
+ * @return
+ * true if the execution thread is currently running.
+ */
+ private[connect] def isAlive(): Boolean = {
+ executionThread.isAlive()
+ }
+
/**
* Interrupts the execution thread if the execution has been interrupted by
this method call.
*
@@ -288,7 +298,8 @@ private[connect] class ExecuteThreadRunner(executeHolder:
ExecuteHolder) extends
* True if we should delegate sending the final ResultComplete to the
handler thread, i.e.
* don't send a ResultComplete when the ExecuteThread returns.
*/
- private def shouldDelegateCompleteResponse(request:
proto.ExecutePlanRequest): Boolean = {
+ private[connect] def shouldDelegateCompleteResponse(
+ request: proto.ExecutePlanRequest): Boolean = {
request.getPlan.getOpTypeCase == proto.Plan.OpTypeCase.COMMAND &&
request.getPlan.getCommand.getCommandTypeCase ==
proto.Command.CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND &&
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
index 1b8fa1b08473..9d8603d95c65 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
@@ -126,6 +126,16 @@ private[connect] class ExecuteHolder(
runner.start()
}
+ /**
+ * Check if the execution was ended without finalizing the outcome and no
further progress will
+ * be made. If the execution was delegated, this method always returns false.
+ */
+ def isOrphan(): Boolean = {
+ !runner.isAlive() &&
+ !runner.shouldDelegateCompleteResponse(request) &&
+ !responseObserver.completed()
+ }
+
def addObservation(name: String, observation: Observation): Unit =
synchronized {
observations += (name -> observation)
}
@@ -195,6 +205,16 @@ private[connect] class ExecuteHolder(
grpcResponseSenders.foreach(_.interrupt())
}
+ // For testing
+ private[connect] def undoResponseObserverCompletion() = synchronized {
+ responseObserver.undoCompletion()
+ }
+
+ // For testing
+ private[connect] def isExecuteThreadRunnerAlive() = {
+ runner.isAlive()
+ }
+
/**
* For a short period in ExecutePlan after creation and until
runGrpcResponseSender is called,
* there is no attached response sender, but yet we start with
lastAttachedRpcTime = None, so we
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index a2696311bd84..824fa54120b2 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -62,6 +62,10 @@ class SparkConnectReattachExecuteHandler(
throw new SparkSQLException(
errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
messageParameters = Map.empty)
+ } else if (executeHolder.isOrphan()) {
+ logWarning("Reattach to an orphan operation.")
+
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
+ throw new IllegalStateException("Operation was orphaned because of an
internal error.")
}
val responseSender =
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index f24560259a88..3337bb9b8ea4 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql.connect.SparkConnectServerTest
+import org.apache.spark.sql.connect.config.Connect
class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
@@ -267,4 +268,25 @@ class SparkConnectServiceE2ESuite extends
SparkConnectServerTest {
iter2.hasNext
}
}
+
+ test("Exceptions thrown in the gRPC response observer does not lead to
infinite retries") {
+ withSparkEnvConfs(
+ (Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key,
"10")) {
+ withClient { client =>
+ val query = client.execute(buildPlan("SELECT 1"))
+ query.hasNext
+ val execution = eventuallyGetExecutionHolder
+ Eventually.eventually(timeout(eventuallyTimeout)) {
+ assert(!execution.isExecuteThreadRunnerAlive())
+ }
+
+ execution.undoResponseObserverCompletion()
+
+ val error = intercept[SparkException] {
+ while (query.hasNext) query.next()
+ }
+ assert(error.getMessage.contains("IllegalStateException"))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]