This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1d632f1af4f8af0d01e8e29d5857d14c88210ded Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Tue Jan 30 23:30:40 2024 +0800 [improvement](move-memtable) enable stream write to socket in background bthread (#30586) --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/vec/sink/load_stream_stub.cpp | 4 +++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0e631d0a8e4..d68efb0550a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -767,6 +767,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); DEFINE_Bool(share_delta_writers, "true"); // timeout for open load stream rpc in ms DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s +// enable write background when using brpc stream +DEFINE_mBool(enable_brpc_stream_write_background, "true"); // brpc streaming max_buf_size in bytes DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 303e170bde7..e2e8b5323e7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -820,6 +820,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); DECLARE_Bool(share_delta_writers); // timeout for open load stream rpc in ms DECLARE_Int64(open_load_stream_timeout_ms); +// enable write background when using brpc stream +DECLARE_mBool(enable_brpc_stream_write_background); // brpc streaming max_buf_size in bytes DECLARE_Int64(load_stream_max_buf_size); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 5751e8308bd..de1ade5aadb 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -390,7 +390,9 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { int64_t delay_ms = dp->param<int64>("delay_ms", 1000); bthread_usleep(delay_ms * 1000); }); - ret = brpc::StreamWrite(_stream_id, buf); + 
brpc::StreamWriteOptions options; + options.write_in_background = config::enable_brpc_stream_write_background; + ret = brpc::StreamWrite(_stream_id, buf, &options); } DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; }); switch (ret) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org