This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 969a34283eff [SPARK-53492][CONNECT] Reject second ExecutePlan with an operation id that has already completed
969a34283eff is described below

commit 969a34283effbd38ed667d6750866e8c28f617fa
Author: Niranjan Jayakar <[email protected]>
AuthorDate: Wed Sep 24 12:39:58 2025 -0400

    [SPARK-53492][CONNECT] Reject second ExecutePlan with an operation id that has already completed
    
    ### What changes were proposed in this pull request?
    
    Throw an error with code `INVALID_HANDLE.OPERATION_ALREADY_EXISTS`
    for an ExecutePlan request that carries an operation id already seen by that
    Spark session, where the previous operation has completed.
    
    ### Why are the changes needed?
    
    Network blips can cause ExecutePlan requests to be lost or to arrive late. In the
    meantime, the client might give up and retry, issuing a second request. As a
    result, two similar ExecutePlan requests can arrive on the server.
    If the second request arrives while the first is still being processed, the client
    will simply ReattachExecute. However, in some cases, the request can arrive after
    the query is complete (e.g., for very short queries). If the query contains a
    WRITE operation, the write is then executed twice.
    
    Furthermore, SPARK-51425 introduced the ability for the client to customize the
    operation id. Clients can unintentionally send two queries with the same operation
    id, leading to confusing behaviour.
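    As an illustration (not part of this patch; the session id and plan are
    placeholders), a client-chosen operation id travels in the raw
    ExecutePlanRequest proto, and a second request reusing it is now rejected
    instead of silently re-executed:

    ```scala
    import java.util.UUID
    import org.apache.spark.connect.proto

    val opId = UUID.randomUUID().toString
    val request = proto.ExecutePlanRequest
      .newBuilder()
      .setSessionId("<session-id>")  // placeholder
      .setOperationId(opId)          // client-customizable since SPARK-51425
      .build()
    // Reusing opId in a second ExecutePlan after the first completes now fails
    // with INVALID_HANDLE.OPERATION_ALREADY_EXISTS.
    ```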
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tests attached.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: claude-4-sonnet
    
    Closes #52232 from nija-at/reject-existing-old-op.
    
    Authored-by: Niranjan Jayakar <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../apache/spark/sql/connect/config/Connect.scala  | 11 ++++
 .../spark/sql/connect/service/SessionHolder.scala  | 60 ++++++++++++++++++----
 .../service/SparkConnectExecutionManager.scala     | 29 +++++++----
 .../service/SparkConnectServiceE2ESuite.scala      | 28 ++++++++++
 4 files changed, 108 insertions(+), 20 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 128823f2bb9c..c6049187f6be 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -328,6 +328,17 @@ object Connect {
       .booleanConf
       .createWithDefault(true)
 
+  val CONNECT_INACTIVE_OPERATIONS_CACHE_EXPIRATION_MINS =
+    buildStaticConf("spark.connect.session.inactiveOperations.cacheExpiration")
+      .doc(
+        "Expiration time for inactive operation IDs cache in Spark Connect Session." +
+          " Operations are cached after completion for a period of time to detect duplicates." +
+          " The time should allow for network late arrivals, at least several minutes.")
+      .version("4.1.0")
+      .internal()
+      .timeConf(TimeUnit.MINUTES)
+      .createWithDefault(30)
+
   val CONNECT_AUTHENTICATE_TOKEN =
     buildStaticConf("spark.connect.authenticate.token")
      .doc("A pre-shared token that will be used to authenticate clients. This secret must be" +
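
A usage note, not part of the patch: this is a static (server startup) conf, so it
cannot be changed on a live session. A minimal sketch, assuming server-side SparkConf
configuration and a hypothetical 60-minute window:

```scala
import org.apache.spark.SparkConf

// Bare numbers for this conf are interpreted as minutes (the default is 30).
val conf = new SparkConf()
  .set("spark.connect.session.inactiveOperations.cacheExpiration", "60")
```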
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 93512a36552e..064e9cdc2279 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -93,8 +93,19 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // Setting it to -1 indicates forever.
   @volatile private var customInactiveTimeoutMs: Option[Long] = None
 
-  private val operationIds: ConcurrentMap[String, Boolean] =
-    new ConcurrentHashMap[String, Boolean]()
+  // Set of active operation IDs for this session.
+  private val activeOperationIds: mutable.Set[String] = mutable.Set.empty
+
+  // Cache of inactive operation IDs for this session, either completed, interrupted or abandoned.
+  // The Boolean is just a placeholder since Guava needs a <K, V> pair.
+  private lazy val inactiveOperationIds: Cache[String, Boolean] =
+    CacheBuilder
+      .newBuilder()
+      .ticker(Ticker.systemTicker())
+      .expireAfterAccess(
+        SparkEnv.get.conf.get(Connect.CONNECT_INACTIVE_OPERATIONS_CACHE_EXPIRATION_MINS),
+        TimeUnit.MINUTES)
+      .build[String, Boolean]()
 
   // The cache that maps an error id to a throwable. The throwable in cache is independent to
   // each other.
@@ -163,20 +174,45 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
         messageParameters = Map("handle" -> sessionId))
     }
 
-    val alreadyExists = operationIds.putIfAbsent(operationId, true)
-    if (alreadyExists) {
-      // The existence of it should have been checked by SparkConnectExecutionManager.
-      throw new IllegalStateException(s"ExecuteHolder with opId=${operationId} already exists!")
+    activeOperationIds.synchronized {
+      if (activeOperationIds.contains(operationId)) {
+        throw new IllegalStateException(s"ExecuteHolder with opId=${operationId} already exists!")
+      }
+      activeOperationIds.add(operationId)
+    }
+  }
+
+  /**
+   * Returns the status of the operation in this session given the operation id. Operations are
+   * cached while they are inactive (completed, interrupted or abandoned). Cache expiration is
+   * configured with CONNECT_INACTIVE_OPERATIONS_CACHE_EXPIRATION_MINS.
+   *
+   * @param operationId
+   *   the operation id to look up
+   * @return
+   *   Some(true) if the operation is currently active, Some(false) if the operation was
+   *   completed, interrupted or abandoned recently, None if no operation with this id is found in
+   *   this session.
+   */
+  private[service] def getOperationStatus(operationId: String): Option[Boolean] = {
+    if (activeOperationIds.contains(operationId)) {
+      return Some(true)
+    }
+    Option(inactiveOperationIds.getIfPresent(operationId)) match {
+      case Some(_) =>
+        return Some(false)
+      case None =>
+        return None
     }
   }
 
   /**
-   * Remove an operation ID from this session.
+   * Close an operation in this session: mark its ID inactive and drop it from the active set.
    *
    * Called only by SparkConnectExecutionManager when an execution is ended.
    */
-  private[service] def removeOperationId(operationId: String): Unit = {
-    operationIds.remove(operationId)
+  private[service] def closeOperation(operationId: String): Unit = {
+    inactiveOperationIds.put(operationId, true)
+    activeOperationIds.remove(operationId)
   }
 
   /**
@@ -188,7 +224,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     val interruptedIds = new mutable.ArrayBuffer[String]()
     val operationsIds =
      SparkConnectService.streamingSessionManager.cleanupRunningQueries(this, blocking = false)
-    operationIds.asScala.foreach { case (operationId, _) =>
+    activeOperationIds.foreach { operationId =>
      val executeKey = ExecuteKey(userId, sessionId, operationId)
      SparkConnectService.executionManager.getExecuteHolder(executeKey).foreach { executeHolder =>
         if (executeHolder.interrupt()) {
@@ -208,11 +244,12 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     val interruptedIds = new mutable.ArrayBuffer[String]()
    val queries = SparkConnectService.streamingSessionManager.getTaggedQuery(tag, session)
     queries.foreach(q => Future(q.query.stop())(ExecutionContext.global))
-    operationIds.asScala.foreach { case (operationId, _) =>
+    activeOperationIds.foreach { operationId =>
      val executeKey = ExecuteKey(userId, sessionId, operationId)
      SparkConnectService.executionManager.getExecuteHolder(executeKey).foreach { executeHolder =>
         if (executeHolder.sparkSessionTags.contains(tag)) {
           if (executeHolder.interrupt()) {
+            closeOperation(operationId)
             interruptedIds += operationId
           }
         }
@@ -231,6 +268,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     val executeKey = ExecuteKey(userId, sessionId, operationId)
    SparkConnectService.executionManager.getExecuteHolder(executeKey).foreach { executeHolder =>
       if (executeHolder.interrupt()) {
+        closeOperation(operationId)
         interruptedIds += operationId
       }
     }
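
For context, a standalone sketch of the expiring-cache idea used in SessionHolder above
(illustrative names, not the patch's code): Guava evicts an entry a fixed time after its
last access, so a finished operation id is remembered for a bounded window, then forgotten.

```scala
import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder}

// Remember finished operation ids for 30 minutes after their last access.
val inactive: Cache[String, java.lang.Boolean] = CacheBuilder
  .newBuilder()
  .expireAfterAccess(30, TimeUnit.MINUTES)
  .build[String, java.lang.Boolean]()

inactive.put("op-1", true)                                          // finished
val seenRecently = Option(inactive.getIfPresent("op-1")).isDefined  // true
val neverSeen = Option(inactive.getIfPresent("op-2")).isDefined     // false
```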
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 35c4073fe93c..5f01676d9f89 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -89,27 +89,37 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
       executeKey: ExecuteKey,
       request: proto.ExecutePlanRequest,
       sessionHolder: SessionHolder): ExecuteHolder = {
+    val opId = executeKey.operationId
     val executeHolder = executions.compute(
       executeKey,
       (executeKey, oldExecuteHolder) => {
-        // Check if the operation already exists, either in the active execution map, or in the
-        // graveyard of tombstones of executions that have been abandoned. The latter is to prevent
-        // double executions when the client retries, thinking it never reached the server, but in
-        // fact it did, and already got removed as abandoned.
+
+        // Check if the operation already exists in the active execution map.
         if (oldExecuteHolder != null) {
           throw new SparkSQLException(
             errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
-            messageParameters = Map("handle" -> executeKey.operationId))
+            messageParameters = Map("handle" -> opId))
         }
-        if (getAbandonedTombstone(executeKey).isDefined) {
+        // Check if the operation is already in the graveyard of abandoned executions, or was
+        // recently completed. Prevents double execution when client retries on a lost response.
+        if (getAbandonedTombstone(executeKey).isDefined ||
+          sessionHolder.getOperationStatus(opId).isDefined) {
+
+          logInfo(
+            log"Operation ${MDC(LogKeys.EXECUTE_KEY, executeKey)}: Already tombstoned: " +
+              log"${MDC(LogKeys.STATUS, getAbandonedTombstone(executeKey).isDefined)}.")
+          logInfo(
+            log"Operation ${MDC(LogKeys.EXECUTE_KEY, executeKey)}: Seen previously: " +
+              log"${MDC(LogKeys.STATUS, sessionHolder.getOperationStatus(opId).isDefined)}.")
+
           throw new SparkSQLException(
             errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
-            messageParameters = Map("handle" -> executeKey.operationId))
+            messageParameters = Map("handle" -> opId))
         }
         new ExecuteHolder(executeKey, request, sessionHolder)
       })
 
-    sessionHolder.addOperationId(executeHolder.operationId)
+    sessionHolder.addOperationId(opId)
 
    logInfo(log"ExecuteHolder ${MDC(LogKeys.EXECUTE_KEY, executeHolder.key)} is created.")
 
@@ -147,11 +157,12 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     // getting an INVALID_HANDLE.OPERATION_ABANDONED error on a retry.
     if (abandoned) {
       abandonedTombstones.put(key, executeHolder.getExecuteInfo)
+      executeHolder.sessionHolder.closeOperation(executeHolder.operationId)
     }
 
    // Remove the execution from the map *after* putting it in abandonedTombstones.
     executions.remove(key)
-    executeHolder.sessionHolder.removeOperationId(executeHolder.operationId)
+    executeHolder.sessionHolder.closeOperation(executeHolder.operationId)
 
     updateLastExecutionTime()
 
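The create-or-reject above leans on ConcurrentHashMap.compute running its remapping
function atomically per key, so the existence check and the insert cannot interleave
with another request. A minimal standalone sketch of the pattern (hypothetical names):

```scala
import java.util.concurrent.ConcurrentHashMap

val executions = new ConcurrentHashMap[String, String]()

// Atomically create a value for `id`, or fail if one already exists.
def createOnce(id: String): String =
  executions.compute(id, (key, existing) => {
    if (existing != null) {
      throw new IllegalStateException(s"operation $key already exists")
    }
    s"holder-for-$key" // stand-in for a new ExecuteHolder
  })
```
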
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index d127f2e5a4cd..e3ba35073f41 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -310,4 +310,32 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
       }
     }
   }
+
+  test("reusing operation ID after completion throws OPERATION_ALREADY_EXISTS") {
+    val fixedOperationId = UUID.randomUUID().toString
+
+    withRawBlockingStub { stub =>
+      val request1 =
+        buildExecutePlanRequest(buildPlan("SELECT 1"), operationId = fixedOperationId)
+
+      val iter1 = stub.executePlan(request1)
+
+      // Consume all results to complete the operation
+      while (iter1.hasNext) {
+        iter1.next()
+      }
+
+      val request2 =
+        buildExecutePlanRequest(buildPlan("SELECT 2"), operationId = fixedOperationId)
+
+      val error = intercept[io.grpc.StatusRuntimeException] {
+        val iter2 = stub.executePlan(request2)
+        iter2.hasNext
+      }
+
+      // Verify the error is OPERATION_ALREADY_EXISTS
+      assert(error.getMessage.contains("INVALID_HANDLE.OPERATION_ALREADY_EXISTS"))
+      assert(error.getMessage.contains(fixedOperationId))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
