This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8858a4c9628f [SPARK-47081][CONNECT][FOLLOW-UP] Respect
`spark.connect.progress.reportInterval` over timeout
8858a4c9628f is described below
commit 8858a4c9628f349986e093a93737b2eaed7e8833
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Jan 14 11:23:22 2025 +0900
[SPARK-47081][CONNECT][FOLLOW-UP] Respect
`spark.connect.progress.reportInterval` over timeout
### What changes were proposed in this pull request?
This PR is a followup that addresses
https://github.com/apache/spark/pull/45150#discussion_r1913310090
### Why are the changes needed?
To respect `spark.connect.progress.reportInterval`
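
As a rough illustration of the intent (not the code in this patch), the wait could be bounded by both the remaining time until the deadline and the report interval. The sketch below assumes that reading; `WaitTimeoutSketch`, `waitTimeoutMillis`, and its parameters are hypothetical names and are not part of Spark.

```scala
object WaitTimeoutSketch {
  // Hypothetical helper, not a Spark API: picks how long to wait, in milliseconds.
  def waitTimeoutMillis(
      deadlineTimeMillis: Long,
      nowMillis: Long,
      reportIntervalMillis: Long): Long = {
    // Always wait at least 1 ms, even if the deadline has already passed.
    val untilDeadline = Math.max(1L, deadlineTimeMillis - nowMillis)
    if (reportIntervalMillis > 0L) {
      // Progress reporting enabled: wake up at the interval, but never past the deadline.
      Math.min(reportIntervalMillis, untilDeadline)
    } else {
      // Progress reporting disabled: wait for the deadline.
      untilDeadline
    }
  }
}
```

Under these assumptions, a 2000 ms interval with 10000 ms left until the deadline gives a 2000 ms wait, while the same interval with only 1500 ms left gives a 1500 ms wait.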
### Does this PR introduce _any_ user-facing change?
Virtually no. In a corner case, the progress update might take longer
than `spark.connect.progress.reportInterval`.
### How was this patch tested?
Manually tested.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49474 from HyukjinKwon/SPARK-47081-followup3.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 44b634af95ca..72c2b0e3f109 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -241,14 +241,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
 // The state of interrupted, response and lastIndex are changed under executionObserver
 // monitor, and will notify upon state change.
 if (response.isEmpty) {
+  val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
   // Wake up more frequently to send the progress updates.
   val progressTimeout = executeHolder.sessionHolder.session.sessionState.conf
     .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
   // If the progress feature is disabled, wait for the deadline.
-  val timeout = if (progressTimeout > 0) {
-    progressTimeout
-  } else {
-    Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
+  if (progressTimeout > 0L) {
+    Math.min(progressTimeout, timeout)
   }
   logTrace(s"Wait for response to become available with timeout=$timeout ms.")
   executionObserver.responseLock.wait(timeout)