This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8858a4c9628f [SPARK-47081][CONNECT][FOLLOW-UP] Respect 
`spark.connect.progress.reportInterval` over timeout
8858a4c9628f is described below

commit 8858a4c9628f349986e093a93737b2eaed7e8833
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Jan 14 11:23:22 2025 +0900

    [SPARK-47081][CONNECT][FOLLOW-UP] Respect 
`spark.connect.progress.reportInterval` over timeout
    
    ### What changes were proposed in this pull request?
    
    This PR is a followup that addresses 
https://github.com/apache/spark/pull/45150#discussion_r1913310090
    
    ### Why are the changes needed?
    
    To respect `spark.connect.progress.reportInterval`
    
    ### Does this PR introduce _any_ user-facing change?
    
    Virtually no. In a corner case, the progress update might take longer 
than `spark.connect.progress.reportInterval`.
    
    ### How was this patch tested?
    
    Manually tested.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49474 from HyukjinKwon/SPARK-47081-followup3.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 44b634af95ca..72c2b0e3f109 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -241,14 +241,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
           // The state of interrupted, response and lastIndex are changed 
under executionObserver
           // monitor, and will notify upon state change.
           if (response.isEmpty) {
+            val timeout = Math.max(1, deadlineTimeMillis - 
System.currentTimeMillis())
             // Wake up more frequently to send the progress updates.
             val progressTimeout = 
executeHolder.sessionHolder.session.sessionState.conf
               .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
             // If the progress feature is disabled, wait for the deadline.
-            val timeout = if (progressTimeout > 0) {
-              progressTimeout
-            } else {
-              Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
+            if (progressTimeout > 0L) {
+              Math.min(progressTimeout, timeout)
             }
             logTrace(s"Wait for response to become available with 
timeout=$timeout ms.")
             executionObserver.responseLock.wait(timeout)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to