This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8c8ea593ec5 branch-3.0: [improve](move-memtable) reduce flush token num #46001 (#46178)
8c8ea593ec5 is described below

commit 8c8ea593ec56fc47deffe880224f65b83cdf7f59
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 3 11:25:02 2025 +0800

    branch-3.0: [improve](move-memtable) reduce flush token num #46001 (#46178)

    Cherry-picked from #46001

    Co-authored-by: Kaijie Chen <chenkai...@selectdb.com>
---
 be/src/runtime/load_stream.cpp     | 14 +++++---------
 be/src/runtime/load_stream.h       |  2 +-
 be/src/runtime/load_stream_mgr.cpp |  3 +--
 be/src/runtime/load_stream_mgr.h   |  9 ++-------
 4 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 60da45fa685..0bd045d46c1 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -64,7 +64,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
           _load_id(load_id),
           _txn_id(txn_id),
           _load_stream_mgr(load_stream_mgr) {
-    load_stream_mgr->create_tokens(_flush_tokens);
+    load_stream_mgr->create_token(_flush_token);
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
     _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
     _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -178,7 +178,6 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
             LOG(WARNING) << "write data failed " << st << ", " << *this;
         }
     };
-    auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
     auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
     auto load_stream_max_wait_flush_token_time_ms =
             config::load_stream_max_wait_flush_token_time_ms;
@@ -188,7 +187,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
     });
     MonotonicStopWatch timer;
     timer.start();
-    while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
+    while (_flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
         if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) {
             _status.update(
                     Status::Error<true>("wait flush token back pressure time is more than "
@@ -206,7 +205,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
     DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
                     { st = Status::InternalError("fault injection"); });
     if (st.ok()) {
-        st = flush_token->submit_func(flush_func);
+        st = _flush_token->submit_func(flush_func);
     }
     if (!st.ok()) {
         _status.update(st);
@@ -263,12 +262,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
             LOG(INFO) << "add segment failed " << *this;
         }
     };
-    auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
     Status st = Status::OK();
     DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
                     { st = Status::InternalError("fault injection"); });
     if (st.ok()) {
-        st = flush_token->submit_func(add_segment_func);
+        st = _flush_token->submit_func(add_segment_func);
     }
     if (!st.ok()) {
         _status.update(st);
@@ -303,9 +301,7 @@ void TabletStream::pre_close() {

     SCOPED_TIMER(_close_wait_timer);
     _status.update(_run_in_heavy_work_pool([this]() {
-        for (auto& token : _flush_tokens) {
-            token->wait();
-        }
+        _flush_token->wait();
         return Status::OK();
     }));
     // it is necessary to check status after wait_func,
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 1f4ef2b3c4c..1eacbefb052 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -65,7 +65,7 @@ private:

     int64_t _id;
     LoadStreamWriterSharedPtr _load_stream_writer;
-    std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
+    std::unique_ptr<ThreadPoolToken> _flush_token;
     std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
     std::atomic<uint32_t> _next_segid;
     int64_t _num_segments = 0;
diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp
index 67739a0c0b0..411f90cebb5 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -32,8 +32,7 @@

 namespace doris {

-LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num)
-        : _num_threads(segment_file_writer_thread_num) {
+LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
     static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
                               .set_min_threads(segment_file_writer_thread_num)
                               .set_max_threads(segment_file_writer_thread_num)
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 45abd9c8470..dbbdbaf0070 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -39,11 +39,8 @@ public:

     Status open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream);
     void clear_load(UniqueId loadid);
-    void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
-        for (int i = 0; i < _num_threads * 2; i++) {
-            tokens.push_back(
-                    _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
-        }
+    void create_token(std::unique_ptr<ThreadPoolToken>& token) {
+        token = _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
     }

     std::vector<std::string> get_all_load_stream_ids() {
@@ -70,8 +67,6 @@ private:
     std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
     std::unique_ptr<ThreadPool> _file_writer_thread_pool;

-    uint32_t _num_threads = 0;
-
     FifoThreadPool* _heavy_work_pool = nullptr;
     FifoThreadPool* _light_work_pool = nullptr;
 };

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org