This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 3d7d85e6ab3 [branch-3.0]Add more load cpu usage to workload group (#42053) (#50983) 3d7d85e6ab3 is described below commit 3d7d85e6ab3334e9bc980cb5feb9aece86b0f9d1 Author: wangbo <wan...@selectdb.com> AuthorDate: Sat May 17 10:51:34 2025 +0800 [branch-3.0]Add more load cpu usage to workload group (#42053) (#50983) pick #42053 --- be/src/olap/delta_writer.cpp | 7 +++++- be/src/olap/delta_writer_v2.cpp | 7 +++--- be/src/olap/memtable_flush_executor.cpp | 30 +++++++++++++------------- be/src/olap/memtable_flush_executor.h | 13 +++++------ be/src/olap/memtable_writer.cpp | 18 ++++------------ be/src/olap/memtable_writer.h | 3 ++- be/src/runtime/query_context.h | 2 +- be/src/runtime/workload_group/workload_group.h | 11 ++++++++++ be/src/vec/sink/writer/async_result_writer.cpp | 11 ++++++++++ 9 files changed, 60 insertions(+), 42 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index f42253a7884..88277775f96 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -103,10 +103,15 @@ Status BaseDeltaWriter::init() { if (_is_init) { return Status::OK(); } + auto* t_ctx = doris::thread_context(true); + std::shared_ptr<WorkloadGroup> wg_sptr = nullptr; + if (t_ctx) { + wg_sptr = t_ctx->workload_group().lock(); + } RETURN_IF_ERROR(_rowset_builder->init()); RETURN_IF_ERROR(_memtable_writer->init( _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(), - _rowset_builder->get_partial_update_info(), nullptr, + _rowset_builder->get_partial_update_info(), wg_sptr, _rowset_builder->tablet()->enable_unique_key_merge_on_write())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 5a420f6c387..657ec265e50 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -127,13 
+127,12 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - ThreadPool* wg_thread_pool_ptr = nullptr; + std::shared_ptr<WorkloadGroup> wg_sptr = nullptr; if (_state->get_query_ctx()) { - wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool(); + wg_sptr = _state->get_query_ctx()->workload_group(); } RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, - wg_thread_pool_ptr, - _streams[0]->enable_unique_mow(_req.index_id))); + wg_sptr, _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; _streams.clear(); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index d2148396de1..63840c97d9d 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -97,7 +97,18 @@ Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) { int64_t submit_task_time = MonotonicNanos(); auto task = MemtableFlushTask::create_shared( shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time); - Status ret = _thread_pool->submit(std::move(task)); + + // NOTE: we must guarantee the WorkloadGroup is not destroyed while submitting the memtable flush task, + // because currently a WorkloadGroup can only be destroyed once all queries in the group have finished; + // whether the load channel has finished is not taken into account. + std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock(); + ThreadPool* wg_thread_pool = nullptr; + if (wg_sptr) { + wg_thread_pool = wg_sptr->get_memtable_flush_pool_ptr(); + } + Status ret = wg_thread_pool ?
wg_thread_pool->submit(std::move(task)) + : _thread_pool->submit(std::move(task)); + if (ret.ok()) { // _wait_running_task_finish was executed after this function, so no need to notify _cond here _stats.flush_running_count++; @@ -232,7 +243,8 @@ void MemTableFlushExecutor::init(int num_disk) { // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer, - bool is_high_priority) { + bool is_high_priority, + std::shared_ptr<WorkloadGroup> wg_sptr) { switch (rowset_writer->type()) { case ALPHA_ROWSET: // alpha rowset do not support flush in CONCURRENT. and not support alpha rowset now. @@ -240,7 +252,7 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl case BETA_ROWSET: { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. ThreadPool* pool = is_high_priority ? 
_high_prio_flush_pool.get() : _flush_pool.get(); - flush_token = FlushToken::create_shared(pool); + flush_token = FlushToken::create_shared(pool, wg_sptr); flush_token->set_rowset_writer(rowset_writer); return Status::OK(); } @@ -249,16 +261,4 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl } } -Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, - std::shared_ptr<RowsetWriter> rowset_writer, - ThreadPool* wg_flush_pool_ptr) { - if (rowset_writer->type() == BETA_ROWSET) { - flush_token = FlushToken::create_shared(wg_flush_pool_ptr); - } else { - return Status::InternalError<false>("not support alpha rowset load now."); - } - flush_token->set_rowset_writer(rowset_writer); - return Status::OK(); -} - } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 48fc3eafc22..753f1106646 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -34,6 +34,7 @@ namespace doris { class DataDir; class MemTable; class RowsetWriter; +class WorkloadGroup; // the statistic of a certain flush handler. // use atomic because it may be updated by multi threads @@ -59,7 +60,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> { ENABLE_FACTORY_CREATOR(FlushToken); public: - FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {} + FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr) + : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {} Status submit(std::shared_ptr<MemTable> mem_table); @@ -108,6 +110,8 @@ private: std::mutex _mutex; std::condition_variable _cond; + + std::weak_ptr<WorkloadGroup> _wg_wptr; }; // MemTableFlushExecutor is responsible for flushing memtables to disk. 
@@ -132,11 +136,8 @@ public: void init(int num_disk); Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, - std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority); - - Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, - std::shared_ptr<RowsetWriter> rowset_writer, - ThreadPool* wg_flush_pool_ptr); + std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority, + std::shared_ptr<WorkloadGroup> wg_sptr); private: std::unique_ptr<ThreadPool> _flush_pool; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 0710d8c4071..4f5ff74a839 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -61,7 +61,7 @@ MemTableWriter::~MemTableWriter() { Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, std::shared_ptr<PartialUpdateInfo> partial_update_info, - ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) { + std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) { _rowset_writer = rowset_writer; _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; @@ -73,19 +73,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, // create flush handler // by assigning segment_id to memtable before submiting to flush executor, // we can make sure same keys sort in the same order in all replicas. 
- if (wg_flush_pool_ptr) { - RETURN_IF_ERROR( - ExecEnv::GetInstance() - ->storage_engine() - .memtable_flush_executor() - ->create_flush_token(_flush_token, _rowset_writer, wg_flush_pool_ptr)); - } else { - RETURN_IF_ERROR( - ExecEnv::GetInstance() - ->storage_engine() - .memtable_flush_executor() - ->create_flush_token(_flush_token, _rowset_writer, _req.is_high_priority)); - } + RETURN_IF_ERROR( + ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token( + _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr)); _is_init = true; return Status::OK(); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index 1a1a3e3af2c..ca47c113bfe 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -47,6 +47,7 @@ class SlotDescriptor; class OlapTableSchemaParam; class RowsetWriter; struct FlushStatistic; +class WorkloadGroup; namespace vectorized { class Block; @@ -62,7 +63,7 @@ public: Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, std::shared_ptr<PartialUpdateInfo> partial_update_info, - ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false); + std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = false); Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index a89ccc461bd..ce470cf26d7 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -263,7 +263,7 @@ private: // And will be shared by all instances of this query. // So that we can control the max thread that a query can be used to execute. // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. 
- std::unique_ptr<ThreadPoolToken> _thread_token; + std::unique_ptr<ThreadPoolToken> _thread_token {nullptr}; void _init_query_mem_tracker(); diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 7846b2fba52..b4d1406a94d 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -200,6 +200,17 @@ public: friend class WorkloadGroupMetrics; + CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() { + std::shared_lock<std::shared_mutex> rlock(_task_sched_lock); + return _cgroup_cpu_ctl.get(); + } + + ThreadPool* get_memtable_flush_pool_ptr() { + // no lock here because this is called by memtable flush, + // to avoid lock competition with the workload thread pool's update + return _memtable_flush_pool.get(); + } + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 0cc37e3458b..561be8ef3ad 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -107,6 +107,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi force_close(status); } + if (state && state->get_query_ctx()) { + WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group(); + if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) { + Status ret = wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup(); + if (ret.ok()) { + std::string wg_tname = "asyc_wr_" + wg_ptr->name(); + Thread::set_self_name(wg_tname); + } + } + } + DCHECK(_dependency); while (_writer_status.ok()) { ThreadCpuStopWatch cpu_time_stop_watch; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org