This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e104300fe4ba [SPARK-50735][CONNECT] Failure in ExecuteResponseObserver 
results in infinite reattaching requests
e104300fe4ba is described below

commit e104300fe4ba2a805a0fecf5a9cf76cf9294d1c5
Author: changgyoopark-db <[email protected]>
AuthorDate: Tue Jan 21 08:59:53 2025 +0900

    [SPARK-50735][CONNECT] Failure in ExecuteResponseObserver results in 
infinite reattaching requests
    
    ### What changes were proposed in this pull request?
    
    The Spark Connect reattach request handler now checks whether the associated 
ExecuteThreadRunner has completed, and returns an error if the runner finished 
without recording the outcome.
    
    ### Why are the changes needed?
    
    ExecuteResponseObserver.{onError, onComplete} can fail, but they are never 
retried; this leads to a situation where the ExecuteThreadRunner completes 
without having successfully responded to the client, and thus the client 
keeps retrying by reattaching to the execution.
    
    To be specific, if an ExecuteThreadRunner fails to record the completion of 
execution or an error on the observer and then just disappears, the client will 
endlessly reattach, hoping that "someone" will eventually record "some data," 
but since the ExecuteThreadRunner is gone, this becomes a deadlock situation.
    
    The fix is that when the client reattaches, the handler checks the status of 
the ExecuteThreadRunner and returns an error if it finds that the execution can 
no longer make any progress.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    testOnly org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49370 from changgyoopark-db/SPARK-50735.
    
    Lead-authored-by: changgyoopark-db <[email protected]>
    Co-authored-by: Changgyoo Park 
<[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 07aa4ff08aad380e3e30e0724ff649f35a4770d7)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../execution/ExecuteResponseObserver.scala        |  5 +++++
 .../connect/execution/ExecuteThreadRunner.scala    | 13 ++++++++++++-
 .../spark/sql/connect/service/ExecuteHolder.scala  | 20 ++++++++++++++++++++
 .../SparkConnectReattachExecuteHandler.scala       |  4 ++++
 .../service/SparkConnectServiceE2ESuite.scala      | 22 ++++++++++++++++++++++
 5 files changed, 63 insertions(+), 1 deletion(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 9d83d93083dc..9d0cc2128dd4 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
     finalProducedIndex.isDefined
   }
 
+  // For testing.
+  private[connect] def undoCompletion(): Unit = responseLock.synchronized {
+    finalProducedIndex = None
+  }
+
   /**
    * Remove cached responses after response with lastReturnedIndex is returned 
from getResponse.
    * Remove according to caching policy:
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 05e3395a5316..13857e066a8f 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -63,6 +63,16 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
     }
   }
 
+  /**
+   * Checks if the thread is alive.
+   *
+   * @return
+   *   true if the execution thread is currently running.
+   */
+  private[connect] def isAlive(): Boolean = {
+    executionThread.isAlive()
+  }
+
   /**
    * Interrupts the execution thread if the execution has been interrupted by 
this method call.
    *
@@ -288,7 +298,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
    *   True if we should delegate sending the final ResultComplete to the 
handler thread, i.e.
    *   don't send a ResultComplete when the ExecuteThread returns.
    */
-  private def shouldDelegateCompleteResponse(request: 
proto.ExecutePlanRequest): Boolean = {
+  private[connect] def shouldDelegateCompleteResponse(
+      request: proto.ExecutePlanRequest): Boolean = {
     request.getPlan.getOpTypeCase == proto.Plan.OpTypeCase.COMMAND &&
     request.getPlan.getCommand.getCommandTypeCase ==
       proto.Command.CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND &&
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
index 1b8fa1b08473..9d8603d95c65 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
@@ -126,6 +126,16 @@ private[connect] class ExecuteHolder(
     runner.start()
   }
 
+  /**
+   * Check if the execution was ended without finalizing the outcome and no 
further progress will
+   * be made. If the execution was delegated, this method always returns false.
+   */
+  def isOrphan(): Boolean = {
+    !runner.isAlive() &&
+    !runner.shouldDelegateCompleteResponse(request) &&
+    !responseObserver.completed()
+  }
+
   def addObservation(name: String, observation: Observation): Unit = 
synchronized {
     observations += (name -> observation)
   }
@@ -195,6 +205,16 @@ private[connect] class ExecuteHolder(
     grpcResponseSenders.foreach(_.interrupt())
   }
 
+  // For testing
+  private[connect] def undoResponseObserverCompletion() = synchronized {
+    responseObserver.undoCompletion()
+  }
+
+  // For testing
+  private[connect] def isExecuteThreadRunnerAlive() = {
+    runner.isAlive()
+  }
+
   /**
    * For a short period in ExecutePlan after creation and until 
runGrpcResponseSender is called,
    * there is no attached response sender, but yet we start with 
lastAttachedRpcTime = None, so we
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index a2696311bd84..824fa54120b2 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -62,6 +62,10 @@ class SparkConnectReattachExecuteHandler(
       throw new SparkSQLException(
         errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
         messageParameters = Map.empty)
+    } else if (executeHolder.isOrphan()) {
+      logWarning("Reattach to an orphan operation.")
+      
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
+      throw new IllegalStateException("Operation was orphaned because of an 
internal error.")
     }
 
     val responseSender =
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index f24560259a88..3337bb9b8ea4 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.connect.SparkConnectServerTest
+import org.apache.spark.sql.connect.config.Connect
 
 class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
 
@@ -267,4 +268,25 @@ class SparkConnectServiceE2ESuite extends 
SparkConnectServerTest {
       iter2.hasNext
     }
   }
+
+  test("Exceptions thrown in the gRPC response observer does not lead to 
infinite retries") {
+    withSparkEnvConfs(
+      (Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key, 
"10")) {
+      withClient { client =>
+        val query = client.execute(buildPlan("SELECT 1"))
+        query.hasNext
+        val execution = eventuallyGetExecutionHolder
+        Eventually.eventually(timeout(eventuallyTimeout)) {
+          assert(!execution.isExecuteThreadRunnerAlive())
+        }
+
+        execution.undoResponseObserverCompletion()
+
+        val error = intercept[SparkException] {
+          while (query.hasNext) query.next()
+        }
+        assert(error.getMessage.contains("IllegalStateException"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to