This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 63c5625b783 [Improvement]Add more load cpu usage to workload group (#42053) 63c5625b783 is described below commit 63c5625b783d3312fce59e997115b9de8367c808 Author: wangbo <wan...@apache.org> AuthorDate: Wed Oct 23 10:54:29 2024 +0800 [Improvement]Add more load cpu usage to workload group (#42053) ## Proposed changes Add more workload cpu usage to workload group. 1 AsyncResultWriter's cpu usage. 2 Memtable flush's cpu usage when memtable is not on sink side. --- be/src/olap/delta_writer.cpp | 7 ++++++- be/src/olap/delta_writer_v2.cpp | 7 +++---- be/src/olap/memtable_flush_executor.cpp | 28 ++++++++++++-------------- be/src/olap/memtable_flush_executor.h | 13 ++++++------ be/src/olap/memtable_writer.cpp | 18 ++++------------- be/src/olap/memtable_writer.h | 3 ++- be/src/runtime/query_context.h | 2 +- be/src/runtime/workload_group/workload_group.h | 11 ++++++++++ be/src/vec/sink/writer/async_result_writer.cpp | 11 ++++++++++ 9 files changed, 58 insertions(+), 42 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 00c622df59f..e0e3a5281bc 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -103,10 +103,15 @@ Status BaseDeltaWriter::init() { if (_is_init) { return Status::OK(); } + auto* t_ctx = doris::thread_context(true); + std::shared_ptr<WorkloadGroup> wg_sptr = nullptr; + if (t_ctx) { + wg_sptr = t_ctx->workload_group().lock(); + } RETURN_IF_ERROR(_rowset_builder->init()); RETURN_IF_ERROR(_memtable_writer->init( _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(), - _rowset_builder->get_partial_update_info(), nullptr, + _rowset_builder->get_partial_update_info(), wg_sptr, _rowset_builder->tablet()->enable_unique_key_merge_on_write())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; diff --git a/be/src/olap/delta_writer_v2.cpp 
b/be/src/olap/delta_writer_v2.cpp index 0e83de2ca17..a6fb0154489 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -127,13 +127,12 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - ThreadPool* wg_thread_pool_ptr = nullptr; + std::shared_ptr<WorkloadGroup> wg_sptr = nullptr; if (_state->get_query_ctx()) { - wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool(); + wg_sptr = _state->get_query_ctx()->workload_group(); } RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, - wg_thread_pool_ptr, - _streams[0]->enable_unique_mow(_req.index_id))); + wg_sptr, _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; _streams.clear(); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index dc911647be8..5cdb45281b9 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -100,7 +100,16 @@ Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) { int64_t submit_task_time = MonotonicNanos(); auto task = MemtableFlushTask::create_shared( shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time); - Status ret = _thread_pool->submit(std::move(task)); + // NOTE: we must guarantee that the WorkloadGroup is not destructed while the memtable flush task is being submitted, + // because currently a WorkloadGroup is destroyed only after all queries in the group have finished, + // without considering whether the load channel has finished. + std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock(); + ThreadPool* wg_thread_pool = nullptr; + if (wg_sptr) { + wg_thread_pool = wg_sptr->get_memtable_flush_pool_ptr(); + } + Status ret = wg_thread_pool ? 
wg_thread_pool->submit(std::move(task)) + : _thread_pool->submit(std::move(task)); if (ret.ok()) { // _wait_running_task_finish was executed after this function, so no need to notify _cond here _stats.flush_running_count++; @@ -236,7 +245,8 @@ void MemTableFlushExecutor::init(int num_disk) { // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer, - bool is_high_priority) { + bool is_high_priority, + std::shared_ptr<WorkloadGroup> wg_sptr) { switch (rowset_writer->type()) { case ALPHA_ROWSET: // alpha rowset do not support flush in CONCURRENT. and not support alpha rowset now. @@ -244,7 +254,7 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl case BETA_ROWSET: { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. ThreadPool* pool = is_high_priority ? 
_high_prio_flush_pool.get() : _flush_pool.get(); - flush_token = FlushToken::create_shared(pool); + flush_token = FlushToken::create_shared(pool, wg_sptr); flush_token->set_rowset_writer(rowset_writer); return Status::OK(); } @@ -253,18 +263,6 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl } } -Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, - std::shared_ptr<RowsetWriter> rowset_writer, - ThreadPool* wg_flush_pool_ptr) { - if (rowset_writer->type() == BETA_ROWSET) { - flush_token = FlushToken::create_shared(wg_flush_pool_ptr); - } else { - return Status::InternalError<false>("not support alpha rowset load now."); - } - flush_token->set_rowset_writer(rowset_writer); - return Status::OK(); -} - void MemTableFlushExecutor::_register_metrics() { REGISTER_HOOK_METRIC(flush_thread_pool_queue_size, [this]() { return _flush_pool->get_queue_size(); }); diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 25c5a37afba..27e8e8a9b0e 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -34,6 +34,7 @@ namespace doris { class DataDir; class MemTable; class RowsetWriter; +class WorkloadGroup; // the statistic of a certain flush handler. 
// use atomic because it may be updated by multi threads @@ -59,7 +60,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> { ENABLE_FACTORY_CREATOR(FlushToken); public: - FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {} + FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr) + : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {} Status submit(std::shared_ptr<MemTable> mem_table); @@ -108,6 +110,8 @@ private: std::mutex _mutex; std::condition_variable _cond; + + std::weak_ptr<WorkloadGroup> _wg_wptr; }; // MemTableFlushExecutor is responsible for flushing memtables to disk. @@ -133,11 +137,8 @@ public: void init(int num_disk); Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, - std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority); - - Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, - std::shared_ptr<RowsetWriter> rowset_writer, - ThreadPool* wg_flush_pool_ptr); + std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority, + std::shared_ptr<WorkloadGroup> wg_sptr); private: void _register_metrics(); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index e8123c48ecc..88532646b66 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() { Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, std::shared_ptr<PartialUpdateInfo> partial_update_info, - ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) { + std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) { _rowset_writer = rowset_writer; _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; @@ -77,19 +77,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, // create flush handler // by assigning segment_id to memtable before submiting to flush 
executor, // we can make sure same keys sort in the same order in all replicas. - if (wg_flush_pool_ptr) { - RETURN_IF_ERROR( - ExecEnv::GetInstance() - ->storage_engine() - .memtable_flush_executor() - ->create_flush_token(_flush_token, _rowset_writer, wg_flush_pool_ptr)); - } else { - RETURN_IF_ERROR( - ExecEnv::GetInstance() - ->storage_engine() - .memtable_flush_executor() - ->create_flush_token(_flush_token, _rowset_writer, _req.is_high_priority)); - } + RETURN_IF_ERROR( + ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token( + _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr)); _is_init = true; return Status::OK(); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index ec44348b4a9..fb07e740fa3 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -52,6 +52,7 @@ class SlotDescriptor; class OlapTableSchemaParam; class RowsetWriter; struct FlushStatistic; +class WorkloadGroup; namespace vectorized { class Block; @@ -67,7 +68,7 @@ public: Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, std::shared_ptr<PartialUpdateInfo> partial_update_info, - ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false); + std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = false); Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 9d499f3487e..1a05b784d5b 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -255,7 +255,7 @@ private: // And will be shared by all instances of this query. // So that we can control the max thread that a query can be used to execute. // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. 
- std::unique_ptr<ThreadPoolToken> _thread_token; + std::unique_ptr<ThreadPoolToken> _thread_token {nullptr}; void _init_query_mem_tracker(); diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 933c5afdb4e..2ba84ce982b 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -198,6 +198,17 @@ public: } int64_t get_remote_scan_bytes_per_second(); + CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() { + std::shared_lock<std::shared_mutex> rlock(_task_sched_lock); + return _cgroup_cpu_ctl.get(); + } + + ThreadPool* get_memtable_flush_pool_ptr() { + // no lock here because this is called by memtable flush, + // to avoid lock competition with the workload thread pool's update + return _memtable_flush_pool.get(); + } + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 16dcbc648fb..432ec1c54b5 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -107,6 +107,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi force_close(status); } + if (state && state->get_query_ctx()) { + WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group(); + if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) { + Status ret = wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup(); + if (ret.ok()) { + std::string wg_tname = "asyc_wr_" + wg_ptr->name(); + Thread::set_self_name(wg_tname); + } + } + } + DCHECK(_dependency); if (_writer_status.ok()) { while (true) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org