This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f25b924b3 [opt](conf) Modify brpc eovercrowded conf (#22407)
5f25b924b3 is described below

commit 5f25b924b37ec1e27eac9d3c980e459c0971f73a
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Aug 1 08:47:55 2023 +0800

    [opt](conf) Modify brpc eovercrowded conf (#22407)
    
    brpc ignore eovercrowded of data stream sender and exchange sink buffer
    Modify the default value of brpc_socket_max_unwritten_bytes
---
 be/src/common/config.cpp                      | 4 ++--
 be/src/common/config.h                        | 3 +++
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 ++++++
 be/src/service/brpc_service.cpp               | 6 +++++-
 be/src/vec/sink/vdata_stream_sender.cpp       | 3 +++
 5 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index d7248e0f0f..c2e0cfe169 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -474,6 +474,7 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, 
"1200");
 DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
+DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
 DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -644,8 +645,7 @@ DEFINE_String(default_rowset_type, "BETA");
 
 // Maximum size of a single message body in all protocols
 DEFINE_Int64(brpc_max_body_size, "3147483648");
-// Max unwritten bytes in each socket, if the limit is reached, Socket.Write 
fails with EOVERCROWDED
-DEFINE_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
+DEFINE_Int64(brpc_socket_max_unwritten_bytes, "-1");
 // TODO(zxy): expect to be true in v1.3
 // Whether to embed the ProtoBuf Request serialized string together with 
Tuple/Block data into
 // Controller Attachment and send it through http brpc when the length of the 
Tuple/Block data
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d60d816a99..f177d86b3b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -521,6 +521,7 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
 DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DECLARE_mBool(tablet_writer_ignore_eovercrowded);
+DECLARE_mBool(exchange_sink_ignore_eovercrowded);
 DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -692,6 +693,8 @@ DECLARE_String(default_rowset_type);
 // Maximum size of a single message body in all protocols
 DECLARE_Int64(brpc_max_body_size);
 // Max unwritten bytes in each socket, if the limit is reached, Socket.Write 
fails with EOVERCROWDED
+// Default, if the physical memory is less than or equal to 64G, the value is 
1G
+//          if the physical memory is greater than 64G, the value is physical 
memory * mem_limit(0.8) / 1024 * 20
 DECLARE_Int64(brpc_socket_max_unwritten_bytes);
 // TODO(zxy): expect to be true in v1.3
 // Whether to embed the ProtoBuf Request serialized string together with 
Tuple/Block data into
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 0a487922a2..7ff0f7b503 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -197,6 +197,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _instance_to_rpc_ctx[id] = rpc_ctx;
 
         closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        if (config::exchange_sink_ignore_eovercrowded) {
+            closure->cntl.ignore_eovercrowded();
+        }
         closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
         closure->start_rpc_time = GetCurrentTimeNanos();
@@ -247,6 +250,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _instance_to_rpc_ctx[id] = rpc_ctx;
 
         closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        if (config::exchange_sink_ignore_eovercrowded) {
+            closure->cntl.ignore_eovercrowded();
+        }
         closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
         closure->start_rpc_time = GetCurrentTimeNanos();
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 2406d08b1e..1c65d55ea8 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -31,6 +31,7 @@
 #include "common/logging.h"
 #include "service/backend_options.h"
 #include "service/internal_service.h"
+#include "util/mem_info.h"
 
 namespace brpc {
 
@@ -44,7 +45,10 @@ namespace doris {
 BRpcService::BRpcService(ExecEnv* exec_env) : _exec_env(exec_env), _server(new 
brpc::Server()) {
     // Set config
     brpc::FLAGS_max_body_size = config::brpc_max_body_size;
-    brpc::FLAGS_socket_max_unwritten_bytes = 
config::brpc_socket_max_unwritten_bytes;
+    brpc::FLAGS_socket_max_unwritten_bytes =
+            config::brpc_socket_max_unwritten_bytes != -1
+                    ? config::brpc_socket_max_unwritten_bytes
+                    : std::max((int64_t)1073741824, (MemInfo::mem_limit() / 
1024) * 20);
 }
 
 BRpcService::~BRpcService() = default;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 29fb3446dc..dec3b43f09 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -178,6 +178,9 @@ Status Channel::send_block(PBlock* block, bool eos) {
 
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
+    if (config::exchange_sink_ignore_eovercrowded) {
+        _closure->cntl.ignore_eovercrowded();
+    }
 
     {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to