This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8c8ea593ec5 branch-3.0: [improve](move-memtable) reduce flush token 
num #46001 (#46178)
8c8ea593ec5 is described below

commit 8c8ea593ec56fc47deffe880224f65b83cdf7f59
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 3 11:25:02 2025 +0800

    branch-3.0: [improve](move-memtable) reduce flush token num #46001 (#46178)
    
    Cherry-picked from #46001
    
    Co-authored-by: Kaijie Chen <chenkai...@selectdb.com>
---
 be/src/runtime/load_stream.cpp     | 14 +++++---------
 be/src/runtime/load_stream.h       |  2 +-
 be/src/runtime/load_stream_mgr.cpp |  3 +--
 be/src/runtime/load_stream_mgr.h   |  9 ++-------
 4 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 60da45fa685..0bd045d46c1 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -64,7 +64,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, 
int64_t txn_id,
           _load_id(load_id),
           _txn_id(txn_id),
           _load_stream_mgr(load_stream_mgr) {
-    load_stream_mgr->create_tokens(_flush_tokens);
+    load_stream_mgr->create_token(_flush_token);
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, 
true);
     _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
     _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -178,7 +178,6 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
             LOG(WARNING) << "write data failed " << st << ", " << *this;
         }
     };
-    auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
     auto load_stream_flush_token_max_tasks = 
config::load_stream_flush_token_max_tasks;
     auto load_stream_max_wait_flush_token_time_ms =
             config::load_stream_max_wait_flush_token_time_ms;
@@ -188,7 +187,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     });
     MonotonicStopWatch timer;
     timer.start();
-    while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
+    while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
         if (timer.elapsed_time() / 1000 / 1000 >= 
load_stream_max_wait_flush_token_time_ms) {
             _status.update(
                     Status::Error<true>("wait flush token back pressure time 
is more than "
@@ -206,7 +205,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
                     { st = Status::InternalError("fault injection"); });
     if (st.ok()) {
-        st = flush_token->submit_func(flush_func);
+        st = _flush_token->submit_func(flush_func);
     }
     if (!st.ok()) {
         _status.update(st);
@@ -263,12 +262,11 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
             LOG(INFO) << "add segment failed " << *this;
         }
     };
-    auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
     Status st = Status::OK();
     DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
                     { st = Status::InternalError("fault injection"); });
     if (st.ok()) {
-        st = flush_token->submit_func(add_segment_func);
+        st = _flush_token->submit_func(add_segment_func);
     }
     if (!st.ok()) {
         _status.update(st);
@@ -303,9 +301,7 @@ void TabletStream::pre_close() {
 
     SCOPED_TIMER(_close_wait_timer);
     _status.update(_run_in_heavy_work_pool([this]() {
-        for (auto& token : _flush_tokens) {
-            token->wait();
-        }
+        _flush_token->wait();
         return Status::OK();
     }));
     // it is necessary to check status after wait_func,
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 1f4ef2b3c4c..1eacbefb052 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -65,7 +65,7 @@ private:
 
     int64_t _id;
     LoadStreamWriterSharedPtr _load_stream_writer;
-    std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
+    std::unique_ptr<ThreadPoolToken> _flush_token;
     std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
     std::atomic<uint32_t> _next_segid;
     int64_t _num_segments = 0;
diff --git a/be/src/runtime/load_stream_mgr.cpp 
b/be/src/runtime/load_stream_mgr.cpp
index 67739a0c0b0..411f90cebb5 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -32,8 +32,7 @@
 
 namespace doris {
 
-LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num)
-        : _num_threads(segment_file_writer_thread_num) {
+LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
     static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
                               .set_min_threads(segment_file_writer_thread_num)
                               .set_max_threads(segment_file_writer_thread_num)
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 45abd9c8470..dbbdbaf0070 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -39,11 +39,8 @@ public:
 
     Status open_load_stream(const POpenLoadStreamRequest* request, 
LoadStream*& load_stream);
     void clear_load(UniqueId loadid);
-    void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
-        for (int i = 0; i < _num_threads * 2; i++) {
-            tokens.push_back(
-                    
_file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
-        }
+    void create_token(std::unique_ptr<ThreadPoolToken>& token) {
+        token = 
_file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
     }
 
     std::vector<std::string> get_all_load_stream_ids() {
@@ -70,8 +67,6 @@ private:
     std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
     std::unique_ptr<ThreadPool> _file_writer_thread_pool;
 
-    uint32_t _num_threads = 0;
-
     FifoThreadPool* _heavy_work_pool = nullptr;
     FifoThreadPool* _light_work_pool = nullptr;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to