This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b9a17efe25cd346443b1c25cff979faf96e988d4
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Aug 1 08:47:55 2023 +0800

    [opt](conf) Modify brpc eovercrowded conf (#22407)
    
    Make brpc ignore EOVERCROWDED errors for the data stream sender and the exchange sink buffer.
    Change the default value of brpc_socket_max_unwritten_bytes to -1 (the limit is then derived from the memory limit).
---
 be/src/common/config.cpp                      | 4 ++--
 be/src/common/config.h                        | 3 +++
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 ++++++
 be/src/service/brpc_service.cpp               | 6 +++++-
 be/src/vec/sink/vdata_stream_sender.cpp       | 3 +++
 5 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 15b46c74c4..b2b5354e65 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -470,6 +470,7 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, 
"1200");
 DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
+DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
 DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -640,8 +641,7 @@ DEFINE_String(default_rowset_type, "BETA");
 
 // Maximum size of a single message body in all protocols
 DEFINE_Int64(brpc_max_body_size, "3147483648");
-// Max unwritten bytes in each socket, if the limit is reached, Socket.Write 
fails with EOVERCROWDED
-DEFINE_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
+DEFINE_Int64(brpc_socket_max_unwritten_bytes, "-1");
 // TODO(zxy): expect to be true in v1.3
 // Whether to embed the ProtoBuf Request serialized string together with 
Tuple/Block data into
 // Controller Attachment and send it through http brpc when the length of the 
Tuple/Block data
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d35cc327a5..a2a0d50e7c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -516,6 +516,7 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
 DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DECLARE_mBool(tablet_writer_ignore_eovercrowded);
+DECLARE_mBool(exchange_sink_ignore_eovercrowded);
 DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -687,6 +688,8 @@ DECLARE_String(default_rowset_type);
 // Maximum size of a single message body in all protocols
 DECLARE_Int64(brpc_max_body_size);
 // Max unwritten bytes in each socket, if the limit is reached, Socket.Write 
fails with EOVERCROWDED
+// When set to -1 (the default): if the physical memory is less than or equal to 64G, the value is 1G;
+//          if the physical memory is greater than 64G, the value is physical memory * mem_limit(0.8) / 1024 * 20
 DECLARE_Int64(brpc_socket_max_unwritten_bytes);
 // TODO(zxy): expect to be true in v1.3
 // Whether to embed the ProtoBuf Request serialized string together with 
Tuple/Block data into
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0a487922a2..7ff0f7b503 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -197,6 +197,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _instance_to_rpc_ctx[id] = rpc_ctx;
 
         closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        if (config::exchange_sink_ignore_eovercrowded) {
+            closure->cntl.ignore_eovercrowded();
+        }
         closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
         closure->start_rpc_time = GetCurrentTimeNanos();
@@ -247,6 +250,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _instance_to_rpc_ctx[id] = rpc_ctx;
 
         closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        if (config::exchange_sink_ignore_eovercrowded) {
+            closure->cntl.ignore_eovercrowded();
+        }
         closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
         closure->start_rpc_time = GetCurrentTimeNanos();
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 2406d08b1e..1c65d55ea8 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -31,6 +31,7 @@
 #include "common/logging.h"
 #include "service/backend_options.h"
 #include "service/internal_service.h"
+#include "util/mem_info.h"
 
 namespace brpc {
 
@@ -44,7 +45,10 @@ namespace doris {
 BRpcService::BRpcService(ExecEnv* exec_env) : _exec_env(exec_env), _server(new 
brpc::Server()) {
     // Set config
     brpc::FLAGS_max_body_size = config::brpc_max_body_size;
-    brpc::FLAGS_socket_max_unwritten_bytes = 
config::brpc_socket_max_unwritten_bytes;
+    brpc::FLAGS_socket_max_unwritten_bytes =
+            config::brpc_socket_max_unwritten_bytes != -1
+                    ? config::brpc_socket_max_unwritten_bytes
+                    : std::max((int64_t)1073741824, (MemInfo::mem_limit() / 
1024) * 20);
 }
 
 BRpcService::~BRpcService() = default;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index f85cc5e4ec..713fcb257c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -178,6 +178,9 @@ Status Channel::send_block(PBlock* block, bool eos) {
 
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
+    if (config::exchange_sink_ignore_eovercrowded) {
+        _closure->cntl.ignore_eovercrowded();
+    }
 
     {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to