This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 73199122c2d [enhancement](compaction) Control the parallelism for urgent compacton tasks (#37782) (#38189) 73199122c2d is described below commit 73199122c2dd43643bfe24010f3c24fb137ad004 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Mon Jul 22 17:22:53 2024 +0800 [enhancement](compaction) Control the parallelism for urgent compacton tasks (#37782) (#38189) ## Proposed changes For some urgent compaction tasks, their submission should take parallelism into account. Currently, the control policy is applied specifically to data loading. Urgent tasks from other sources are treated as eager and are submitted without this parallelism check. --- be/src/agent/task_worker_pool.cpp | 3 ++- be/src/olap/olap_server.cpp | 41 +++++++++++++++++++++++++++++++++------ be/src/olap/storage_engine.h | 2 +- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c9d222114e0..59971f01a4d 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -40,6 +40,7 @@ #include <sstream> #include <string> #include <thread> +#include <type_traits> #include <utility> #include <vector> @@ -1586,7 +1587,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) && published_count % 20 == 0) { auto st = _engine.submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, true); + tablet, CompactionType::CUMULATIVE_COMPACTION, true, false); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id << ", published=" << published_count << " : " << st; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 038a5f2cd45..173fc227892 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -854,12 +854,12 @@ int 
StorageEngine::_get_executing_compaction_num( return num; } -bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type, - bool all_base) { - if (count >= thread_per_disk) { +bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk, + CompactionType compaction_type, bool all_base) { + if (task_cnt_per_disk >= thread_per_disk) { // Return if no available slot return false; - } else if (count >= thread_per_disk - 1) { + } else if (task_cnt_per_disk >= thread_per_disk - 1) { // Only one slot left, check if it can be assigned to base compaction task. if (compaction_type == CompactionType::BASE_COMPACTION) { if (all_base) { @@ -912,7 +912,7 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( copied_cumu_map = _tablet_submitted_cumu_compaction; copied_base_map = _tablet_submitted_base_compaction; } - for (auto data_dir : data_dirs) { + for (auto* data_dir : data_dirs) { bool need_pick_tablet = true; // We need to reserve at least one Slot for cumulative compaction. 
// So when there is only one Slot, we have to judge whether there is a cumulative compaction @@ -1091,7 +1091,36 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, } Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force) { + bool force, bool eager) { + if (!eager) { + DCHECK(compaction_type == CompactionType::BASE_COMPACTION || + compaction_type == CompactionType::CUMULATIVE_COMPACTION); + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map; + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map; + { + std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex); + copied_cumu_map = _tablet_submitted_cumu_compaction; + copied_base_map = _tablet_submitted_base_compaction; + } + auto stores = get_stores(); + + auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type, + this](auto* data_dir) { + int count = _get_executing_compaction_num(copied_base_map[data_dir]) + + _get_executing_compaction_num(copied_cumu_map[data_dir]); + int paral = data_dir->is_ssd_disk() ? 
config::compaction_task_num_per_fast_disk + : config::compaction_task_num_per_disk; + bool all_base = copied_cumu_map[data_dir].empty(); + return need_generate_compaction_tasks(count, paral, compaction_type, all_base); + }; + + bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred); + if (is_busy) { + LOG_EVERY_N(WARNING, 100) + << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id(); + return Status::OK(); + } + } _update_cumulative_compaction_policy(); // alter table tableName set ("compaction_policy"="time_series") // if atler table's compaction policy, we need to modify tablet compaction policy shared ptr diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 99e92828a0b..f647869e825 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -195,7 +195,7 @@ public: void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force); + bool force, bool eager = true); Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker, SegCompactionCandidatesSharedPtr segments); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org