This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 969a34283eff [SPARK-53492][CONNECT] Reject second ExecutePlan with an operation id that has already completed
969a34283eff is described below
commit 969a34283effbd38ed667d6750866e8c28f617fa
Author: Niranjan Jayakar <[email protected]>
AuthorDate: Wed Sep 24 12:39:58 2025 -0400
[SPARK-53492][CONNECT] Reject second ExecutePlan with an operation id that has already completed
### What changes were proposed in this pull request?
Throw an error with code `INVALID_HANDLE.OPERATION_ALREADY_EXISTS` for an ExecutePlan request that contains an already-seen operation id for that Spark session, where the previous operation has completed.
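
For illustration, a minimal sketch of the new behaviour as a client would observe it, mirroring the E2E test attached below (`withRawBlockingStub`, `buildPlan` and `buildExecutePlanRequest` are helpers from the server test harness; which INVALID_HANDLE error surfaces depends on whether the first execution is still held or already cleaned up):

    val operationId = java.util.UUID.randomUUID().toString
    withRawBlockingStub { stub =>
      // First ExecutePlan with a client-chosen operation id runs to completion.
      val first = stub.executePlan(
        buildExecutePlanRequest(buildPlan("SELECT 1"), operationId = operationId))
      while (first.hasNext) first.next()

      // A second ExecutePlan reusing the same operation id is now rejected with
      // an INVALID_HANDLE.* error instead of silently executing the plan again.
      val second = stub.executePlan(
        buildExecutePlanRequest(buildPlan("SELECT 1"), operationId = operationId))
      second.hasNext // throws io.grpc.StatusRuntimeException
    }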
### Why are the changes needed?
Network blips can cause ExecutePlan requests to be lost or to arrive late. In the meantime, the client might give up and retry, issuing a second request. As a result, two similar ExecutePlan requests can arrive at the server.
If the second request arrives while the first is still being processed, the client will simply ReattachExecute. However, in some cases the request can arrive after the query is complete (e.g., for very short queries). If the query contains a WRITE operation, there are then two write operations executed.
Furthermore, SPARK-51425 introduced the ability for the client to customize the operation id. Clients can unintentionally send two queries with the same operation id, leading to confusing behaviour.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tests attached.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4-sonnet
Closes #52232 from nija-at/reject-existing-old-op.
Authored-by: Niranjan Jayakar <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../apache/spark/sql/connect/config/Connect.scala | 11 ++++
.../spark/sql/connect/service/SessionHolder.scala | 60 ++++++++++++++++++----
.../service/SparkConnectExecutionManager.scala | 29 +++++++----
.../service/SparkConnectServiceE2ESuite.scala | 28 ++++++++++
4 files changed, 108 insertions(+), 20 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 128823f2bb9c..c6049187f6be 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -328,6 +328,17 @@ object Connect {
       .booleanConf
       .createWithDefault(true)
 
+  val CONNECT_INACTIVE_OPERATIONS_CACHE_EXPIRATION_MINS =
+    buildStaticConf("spark.connect.session.inactiveOperations.cacheExpiration")
+      .doc(
+        "Expiration time for inactive operation IDs cache in Spark Connect Session." +
+          " Operations are cached after completion for a period of time to detect duplicates." +
+          " The time should allow for network late arrivals, at least several minutes.")
+      .version("4.1.0")
+      .internal()
+      .timeConf(TimeUnit.MINUTES)
+      .createWithDefault(30)
+
   val CONNECT_AUTHENTICATE_TOKEN =
     buildStaticConf("spark.connect.authenticate.token")
       .doc("A pre-shared token that will be used to authenticate clients. This secret must be" +
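
As a usage sketch (not part of the patch): this is a static conf, so it has to be set before the Connect server starts. One hypothetical way is via the builder of the SparkSession that hosts the server; the conf name comes from the patch above, the 60-minute value is an arbitrary example.

    // Hypothetical startup snippet: raise the inactive-operation cache
    // expiration from the default 30 minutes to 60 minutes.
    val spark = org.apache.spark.sql.SparkSession
      .builder()
      .master("local[*]")
      .config("spark.connect.session.inactiveOperations.cacheExpiration", "60")
      .getOrCreate()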
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 93512a36552e..064e9cdc2279 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -93,8 +93,19 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // Setting it to -1 indicated forever.
   @volatile private var customInactiveTimeoutMs: Option[Long] = None
 
-  private val operationIds: ConcurrentMap[String, Boolean] =
-    new ConcurrentHashMap[String, Boolean]()
+  // Set of active operation IDs for this session.
+  private val activeOperationIds: mutable.Set[String] = mutable.Set.empty
+
+  // Cache of inactive operation IDs for this session, either completed, interrupted or abandoned.
+  // The Boolean is just a placeholder since Guava needs a <K, V> pair.
+  private lazy val inactiveOperationIds: Cache[String, Boolean] =
+    CacheBuilder
+      .newBuilder()
+      .ticker(Ticker.systemTicker())
+      .expireAfterAccess(
+        SparkEnv.get.conf.get(Connect.CONNECT_INACTIVE_OPERATIONS_CACHE_EXPIRATION_MINS),
+        TimeUnit.MINUTES)
+      .build[String, Boolean]()
 
   // The cache that maps an error id to a throwable. The throwable in cache is independent to
   // each other.
@@ -163,20 +174,45 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
         messageParameters = Map("handle" -> sessionId))
     }
 
-    val alreadyExists = operationIds.putIfAbsent(operationId, true)
-    if (alreadyExists) {
-      // The existence of it should have been checked by SparkConnectExecutionManager.
-      throw new IllegalStateException(s"ExecuteHolder with opId=${operationId} already exists!")
+    activeOperationIds.synchronized {
+      if (activeOperationIds.contains(operationId)) {
+        throw new IllegalStateException(s"ExecuteHolder with opId=${operationId} already exists!")
+      }
+      activeOperationIds.add(operationId)
+    }
+  }
+
+  /**
+   * Returns the status of the operation in this session given the operation id. Operations are
+   * cached for when they are inactive (completed, interrupted or abandoned). Cache expiration is
+   * configured with CONNECT_INACTIVE_OPERATIONS_CACHE_EXPIRATION_MINS.
+   *
+   * @param operationId
+   * @return
+   *   Some(true) if the operation is currently active, Some(false) if the operation was
+   *   completed, interrupted or abandoned recently, None if no operation with this id is found in
+   *   this session.
+   */
+  private[service] def getOperationStatus(operationId: String): Option[Boolean] = {
+    if (activeOperationIds.contains(operationId)) {
+      return Some(true)
+    }
+    Option(inactiveOperationIds.getIfPresent(operationId)) match {
+      case Some(_) =>
+        return Some(false)
+      case None =>
+        return None
     }
   }
 
   /**
-   * Remove an operation ID from this session.
+   * Close an operation in this session by removing its operation ID.
    *
    * Called only by SparkConnectExecutionManager when an execution is ended.
    */
-  private[service] def removeOperationId(operationId: String): Unit = {
-    operationIds.remove(operationId)
+  private[service] def closeOperation(operationId: String): Unit = {
+    inactiveOperationIds.put(operationId, true)
+    activeOperationIds.remove(operationId)
   }
 
   /**
@@ -188,7 +224,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     val interruptedIds = new mutable.ArrayBuffer[String]()
     val operationsIds =
       SparkConnectService.streamingSessionManager.cleanupRunningQueries(this, blocking = false)
-    operationIds.asScala.foreach { case (operationId, _) =>
+    activeOperationIds.foreach { operationId =>
       val executeKey = ExecuteKey(userId, sessionId, operationId)
       SparkConnectService.executionManager.getExecuteHolder(executeKey).foreach { executeHolder =>
         if (executeHolder.interrupt()) {
@@ -208,11 +244,12 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     val interruptedIds = new mutable.ArrayBuffer[String]()
     val queries = SparkConnectService.streamingSessionManager.getTaggedQuery(tag, session)
     queries.foreach(q => Future(q.query.stop())(ExecutionContext.global))
-    operationIds.asScala.foreach { case (operationId, _) =>
+    activeOperationIds.foreach { operationId =>
       val executeKey = ExecuteKey(userId, sessionId, operationId)
       SparkConnectService.executionManager.getExecuteHolder(executeKey).foreach { executeHolder =>
         if (executeHolder.sparkSessionTags.contains(tag)) {
           if (executeHolder.interrupt()) {
+            closeOperation(operationId)
             interruptedIds += operationId
           }
         }
@@ -231,6 +268,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
       val executeKey = ExecuteKey(userId, sessionId, operationId)
       SparkConnectService.executionManager.getExecuteHolder(executeKey).foreach { executeHolder =>
         if (executeHolder.interrupt()) {
+          closeOperation(operationId)
           interruptedIds += operationId
         }
       }
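
For context, the Guava pattern behind inactiveOperationIds above amounts to a time-bounded set: Guava caches are key-value stores, so a Boolean placeholder stands in for membership, and expireAfterAccess ages entries out. A self-contained sketch (illustrative names, not the server's actual instance):

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{Cache, CacheBuilder}

    object InactiveIdSetSketch {
      // Entries silently drop out 30 minutes after they were last touched.
      private val recentlyClosed: Cache[String, java.lang.Boolean] =
        CacheBuilder.newBuilder()
          .expireAfterAccess(30, TimeUnit.MINUTES)
          .build[String, java.lang.Boolean]()

      def close(id: String): Unit = recentlyClosed.put(id, true)

      // getIfPresent returns null on a miss, hence the Option wrapper.
      def wasRecentlyClosed(id: String): Boolean =
        Option(recentlyClosed.getIfPresent(id)).isDefined
    }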
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 35c4073fe93c..5f01676d9f89 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -89,27 +89,37 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
       executeKey: ExecuteKey,
       request: proto.ExecutePlanRequest,
       sessionHolder: SessionHolder): ExecuteHolder = {
+    val opId = executeKey.operationId
     val executeHolder = executions.compute(
       executeKey,
       (executeKey, oldExecuteHolder) => {
-        // Check if the operation already exists, either in the active execution map, or in the
-        // graveyard of tombstones of executions that have been abandoned. The latter is to prevent
-        // double executions when the client retries, thinking it never reached the server, but in
-        // fact it did, and already got removed as abandoned.
+
+        // Check if the operation already exists, either in the active execution map
         if (oldExecuteHolder != null) {
           throw new SparkSQLException(
             errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
-            messageParameters = Map("handle" -> executeKey.operationId))
+            messageParameters = Map("handle" -> opId))
         }
-        if (getAbandonedTombstone(executeKey).isDefined) {
+        // Check if the operation is already in the graveyard of abandoned executions, or was
+        // recently completed. Prevents double execution when client retries on a lost response.
+        if (getAbandonedTombstone(executeKey).isDefined ||
+          sessionHolder.getOperationStatus(opId).isDefined) {
+
+          logInfo(
+            log"Operation ${MDC(LogKeys.EXECUTE_KEY, executeKey)}: Already tombstoned: " +
+              log"${MDC(LogKeys.STATUS, getAbandonedTombstone(executeKey).isDefined)}.")
+          logInfo(
+            log"Operation ${MDC(LogKeys.EXECUTE_KEY, executeKey)}: Seen previously: " +
+              log"${MDC(LogKeys.STATUS, sessionHolder.getOperationStatus(opId).isDefined)}.")
+
           throw new SparkSQLException(
             errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
-            messageParameters = Map("handle" -> executeKey.operationId))
+            messageParameters = Map("handle" -> opId))
         }
         new ExecuteHolder(executeKey, request, sessionHolder)
       })
 
-    sessionHolder.addOperationId(executeHolder.operationId)
+    sessionHolder.addOperationId(opId)
 
     logInfo(log"ExecuteHolder ${MDC(LogKeys.EXECUTE_KEY, executeHolder.key)} is created.")
@@ -147,11 +157,12 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
       // getting an INVALID_HANDLE.OPERATION_ABANDONED error on a retry.
       if (abandoned) {
         abandonedTombstones.put(key, executeHolder.getExecuteInfo)
+        executeHolder.sessionHolder.closeOperation(executeHolder.operationId)
       }
 
       // Remove the execution from the map *after* putting it in abandonedTombstones.
       executions.remove(key)
-      executeHolder.sessionHolder.removeOperationId(executeHolder.operationId)
+      executeHolder.sessionHolder.closeOperation(executeHolder.operationId)
 
       updateLastExecutionTime()
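
The rejection above leans on ConcurrentHashMap.compute, which runs the remapping function atomically per key, so two racing requests with the same operation id cannot both create a holder. A stripped-down sketch of that create-or-throw shape (all names hypothetical):

    import java.util.concurrent.ConcurrentHashMap

    object CreateOrThrowSketch {
      final class Holder(val operationId: String)

      private val executions = new ConcurrentHashMap[String, Holder]()

      // compute() locks the key while the function runs: either an old holder
      // exists (reject), the id was seen before (reject), or a fresh holder is
      // installed atomically.
      def create(operationId: String, seenBefore: String => Boolean): Holder =
        executions.compute(
          operationId,
          (id, old) => {
            if (old != null) throw new IllegalStateException(s"$id already running")
            if (seenBefore(id)) throw new IllegalStateException(s"$id already completed")
            new Holder(id)
          })
    }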
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index d127f2e5a4cd..e3ba35073f41 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -310,4 +310,32 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
       }
     }
   }
+
+  test("reusing operation ID after completion throws OPERATION_ALREADY_EXISTS") {
+    val fixedOperationId = UUID.randomUUID().toString
+
+    withRawBlockingStub { stub =>
+      val request1 =
+        buildExecutePlanRequest(buildPlan("SELECT 1"), operationId = fixedOperationId)
+
+      val iter1 = stub.executePlan(request1)
+
+      // Consume all results to complete the operation
+      while (iter1.hasNext) {
+        iter1.next()
+      }
+
+      val request2 =
+        buildExecutePlanRequest(buildPlan("SELECT 2"), operationId = fixedOperationId)
+
+      val error = intercept[io.grpc.StatusRuntimeException] {
+        val iter2 = stub.executePlan(request2)
+        iter2.hasNext
+      }
+
+      // Verify the error is OPERATION_ALREADY_EXISTS
+      assert(error.getMessage.contains("INVALID_HANDLE.OPERATION_ALREADY_EXISTS"))
+      assert(error.getMessage.contains(fixedOperationId))
+    }
+  }
 }