This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1d632f1af4f8af0d01e8e29d5857d14c88210ded
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Tue Jan 30 23:30:40 2024 +0800

    [improvement](move-memtable) enable stream write to socket in background bthread (#30586)
---
 be/src/common/config.cpp             | 2 ++
 be/src/common/config.h               | 2 ++
 be/src/vec/sink/load_stream_stub.cpp | 4 +++-
 3 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0e631d0a8e4..d68efb0550a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -767,6 +767,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
 DEFINE_Bool(share_delta_writers, "true");
 // timeout for open load stream rpc in ms
 DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
+// enable write background when using brpc stream
+DEFINE_mBool(enable_brpc_stream_write_background, "true");
 
 // brpc streaming max_buf_size in bytes
 DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 303e170bde7..e2e8b5323e7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -820,6 +820,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
 DECLARE_Bool(share_delta_writers);
 // timeout for open load stream rpc in ms
 DECLARE_Int64(open_load_stream_timeout_ms);
+// enable write background when using brpc stream
+DECLARE_mBool(enable_brpc_stream_write_background);
 
 // brpc streaming max_buf_size in bytes
 DECLARE_Int64(load_stream_max_buf_size);
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 5751e8308bd..de1ade5aadb 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -390,7 +390,9 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
                 int64_t delay_ms = dp->param<int64>("delay_ms", 1000);
                 bthread_usleep(delay_ms * 1000);
             });
-            ret = brpc::StreamWrite(_stream_id, buf);
+            brpc::StreamWriteOptions options;
+            options.write_in_background = config::enable_brpc_stream_write_background;
+            ret = brpc::StreamWrite(_stream_id, buf, &options);
         }
         DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; });
         switch (ret) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to