This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa5e7008d5d [Chore](runtime-filter) enlarge sync filter size rpc 
timeout limit (#37103)
fa5e7008d5d is described below

commit fa5e7008d5d918f30e0580ce7bda03512b3d6aec
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue Jul 2 14:16:39 2024 +0800

    [Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103)
    
    ## Proposed changes
    Enlarge the RPC timeout limit used when syncing the runtime filter size.
    
    The runtime filter (rf) fails when the RPC times out, so we need to enlarge the timeout limit.
    ```
    sync filter size meet error, filter: RuntimeFilter: (id = 3, type = in_or_bloomfilter, need_local_merge: false, is_broadcast: false, build_bf_cardinality: true
    ```
---
 be/src/common/config.cpp                               | 1 +
 be/src/exprs/runtime_filter.cpp                        | 4 ++--
 be/src/exprs/runtime_filter.h                          | 2 +-
 be/src/exprs/runtime_filter_slots.h                    | 2 +-
 regression-test/suites/query_p0/join/test_join5.groovy | 1 +
 5 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 78afc756af8..92303473ad6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -255,6 +255,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size, 
"102400");
 // default thrift client connect timeout(in seconds)
 DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
 DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
+
 // default thrift client retry interval (in milliseconds)
 DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
 // max message size of thrift request
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e38a80a143e..2dcfc97b096 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1054,7 +1054,7 @@ public:
             : Base(req, callback), _dependency(std::move(dependency)) {}
 };
 
-Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
+Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
     DCHECK(is_producer());
 
     if (_need_local_merge) {
@@ -1105,7 +1105,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t 
local_filter_size) {
 
     request->set_filter_size(local_filter_size);
     request->set_filter_id(_filter_id);
-    callback->cntl_->set_timeout_ms(wait_time_ms());
+    callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) 
* 1000);
 
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a78d732b687..390a61bfe1a 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -229,7 +229,7 @@ public:
     // push filter to remote node or push down it to scan_node
     Status publish(bool publish_local = false);
 
-    Status send_filter_size(uint64_t local_filter_size);
+    Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index b5b04a1ebac..0bf8a33f9f2 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -55,7 +55,7 @@ public:
         // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
             if (runtime_filter->need_sync_filter_size()) {
-                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+                RETURN_IF_ERROR(runtime_filter->send_filter_size(state, 
hash_table_size));
             }
         }
         return Status::OK();
diff --git a/regression-test/suites/query_p0/join/test_join5.groovy 
b/regression-test/suites/query_p0/join/test_join5.groovy
index 62be496372d..4323575870f 100644
--- a/regression-test/suites/query_p0/join/test_join5.groovy
+++ b/regression-test/suites/query_p0/join/test_join5.groovy
@@ -16,6 +16,7 @@
 // under the License.
 
 suite("test_join5", "query,p0") {
+    sql "set runtime_filter_wait_time_ms = 5"
     def DBname = "regression_test_join5"
     sql "DROP DATABASE IF EXISTS ${DBname}"
     sql "CREATE DATABASE IF NOT EXISTS ${DBname}"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to