This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch test_0702
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0a5606c4d20bf49d773c09e36e3c57ad75507e0e
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue Jul 2 14:16:39 2024 +0800

    [Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103)
    
    ## Proposed changes
    Enlarge the RPC timeout limit used when syncing the runtime filter size.
    
    The runtime filter (rf) fails when this RPC times out, so we need to enlarge the limit. Example error:
    ```
    sync filter size meet error, filter: RuntimeFilter: (id = 3, type = 
in_or_bloomfilter, need_local_merge: false, is_broadcast: false, 
build_bf_cardinality: true
    ```
---
 be/src/common/config.cpp                               | 1 +
 be/src/exprs/runtime_filter.cpp                        | 4 ++--
 be/src/exprs/runtime_filter.h                          | 2 +-
 be/src/exprs/runtime_filter_slots.h                    | 2 +-
 regression-test/suites/query_p0/join/test_join5.groovy | 1 +
 5 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 910bf69609e..8ca9b6254ab 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -246,6 +246,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size, 
"102400");
 // default thrift client connect timeout(in seconds)
 DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
 DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
+
 // default thrift client retry interval (in milliseconds)
 DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
 // max message size of thrift request
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c84f7ad83e6..107aa7d0f99 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1059,7 +1059,7 @@ public:
             : Base(req, callback), _dependency(std::move(dependency)), 
_filter(filter) {}
 };
 
-Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
+Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
     DCHECK(is_producer());
 
     if (_need_local_merge) {
@@ -1110,7 +1110,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t 
local_filter_size) {
 
     request->set_filter_size(local_filter_size);
     request->set_filter_id(_filter_id);
-    callback->cntl_->set_timeout_ms(wait_time_ms());
+    callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) 
* 1000);
 
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index ee6897be322..e8c5bbfd872 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -230,7 +230,7 @@ public:
     // push filter to remote node or push down it to scan_node
     Status publish(bool publish_local = false);
 
-    Status send_filter_size(uint64_t local_filter_size);
+    Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index ac85a02bed4..ebda4b56fcc 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -55,7 +55,7 @@ public:
         // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
             if (runtime_filter->need_sync_filter_size()) {
-                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+                RETURN_IF_ERROR(runtime_filter->send_filter_size(state, 
hash_table_size));
             }
         }
         return Status::OK();
diff --git a/regression-test/suites/query_p0/join/test_join5.groovy 
b/regression-test/suites/query_p0/join/test_join5.groovy
index 62be496372d..4323575870f 100644
--- a/regression-test/suites/query_p0/join/test_join5.groovy
+++ b/regression-test/suites/query_p0/join/test_join5.groovy
@@ -16,6 +16,7 @@
 // under the License.
 
 suite("test_join5", "query,p0") {
+    sql "set runtime_filter_wait_time_ms = 5"
     def DBname = "regression_test_join5"
     sql "DROP DATABASE IF EXISTS ${DBname}"
     sql "CREATE DATABASE IF NOT EXISTS ${DBname}"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to