This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4494b9c04d9 [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms (#42602) 4494b9c04d9 is described below commit 4494b9c04d99a624efd6f901c775809b7db81b5b Author: Pxl <pxl...@qq.com> AuthorDate: Thu Oct 31 14:34:10 2024 +0800 [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms (#42602) ## Proposed changes Sometimes we encounter "[E1008]Reached timeout=1000ms". When this happens, the timeout should be set larger, as it is for other rf-related RPC requests. --- be/src/common/config.cpp | 4 ++-- be/src/common/config.h | 3 ++- be/src/exprs/runtime_filter.cpp | 15 +++++++++++---- be/src/pipeline/exec/exchange_sink_buffer.cpp | 4 ++-- be/src/runtime/runtime_filter_mgr.cpp | 15 +++++++++++---- be/src/runtime/runtime_state.h | 5 +++++ be/src/vec/sink/vdata_stream_sender.cpp | 2 +- 7 files changed, 34 insertions(+), 14 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f7c17aefee8..2fdebbd09c2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -540,7 +540,6 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true"); -DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true"); DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60"); // Whether to enable stream load record function, the default is false. 
// False: disable stream load record @@ -903,7 +902,8 @@ DEFINE_mInt64(small_column_size_buffer, "100"); // Perform the always_true check at intervals determined by runtime_filter_sampling_frequency DEFINE_mInt32(runtime_filter_sampling_frequency, "64"); - +DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600"); +DEFINE_mBool(execution_ignore_eovercrowded, "true"); // cooldown task configs DEFINE_Int32(cooldown_thread_num, "5"); DEFINE_mInt64(generate_cooldown_task_interval_sec, "20"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 5f73c31dcdb..791ca0b5e1a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -587,7 +587,6 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec); DECLARE_Int32(tablet_writer_open_rpc_timeout_sec); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DECLARE_mBool(tablet_writer_ignore_eovercrowded); -DECLARE_mBool(exchange_sink_ignore_eovercrowded); DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec); // Whether to enable stream load record function, the default is false. 
// False: disable stream load record @@ -958,6 +957,8 @@ DECLARE_mInt64(big_column_size_buffer); DECLARE_mInt64(small_column_size_buffer); DECLARE_mInt32(runtime_filter_sampling_frequency); +DECLARE_mInt32(execution_max_rpc_timeout_sec); +DECLARE_mBool(execution_ignore_eovercrowded); // cooldown task configs DECLARE_Int32(cooldown_thread_num); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 84a964f5c38..bd4cd3353b8 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1146,8 +1146,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt request->set_filter_size(local_filter_size); request->set_filter_id(_filter_id); - callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000); - callback->cntl_->ignore_eovercrowded(); + + callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout())); + if (config::execution_ignore_eovercrowded) { + callback->cntl_->ignore_eovercrowded(); + } stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); @@ -1184,8 +1187,12 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { merge_filter_request->set_is_pipeline(true); auto column_type = _wrapper->column_type(); RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type))); - merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); - merge_filter_callback->cntl_->ignore_eovercrowded(); + + merge_filter_callback->cntl_->set_timeout_ms( + get_execution_rpc_timeout_ms(_state->execution_timeout)); + if (config::execution_ignore_eovercrowded) { + merge_filter_callback->cntl_->ignore_eovercrowded(); + } if (get_ignored()) { merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 016802f8f73..7163299d766 100644 --- 
a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -235,7 +235,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto send_callback = request.channel->get_send_callback(id, request.eos); send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); - if (config::exchange_sink_ignore_eovercrowded) { + if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( @@ -313,7 +313,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } auto send_callback = request.channel->get_send_callback(id, request.eos); send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); - if (config::exchange_sink_ignore_eovercrowded) { + if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 08a229c0ecf..77d2097d20c 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -29,6 +29,7 @@ #include <string> #include <utility> +#include "common/config.h" #include "common/logging.h" #include "common/status.h" #include "exprs/bloom_filter_func.h" @@ -343,8 +344,10 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); - closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000); - closure->cntl_->ignore_eovercrowded(); + closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout)); + if (config::execution_ignore_eovercrowded) { + closure->cntl_->ignore_eovercrowded(); + } closure->request_->set_filter_id(filter_id); 
closure->request_->set_filter_size(cnt_val->global_size); @@ -456,8 +459,12 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ if (has_attachment) { closure->cntl_->request_attachment().append(request_attachment); } - closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000); - closure->cntl_->ignore_eovercrowded(); + + closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout)); + if (config::execution_ignore_eovercrowded) { + closure->cntl_->ignore_eovercrowded(); + } + // set fragment-id if (target.__isset.target_fragment_ids) { for (auto& target_fragment_id : target.target_fragment_ids) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 34ce79ec7a7..abc823bc25b 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -38,6 +38,7 @@ #include "agent/be_exec_version_manager.h" #include "cctz/time_zone.h" #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/factory_creator.h" #include "common/status.h" #include "gutil/integral_types.h" @@ -51,6 +52,10 @@ namespace doris { class IRuntimeFilter; +inline int32_t get_execution_rpc_timeout_ms(int32_t execution_timeout_sec) { + return std::min(config::execution_max_rpc_timeout_sec, execution_timeout_sec) * 1000; +} + namespace pipeline { class PipelineXLocalStateBase; class PipelineXSinkLocalStateBase; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 8460a62883d..d21b87561b5 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -95,7 +95,7 @@ Status Channel::open(RuntimeState* state) { } _be_number = state->be_number(); - _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000; + _brpc_timeout_ms = get_execution_rpc_timeout_ms(state->execution_timeout()); _serializer.set_is_local(_is_local); 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org