This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c97fdeb0067e [SPARK-53900][CONNECT] Fix unintentional `Thread.wait(0)`
under rare conditions in `ExecuteGrpcResponseSender`
c97fdeb0067e is described below
commit c97fdeb0067e4867a90baba806856fc33fc2f805
Author: vicennial <[email protected]>
AuthorDate: Tue Oct 14 12:04:13 2025 -0400
[SPARK-53900][CONNECT] Fix unintentional `Thread.wait(0)` under rare
conditions in `ExecuteGrpcResponseSender`
### What changes were proposed in this pull request?
Sets a lower bound of `1` (millisecond) on the values passed into `Object#wait`,
to avoid an unintentional indefinite wait.
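
As a minimal sketch of the effect of this lower bound (not the actual `ExecuteGrpcResponseSender` code), the helper below waits on a lock that nobody notifies. The names `ClampedWaitSketch`, `NanosPerMilli` and `clampedWait` are hypothetical and exist only for illustration. Because the millisecond argument is clamped to at least `1`, the call returns on its own even for a 1 ns timeout:

```scala
object ClampedWaitSketch {
  // Assumed constant for this sketch: nanoseconds per millisecond.
  val NanosPerMilli: Long = 1000000L

  /** Waits on `lock` for `timeoutNs`, never passing 0 to wait(); returns elapsed nanoseconds. */
  def clampedWait(lock: AnyRef, timeoutNs: Long): Long = {
    val start = System.nanoTime()
    lock.synchronized {
      // Without Math.max, timeoutNs = 1 would become wait(0), i.e. wait until notified.
      lock.wait(Math.max(1L, timeoutNs / NanosPerMilli))
    }
    System.nanoTime() - start
  }

  def main(args: Array[String]): Unit = {
    // No other thread ever calls notify() on this lock.
    val elapsedNs = clampedWait(new Object, timeoutNs = 1L)
    println(s"Returned after $elapsedNs ns without any notify()")
  }
}
```

The cost of the clamp is that an already-expired deadline still waits for up to roughly one millisecond before the surrounding loop re-checks its conditions.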
### Why are the changes needed?
A bug in `ExecuteGrpcResponseSender` causes RPC streams to hang
indefinitely when the configured deadline passes. The bug was introduced in
[PR #49003](https://github.com/apache/spark/pull/49003/files#diff-d4629281431427e41afd6d3db6630bcfdbfdbf77ba74cf7e48a988c1b66c13f1L244-L253)
during the migration from `System.currentTimeMillis()` to `System.nanoTime()`, where
integer division truncates sub-millisecond timeout values to 0,
triggering Java's `wait(0)` behavior (wait indefinitely until notified).
#### Root Cause
`executionObserver.responseLock.wait(timeoutNs / NANOS_PER_MILLIS) // ← BUG`
The Problem: When `deadlineTimeNs < System.nanoTime()` (deadline has
passed):
- Math.max(1, negative_value) clamps to 1 nanosecond
- Math.min(progressInterval_ns, 1) remains 1 nanosecond
- Integer division: 1 / 1,000,000 = 0 milliseconds
- wait(0) in Java means wait indefinitely until notified
- No notification arrives (execution already completed), thread hangs
forever
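
The arithmetic in the steps above can be reproduced in isolation. This is a sketch under stated assumptions, not the server code: the object and method names (`WaitTimeoutSketch`, `buggyWaitMillis`, `fixedWaitMillis`) are hypothetical, the clock value is passed in as a parameter instead of being read from `System.nanoTime()`, and `NANOS_PER_MILLIS` is assumed to be 1,000,000.

```scala
object WaitTimeoutSketch {
  // Assumed value: 1 ms = 1,000,000 ns.
  val NANOS_PER_MILLIS: Long = 1000000L

  /** Mirrors the pre-fix computation: the result may be 0 for sub-millisecond timeouts. */
  def buggyWaitMillis(deadlineTimeNs: Long, nowNs: Long, progressIntervalNs: Long): Long = {
    // Clamp the remaining time to at least 1 ns (negative once the deadline has passed).
    val remainingNs = Math.max(1L, deadlineTimeNs - nowNs)
    // Cap by the progress interval; 1 ns stays 1 ns.
    val timeoutNs = Math.min(progressIntervalNs, remainingNs)
    // Integer division truncates anything below 1,000,000 ns to 0 ms.
    timeoutNs / NANOS_PER_MILLIS
  }

  /** Mirrors the fix: the millisecond value handed to wait() is never below 1. */
  def fixedWaitMillis(deadlineTimeNs: Long, nowNs: Long, progressIntervalNs: Long): Long =
    Math.max(1L, buggyWaitMillis(deadlineTimeNs, nowNs, progressIntervalNs))

  def main(args: Array[String]): Unit = {
    // Deadline passed 1000 ns ago; progress interval of 2 s.
    println(buggyWaitMillis(1000L, 2000L, 2000000000L)) // 0, which wait() treats as "no timeout"
    println(fixedWaitMillis(1000L, 2000L, 2000000000L)) // 1
  }
}
```

Timeouts of one millisecond or more are unaffected by the clamp; only the truncated-to-zero case changes.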
While one of the loop conditions guards against `deadlineTimeNs <
System.nanoTime()`, it isn’t sufficient, as the deadline can elapse while inside
the loop (the time is fetched afresh in the later timeout calculation). GC
pauses make this more likely to occur.
#### Conditions Required for Bug to Trigger
The bug manifests when all of the following conditions are met:
- Reattachable execution enabled (CONNECT_EXECUTE_REATTACHABLE_ENABLED =
true)
- Execution completes prior to the deadline within the inner loop
(all responses sent before deadline)
- Deadline passes within the inner loop
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52609 from vicennial/fixWait.
Authored-by: vicennial <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit ff0f1ab95238b27af93c755b600b82c0769fb8d0)
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 3a707495ff3f..785b254d7af0 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -263,7 +263,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
timeoutNs = Math.min(progressTimeout * NANOS_PER_MILLIS, timeoutNs)
}
logTrace(s"Wait for response to become available with timeout=$timeoutNs ns.")
- executionObserver.responseLock.wait(timeoutNs / NANOS_PER_MILLIS)
+ executionObserver.responseLock.wait(Math.max(1, timeoutNs / NANOS_PER_MILLIS))
enqueueProgressMessage(force = true)
logTrace(s"Reacquired executionObserver lock after waiting.")
sleepEnd = System.nanoTime()
@@ -384,7 +384,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
val timeoutNs = Math.max(1, deadlineTimeNs - System.nanoTime())
var sleepStart = System.nanoTime()
logTrace(s"Wait for grpcCallObserver to become ready with timeout=$timeoutNs ns.")
- grpcCallObserverReadySignal.wait(timeoutNs / NANOS_PER_MILLIS)
+ grpcCallObserverReadySignal.wait(Math.max(1, timeoutNs / NANOS_PER_MILLIS))
logTrace(s"Reacquired grpcCallObserverReadySignal lock after waiting.")
sleepEnd = System.nanoTime()
}