This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b9a17efe25cd346443b1c25cff979faf96e988d4 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Aug 1 08:47:55 2023 +0800 [opt](conf) Modify brpc eovercrowded conf (#22407) brpc ignore eovercrowded of data stream sender and exchange sink buffer Modify the default value of brpc_socket_max_unwritten_bytes --- be/src/common/config.cpp | 4 ++-- be/src/common/config.h | 3 +++ be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 ++++++ be/src/service/brpc_service.cpp | 6 +++++- be/src/vec/sink/vdata_stream_sender.cpp | 3 +++ 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 15b46c74c4..b2b5354e65 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -470,6 +470,7 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true"); +DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true"); DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60"); // Whether to enable stream load record function, the default is false. // False: disable stream load record @@ -640,8 +641,7 @@ DEFINE_String(default_rowset_type, "BETA"); // Maximum size of a single message body in all protocols DEFINE_Int64(brpc_max_body_size, "3147483648"); -// Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED -DEFINE_Int64(brpc_socket_max_unwritten_bytes, "1073741824"); +DEFINE_Int64(brpc_socket_max_unwritten_bytes, "-1"); // TODO(zxy): expect to be true in v1.3 // Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into // Controller Attachment and send it through http brpc when the length of the Tuple/Block data diff --git a/be/src/common/config.h b/be/src/common/config.h index d35cc327a5..a2a0d50e7c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -516,6 +516,7 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec); DECLARE_Int32(tablet_writer_open_rpc_timeout_sec); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DECLARE_mBool(tablet_writer_ignore_eovercrowded); +DECLARE_mBool(exchange_sink_ignore_eovercrowded); DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec); // Whether to enable stream load record function, the default is false. // False: disable stream load record @@ -687,6 +688,8 @@ DECLARE_String(default_rowset_type); // Maximum size of a single message body in all protocols DECLARE_Int64(brpc_max_body_size); // Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED +// Default, if the physical memory is less than or equal to 64G, the value is 1G +// if the physical memory is greater than 64G, the value is physical memory * mem_limit(0.8) / 1024 * 20 DECLARE_Int64(brpc_socket_max_unwritten_bytes); // TODO(zxy): expect to be true in v1.3 // Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0a487922a2..7ff0f7b503 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -197,6 +197,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { _instance_to_rpc_ctx[id] = rpc_ctx; closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + if (config::exchange_sink_ignore_eovercrowded) { + closure->cntl.ignore_eovercrowded(); + } closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); closure->start_rpc_time = GetCurrentTimeNanos(); @@ -247,6 +250,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { _instance_to_rpc_ctx[id] = rpc_ctx; closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + if (config::exchange_sink_ignore_eovercrowded) { + closure->cntl.ignore_eovercrowded(); + } closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); closure->start_rpc_time = GetCurrentTimeNanos(); diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp index 2406d08b1e..1c65d55ea8 100644 --- a/be/src/service/brpc_service.cpp +++ b/be/src/service/brpc_service.cpp @@ -31,6 +31,7 @@ #include "common/logging.h" #include "service/backend_options.h" #include "service/internal_service.h" +#include "util/mem_info.h" namespace brpc { @@ -44,7 +45,10 @@ namespace doris { BRpcService::BRpcService(ExecEnv* exec_env) : _exec_env(exec_env), _server(new brpc::Server()) { // Set config brpc::FLAGS_max_body_size = config::brpc_max_body_size; - brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes; + brpc::FLAGS_socket_max_unwritten_bytes = + config::brpc_socket_max_unwritten_bytes != -1 + ? config::brpc_socket_max_unwritten_bytes + : std::max((int64_t)1073741824, (MemInfo::mem_limit() / 1024) * 20); } BRpcService::~BRpcService() = default; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index f85cc5e4ec..713fcb257c 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -178,6 +178,9 @@ Status Channel::send_block(PBlock* block, bool eos) { _closure->ref(); _closure->cntl.set_timeout_ms(_brpc_timeout_ms); + if (config::exchange_sink_ignore_eovercrowded) { + _closure->cntl.ignore_eovercrowded(); + } { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org