This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 70e1c563b36 [Chore](runtime-filter) enlarge sync filter size rpc 
timeout limit (#37103) (#37225)
70e1c563b36 is described below

commit 70e1c563b360b51e013624fb34ca1f175d2c4d78
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Jul 3 21:02:26 2024 +0800

    [Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103) 
(#37225)
    
    pick from #37103
---
 be/src/common/config.cpp                               | 1 +
 be/src/exprs/runtime_filter.cpp                        | 4 ++--
 be/src/exprs/runtime_filter.h                          | 2 +-
 be/src/exprs/runtime_filter_slots.h                    | 2 +-
 regression-test/suites/query_p0/join/test_join5.groovy | 1 +
 5 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fe811165c17..f276487d152 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -246,6 +246,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size, 
"102400");
 // default thrift client connect timeout(in seconds)
 DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
 DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
+
 // default thrift client retry interval (in milliseconds)
 DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
 // max message size of thrift request
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1271ec39156..3f8a19f1b16 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1054,7 +1054,7 @@ public:
             : Base(req, callback), _dependency(std::move(dependency)) {}
 };
 
-Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
+Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
     DCHECK(is_producer());
 
     if (_need_local_merge) {
@@ -1105,7 +1105,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t 
local_filter_size) {
 
     request->set_filter_size(local_filter_size);
     request->set_filter_id(_filter_id);
-    callback->cntl_->set_timeout_ms(wait_time_ms());
+    callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) 
* 1000);
 
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index ee6897be322..e8c5bbfd872 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -230,7 +230,7 @@ public:
     // push filter to remote node or push down it to scan_node
     Status publish(bool publish_local = false);
 
-    Status send_filter_size(uint64_t local_filter_size);
+    Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index b5b04a1ebac..0bf8a33f9f2 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -55,7 +55,7 @@ public:
         // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
             if (runtime_filter->need_sync_filter_size()) {
-                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+                RETURN_IF_ERROR(runtime_filter->send_filter_size(state, 
hash_table_size));
             }
         }
         return Status::OK();
diff --git a/regression-test/suites/query_p0/join/test_join5.groovy 
b/regression-test/suites/query_p0/join/test_join5.groovy
index 62be496372d..4323575870f 100644
--- a/regression-test/suites/query_p0/join/test_join5.groovy
+++ b/regression-test/suites/query_p0/join/test_join5.groovy
@@ -16,6 +16,7 @@
 // under the License.
 
 suite("test_join5", "query,p0") {
+    sql "set runtime_filter_wait_time_ms = 5"
     def DBname = "regression_test_join5"
     sql "DROP DATABASE IF EXISTS ${DBname}"
     sql "CREATE DATABASE IF NOT EXISTS ${DBname}"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to