This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4494b9c04d9 [Improvement](runtime-filter) enlarge 
merge_filter_callback timeout_ms (#42602)
4494b9c04d9 is described below

commit 4494b9c04d99a624efd6f901c775809b7db81b5b
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Oct 31 14:34:10 2024 +0800

    [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms 
(#42602)
    
    ## Proposed changes
    Sometimes we encounter [E1008]Reached timeout=1000ms, at this time, like
    other rf-related rpc requests, the timeout should be set larger.
---
 be/src/common/config.cpp                      |  4 ++--
 be/src/common/config.h                        |  3 ++-
 be/src/exprs/runtime_filter.cpp               | 15 +++++++++++----
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  4 ++--
 be/src/runtime/runtime_filter_mgr.cpp         | 15 +++++++++++----
 be/src/runtime/runtime_state.h                |  5 +++++
 be/src/vec/sink/vdata_stream_sender.cpp       |  2 +-
 7 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f7c17aefee8..2fdebbd09c2 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -540,7 +540,6 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, 
"1200");
 DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
-DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
 DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -903,7 +902,8 @@ DEFINE_mInt64(small_column_size_buffer, "100");
 
 // Perform the always_true check at intervals determined by 
runtime_filter_sampling_frequency
 DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
-
+DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
+DEFINE_mBool(execution_ignore_eovercrowded, "true");
 // cooldown task configs
 DEFINE_Int32(cooldown_thread_num, "5");
 DEFINE_mInt64(generate_cooldown_task_interval_sec, "20");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5f73c31dcdb..791ca0b5e1a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -587,7 +587,6 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
 DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DECLARE_mBool(tablet_writer_ignore_eovercrowded);
-DECLARE_mBool(exchange_sink_ignore_eovercrowded);
 DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -958,6 +957,8 @@ DECLARE_mInt64(big_column_size_buffer);
 DECLARE_mInt64(small_column_size_buffer);
 
 DECLARE_mInt32(runtime_filter_sampling_frequency);
+DECLARE_mInt32(execution_max_rpc_timeout_sec);
+DECLARE_mBool(execution_ignore_eovercrowded);
 
 // cooldown task configs
 DECLARE_Int32(cooldown_thread_num);
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 84a964f5c38..bd4cd3353b8 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1146,8 +1146,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
 
     request->set_filter_size(local_filter_size);
     request->set_filter_id(_filter_id);
-    callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) 
* 1000);
-    callback->cntl_->ignore_eovercrowded();
+
+    
callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout()));
+    if (config::execution_ignore_eovercrowded) {
+        callback->cntl_->ignore_eovercrowded();
+    }
 
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
@@ -1184,8 +1187,12 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr) {
     merge_filter_request->set_is_pipeline(true);
     auto column_type = _wrapper->column_type();
     
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
-    merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
-    merge_filter_callback->cntl_->ignore_eovercrowded();
+
+    merge_filter_callback->cntl_->set_timeout_ms(
+            get_execution_rpc_timeout_ms(_state->execution_timeout));
+    if (config::execution_ignore_eovercrowded) {
+        merge_filter_callback->cntl_->ignore_eovercrowded();
+    }
 
     if (get_ignored()) {
         merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 016802f8f73..7163299d766 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -235,7 +235,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto send_callback = request.channel->get_send_callback(id, 
request.eos);
 
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
-        if (config::exchange_sink_ignore_eovercrowded) {
+        if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
@@ -313,7 +313,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         auto send_callback = request.channel->get_send_callback(id, 
request.eos);
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
-        if (config::exchange_sink_ignore_eovercrowded) {
+        if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 08a229c0ecf..77d2097d20c 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -29,6 +29,7 @@
 #include <string>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "exprs/bloom_filter_func.h"
@@ -343,8 +344,10 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
             auto* pquery_id = closure->request_->mutable_query_id();
             pquery_id->set_hi(_state->query_id.hi());
             pquery_id->set_lo(_state->query_id.lo());
-            closure->cntl_->set_timeout_ms(std::min(3600, 
_state->execution_timeout) * 1000);
-            closure->cntl_->ignore_eovercrowded();
+            
closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout));
+            if (config::execution_ignore_eovercrowded) {
+                closure->cntl_->ignore_eovercrowded();
+            }
 
             closure->request_->set_filter_id(filter_id);
             closure->request_->set_filter_size(cnt_val->global_size);
@@ -456,8 +459,12 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             if (has_attachment) {
                 
closure->cntl_->request_attachment().append(request_attachment);
             }
-            closure->cntl_->set_timeout_ms(std::min(3600, 
_state->execution_timeout) * 1000);
-            closure->cntl_->ignore_eovercrowded();
+
+            
closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout));
+            if (config::execution_ignore_eovercrowded) {
+                closure->cntl_->ignore_eovercrowded();
+            }
+
             // set fragment-id
             if (target.__isset.target_fragment_ids) {
                 for (auto& target_fragment_id : target.target_fragment_ids) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 34ce79ec7a7..abc823bc25b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -38,6 +38,7 @@
 #include "agent/be_exec_version_manager.h"
 #include "cctz/time_zone.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "gutil/integral_types.h"
@@ -51,6 +52,10 @@
 namespace doris {
 class IRuntimeFilter;
 
+inline int32_t get_execution_rpc_timeout_ms(int32_t execution_timeout_sec) {
+    return std::min(config::execution_max_rpc_timeout_sec, 
execution_timeout_sec) * 1000;
+}
+
 namespace pipeline {
 class PipelineXLocalStateBase;
 class PipelineXSinkLocalStateBase;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 8460a62883d..d21b87561b5 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,7 +95,7 @@ Status Channel::open(RuntimeState* state) {
     }
     _be_number = state->be_number();
 
-    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    _brpc_timeout_ms = 
get_execution_rpc_timeout_ms(state->execution_timeout());
 
     _serializer.set_is_local(_is_local);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to