This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 63440838483 [cherry-pick](branch-3.0) Pick "[Enhancement](compaction) Try get global lock when execute compaction (#49882)" (#50432) 63440838483 is described below commit 6344083848322ba7bccc88fdd6b3b2780ba6695d Author: abmdocrt <lianyuk...@selectdb.com> AuthorDate: Sun Apr 27 17:57:37 2025 +0800 [cherry-pick](branch-3.0) Pick "[Enhancement](compaction) Try get global lock when execute compaction (#49882)" (#50432) Pick #49882 Background: In cloud mode, compaction tasks for the same tablet may be scheduled on multiple BEs. To ensure that only one BE executes a compaction task for a given tablet at any time, a global locking mechanism is used. When a compaction job is started, the tablet and compaction information is written as a key-value pair to the meta service, and a background thread periodically renews the lease on that entry. Other BEs can compact the tablet only when the KV entry has expired or does not exist, which guarantees that each tablet is compacted on at most one BE at a time. Problem: Compaction tasks are executed by a thread pool. Previously, a BE prepared the compaction and acquired the global lock before queueing the task. If a BE is under heavy compaction pressure with all threads occupied, its tablets may wait in the queue for a long time while still holding the global lock. Meanwhile, idle BEs cannot compact these tablets because they cannot acquire the lock. This leads to a resource imbalance in which some BEs are starved while others are overloaded. Solution: This change queues the task first and attempts to acquire the global lock only when the task is about to be executed. Rowset selection (`prepare_compact`) still runs before queueing; only the meta-service job registration (`request_global_lock`) is deferred to the worker thread. As a result, even if a tablet's compaction task is waiting in one BE's queue, another idle BE can still compact that tablet, improving resource utilization across the cluster. 
--- be/src/cloud/cloud_base_compaction.cpp | 38 +++--- be/src/cloud/cloud_base_compaction.h | 1 + be/src/cloud/cloud_cumulative_compaction.cpp | 73 +++++------ be/src/cloud/cloud_cumulative_compaction.h | 1 + be/src/cloud/cloud_full_compaction.cpp | 37 +++--- be/src/cloud/cloud_full_compaction.h | 1 + be/src/cloud/cloud_storage_engine.cpp | 132 ++++++++++++++++++-- be/src/cloud/cloud_storage_engine.h | 11 ++ .../cloud/test_cloud_compaction_global_lock.groovy | 135 +++++++++++++++++++++ 9 files changed, 338 insertions(+), 91 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index fc6b13b564b..5af1a575700 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -64,6 +64,26 @@ Status CloudBaseCompaction::prepare_compact() { RETURN_IF_ERROR(pick_rowsets_to_compact()); + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_data_size += rs->data_disk_size(); + _input_rowsets_index_size += rs->index_disk_size(); + _input_rowsets_total_size += rs->total_disk_size(); + } + LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_rowsets_data_size", _input_rowsets_data_size) + .tag("input_rowsets_index_size", _input_rowsets_index_size) + .tag("input_rowsets_total_size", _input_rowsets_total_size); + return Status::OK(); +} + +Status CloudBaseCompaction::request_global_lock() { // prepare compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -113,25 +133,7 @@ Status CloudBaseCompaction::prepare_compact() { LOG(WARNING) << msg; return Status::InternalError(msg); } - return st; - } - - for (auto& rs : 
_input_rowsets) { - _input_row_num += rs->num_rows(); - _input_segments += rs->num_segments(); - _input_rowsets_data_size += rs->data_disk_size(); - _input_rowsets_index_size += rs->index_disk_size(); - _input_rowsets_total_size += rs->total_disk_size(); } - LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), - _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) - .tag("job_id", _uuid) - .tag("input_rowsets", _input_rowsets.size()) - .tag("input_rows", _input_row_num) - .tag("input_segments", _input_segments) - .tag("input_rowsets_data_size", _input_rowsets_data_size) - .tag("input_rowsets_index_size", _input_rowsets_index_size) - .tag("input_rowsets_total_size", _input_rowsets_total_size); return st; } diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index b9f52922b8e..63bb5c61def 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -32,6 +32,7 @@ public: Status prepare_compact() override; Status execute_compact() override; + Status request_global_lock(); void do_lease(); diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 5b6003fc599..5d6a519aa5a 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -62,9 +62,6 @@ Status CloudCumulativeCompaction::prepare_compact() { } } - int tried = 0; -PREPARE_TRY_AGAIN: - bool need_sync_tablet = true; { std::shared_lock rlock(_tablet->get_header_lock()); @@ -83,7 +80,7 @@ PREPARE_TRY_AGAIN: // pick rowsets to compact auto st = pick_rowsets_to_compact(); if (!st.ok()) { - if (tried == 0 && _last_delete_version.first != -1) { + if (_last_delete_version.first != -1) { // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. // plus 1 to skip the delete version. 
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. @@ -96,6 +93,30 @@ PREPARE_TRY_AGAIN: return st; } + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_data_size += rs->data_disk_size(); + _input_rowsets_index_size += rs->index_disk_size(); + _input_rowsets_total_size += rs->total_disk_size(); + } + LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_rowsets_data_size", _input_rowsets_data_size) + .tag("input_rowsets_index_size", _input_rowsets_index_size) + .tag("input_rowsets_total_size", _input_rowsets_total_size) + .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) + .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) + .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + return st; +} + +Status CloudCumulativeCompaction::request_global_lock() { // prepare compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -121,7 +142,7 @@ PREPARE_TRY_AGAIN: // Set input version range to let meta-service check version range conflict compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction); cloud::StartTabletJobResponse resp; - st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + Status st = _engine.meta_mgr().prepare_tablet_job(job, &resp); if (!st.ok()) { if (resp.status().code() == cloud::STALE_TABLET_CACHE) { // set last_sync_time to 0 to force sync tablet next time @@ -130,22 +151,10 @@ PREPARE_TRY_AGAIN: // tablet not found 
cloud_tablet()->clear_cache(); } else if (resp.status().code() == cloud::JOB_TABLET_BUSY) { - if (config::enable_parallel_cumu_compaction && resp.version_in_compaction_size() > 0 && - ++tried <= 2) { - _max_conflict_version = *std::max_element(resp.version_in_compaction().begin(), - resp.version_in_compaction().end()); - LOG_INFO("retry pick input rowsets") - .tag("job_id", _uuid) - .tag("max_conflict_version", _max_conflict_version) - .tag("tried", tried) - .tag("msg", resp.status().msg()); - goto PREPARE_TRY_AGAIN; - } else { - LOG_WARNING("failed to prepare cumu compaction") - .tag("job_id", _uuid) - .tag("msg", resp.status().msg()); - return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions"); - } + LOG_WARNING("failed to prepare cumu compaction") + .tag("job_id", _uuid) + .tag("msg", resp.status().msg()); + return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions"); } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) { (static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version()); std::stringstream ss; @@ -159,29 +168,7 @@ PREPARE_TRY_AGAIN: LOG(WARNING) << msg; return Status::InternalError(msg); } - return st; - } - - for (auto& rs : _input_rowsets) { - _input_row_num += rs->num_rows(); - _input_segments += rs->num_segments(); - _input_rowsets_data_size += rs->data_disk_size(); - _input_rowsets_index_size += rs->index_disk_size(); - _input_rowsets_total_size += rs->total_disk_size(); } - LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), - _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) - .tag("job_id", _uuid) - .tag("input_rowsets", _input_rowsets.size()) - .tag("input_rows", _input_row_num) - .tag("input_segments", _input_segments) - .tag("input_rowsets_data_size", _input_rowsets_data_size) - .tag("input_rowsets_index_size", _input_rowsets_index_size) - .tag("input_rowsets_total_size", _input_rowsets_total_size) 
- .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) - .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) - .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) - .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); return st; } diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 1d445648a3d..ba3b768a24f 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -34,6 +34,7 @@ public: Status prepare_compact() override; Status execute_compact() override; + Status request_global_lock(); void do_lease(); diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 211ed5c2458..0b6810a414c 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -56,6 +56,25 @@ Status CloudFullCompaction::prepare_compact() { RETURN_IF_ERROR(pick_rowsets_to_compact()); + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_data_size += rs->data_disk_size(); + _input_rowsets_index_size += rs->index_disk_size(); + _input_rowsets_total_size += rs->total_disk_size(); + } + LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_rowsets_data_size", _input_rowsets_data_size) + .tag("input_rowsets_index_size", _input_rowsets_index_size) + .tag("input_rowsets_total_size", _input_rowsets_total_size); + return Status::OK(); +} +Status CloudFullCompaction::request_global_lock() { // prepare compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -88,25 +107,7 @@ 
Status CloudFullCompaction::prepare_compact() { // tablet not found cloud_tablet()->clear_cache(); } - return st; - } - - for (auto& rs : _input_rowsets) { - _input_row_num += rs->num_rows(); - _input_segments += rs->num_segments(); - _input_rowsets_data_size += rs->data_disk_size(); - _input_rowsets_index_size += rs->index_disk_size(); - _input_rowsets_total_size += rs->total_disk_size(); } - LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), - _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) - .tag("job_id", _uuid) - .tag("input_rowsets", _input_rowsets.size()) - .tag("input_rows", _input_row_num) - .tag("input_segments", _input_segments) - .tag("input_rowsets_data_size", _input_rowsets_data_size) - .tag("input_rowsets_index_size", _input_rowsets_index_size) - .tag("input_rowsets_total_size", _input_rowsets_total_size); return st; } diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index b44e1503800..1cdf52472c0 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -33,6 +33,7 @@ public: Status prepare_compact() override; Status execute_compact() override; + Status request_global_lock(); void do_lease(); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 0af0051af7c..f74dc4230a7 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -17,6 +17,7 @@ #include "cloud/cloud_storage_engine.h" +#include <bvar/reducer.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/cloud.pb.h> #include <gen_cpp/olap_file.pb.h> @@ -26,6 +27,7 @@ #include <rapidjson/stringbuffer.h> #include <algorithm> +#include <memory> #include <variant> #include "cloud/cloud_base_compaction.h" @@ -49,18 +51,25 @@ #include "io/fs/hdfs_file_system.h" #include "io/fs/s3_file_system.h" #include "io/hdfs_util.h" +#include "io/io_common.h" #include 
"olap/cumulative_compaction_policy.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/memtable_flush_executor.h" #include "olap/storage_policy.h" #include "runtime/memory/cache_manager.h" #include "util/parse_util.h" +#include "vec/common/assert_cast.h" namespace doris { #include "common/compile_check_begin.h" using namespace std::literals; +bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count"); +bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count"); +bvar::Adder<uint64_t> g_cumu_compaction_running_task_count( + "cumulative_compaction_running_task_count"); + int get_cumu_thread_num() { if (config::max_cumu_compaction_threads > 0) { return config::max_cumu_compaction_threads; @@ -585,6 +594,61 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task return tablets_compaction; } +Status CloudStorageEngine::_request_tablet_global_compaction_lock( + ReaderType compaction_type, const CloudTabletSPtr& tablet, + std::shared_ptr<CloudCompactionMixin> compaction) { + long now = duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) { + auto cumu_compaction = static_pointer_cast<CloudCumulativeCompaction>(compaction); + if (auto st = cumu_compaction->request_global_lock(); !st.ok()) { + LOG_WARNING("failed to request cumu compactoin global lock") + .tag("tablet id", tablet->tablet_id()) + .tag("msg", st.to_string()); + tablet->set_last_cumu_compaction_failure_time(now); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction); + } + return Status::OK(); + } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) { + auto base_compaction = static_pointer_cast<CloudBaseCompaction>(compaction); + if (auto st = 
base_compaction->request_global_lock(); !st.ok()) { + LOG_WARNING("failed to request base compactoin global lock") + .tag("tablet id", tablet->tablet_id()) + .tag("msg", st.to_string()); + tablet->set_last_base_compaction_failure_time(now); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _executing_base_compactions[tablet->tablet_id()] = base_compaction; + } + return Status::OK(); + } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) { + auto full_compaction = static_pointer_cast<CloudFullCompaction>(compaction); + if (auto st = full_compaction->request_global_lock(); !st.ok()) { + LOG_WARNING("failed to request full compactoin global lock") + .tag("tablet id", tablet->tablet_id()) + .tag("msg", st.to_string()); + tablet->set_last_full_compaction_failure_time(now); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _executing_full_compactions[tablet->tablet_id()] = full_compaction; + } + return Status::OK(); + } else { + LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id() + << ", compaction name: " << compaction->compaction_name(); + return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name()); + } +} + Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { using namespace std::chrono; { @@ -600,7 +664,9 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet); auto st = compaction->prepare_compact(); if (!st.ok()) { - long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + long now = duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); tablet->set_last_base_compaction_failure_time(now); std::lock_guard lock(_compaction_mtx); _submitted_base_compactions.erase(tablet->tablet_id()); @@ -611,15 +677,23 @@ Status 
CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t _submitted_base_compactions[tablet->tablet_id()] = compaction; } st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + g_base_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); - auto st = compaction->execute_compact(); + Defer defer {[&]() { + g_base_compaction_running_task_count << -1; + _submitted_base_compactions.erase(tablet->tablet_id()); + }}; + auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, + compaction); + if (!st.ok()) return; + st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); tablet->set_last_base_compaction_failure_time(now); } std::lock_guard lock(_compaction_mtx); - _submitted_base_compactions.erase(tablet->tablet_id()); + _executing_base_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { std::lock_guard lock(_compaction_mtx); @@ -648,7 +722,9 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet); auto st = compaction->prepare_compact(); if (!st.ok()) { - long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + long now = duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>() && st.msg() != "_last_delete_version.first not equal to -1") { // Backoff strategy if no suitable version @@ -680,8 +756,27 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS tablet->last_cumu_no_suitable_version_ms = 0; } }; + auto erase_executing_cumu_compaction = [=, this]() { + std::lock_guard lock(_compaction_mtx); + auto it = 
_executing_cumu_compactions.find(tablet->tablet_id()); + DCHECK(it != _executing_cumu_compactions.end()); + auto& compactions = it->second; + auto it1 = std::find(compactions.begin(), compactions.end(), compaction); + DCHECK(it1 != compactions.end()); + compactions.erase(it1); + if (compactions.empty()) { // No compactions on this tablet, erase key + _executing_cumu_compactions.erase(it); + // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to + // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform + // cumu compaction on tablet which has suitable versions for cumu compaction. + tablet->last_cumu_no_suitable_version_ms = 0; + } + }; st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line", + { sleep(5); }) signal::tablet_id = tablet->tablet_id(); + g_cumu_compaction_running_task_count << 1; bool is_large_task = true; Defer defer {[&]() { DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep", @@ -691,7 +786,12 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS if (!is_large_task) { _cumu_compaction_thread_pool_small_tasks_running--; } + g_cumu_compaction_running_task_count << -1; + erase_submitted_cumu_compaction(); }}; + auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, + tablet, compaction); + if (!st.ok()) return; do { std::lock_guard lock(_cumu_compaction_delay_mtx); _cumu_compaction_thread_pool_used_threads++; @@ -713,7 +813,7 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()) .count(); tablet->set_last_cumu_compaction_failure_time(now); - erase_submitted_cumu_compaction(); + erase_executing_cumu_compaction(); // sleep 5s for 
this tablet tablet->last_cumu_no_suitable_version_ms = now; LOG_WARNING( @@ -733,13 +833,13 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS } } } while (false); - auto st = compaction->execute_compact(); + st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); tablet->set_last_cumu_compaction_failure_time(now); } - erase_submitted_cumu_compaction(); + erase_executing_cumu_compaction(); }); if (!st.ok()) { erase_submitted_cumu_compaction(); @@ -776,15 +876,23 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t _submitted_full_compactions[tablet->tablet_id()] = compaction; } st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + g_full_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); - auto st = compaction->execute_compact(); + Defer defer {[&]() { + g_full_compaction_running_task_count << -1; + _submitted_full_compactions.erase(tablet->tablet_id()); + }}; + auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, + compaction); + if (!st.ok()) return; + st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); tablet->set_last_full_compaction_failure_time(now); } std::lock_guard lock(_compaction_mtx); - _submitted_full_compactions.erase(tablet->tablet_id()); + _executing_full_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { std::lock_guard lock(_compaction_mtx); @@ -824,17 +932,17 @@ void CloudStorageEngine::_lease_compaction_thread_callback() { std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens; { std::lock_guard lock(_compaction_mtx); - for (auto& [_, base] : _submitted_base_compactions) { + 
for (auto& [_, base] : _executing_base_compactions) { if (base) { // `base` might be a nullptr placeholder base_compactions.push_back(base); } } - for (auto& [_, cumus] : _submitted_cumu_compactions) { + for (auto& [_, cumus] : _executing_cumu_compactions) { for (auto& cumu : cumus) { cumu_compactions.push_back(cumu); } } - for (auto& [_, full] : _submitted_full_compactions) { + for (auto& [_, full] : _executing_full_compactions) { if (full) { full_compactions.push_back(full); } diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index ccc31990068..12f9bb8435c 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -27,6 +27,7 @@ #include "cloud/schema_cloud_dictionary_cache.h" #include "cloud_txn_delete_bitmap_cache.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/compaction.h" #include "olap/storage_engine.h" #include "olap/storage_policy.h" #include "util/threadpool.h" @@ -164,6 +165,9 @@ private: Status _submit_base_compaction_task(const CloudTabletSPtr& tablet); Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet); Status _submit_full_compaction_task(const CloudTabletSPtr& tablet); + Status _request_tablet_global_compaction_lock(ReaderType compaction_type, + const CloudTabletSPtr& tablet, + std::shared_ptr<CloudCompactionMixin> compaction); void _lease_compaction_thread_callback(); void _check_tablet_delete_bitmap_score_callback(); @@ -203,6 +207,13 @@ private: // tablet_id -> active compaction stop tokens std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>> _active_compaction_stop_tokens; + // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx` + std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> + _executing_cumu_compactions; + // tablet_id -> executing base compactions, guarded by `_compaction_mtx` + std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> 
_executing_base_compactions; + // tablet_id -> executing full compactions, guarded by `_compaction_mtx` + std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> _executing_full_compactions; using CumuPolices = std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>; diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy new file mode 100644 index 00000000000..7070265a880 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_cloud_compaction_global_lock', 'docker') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.beConfigs += [ "cumulative_compaction_min_deltas=2" ] + options.beConfigs += [ "cumulative_compaction_max_deltas=3" ] + options.beNum = 3 + docker(options) { + + def cumuInjectName = 'CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line' + def injectBe = null + def cumuNormalName = 'CloudStorageEngine._submit_cumulative_compaction_task.sleep' + def normalBe = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe = backends[1] + assertNotNull(normalBe) + + def test_cumu_compaction_global_lock = { + def tableName = "test_cumu_compaction_global_lock" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DUPLICATE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (0,0)""" + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM test_cumu_compaction_global_lock") + def originTabletId = array[0].TabletId + def noramlOriginTabletId = array[0].TabletId + + sql """ select * from ${tableName} order by k""" + + Thread.sleep(5000) + + // inject be cu compaction + logger.info("run inject be cumu compaction:" + originTabletId) + def (code, out, err) = 
be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run inject be cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + Thread.sleep(1000) + + // normal be cu compaction + logger.info("run normal be cumu compaction:" + noramlOriginTabletId) + (code, out, err) = be_run_cumulative_compaction(normalBe.Host, normalBe.HttpPort, noramlOriginTabletId) + logger.info("Run normal be cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + Thread.sleep(1000) + + // check rowsets + logger.info("run inject be cumu show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run inject be cumu show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + Thread.sleep(10000) + + // check rowsets + logger.info("run normal be cumu show:" + originTabletId) + (code, out, err) = be_show_tablet_status(normalBe.Host, normalBe.HttpPort, noramlOriginTabletId) + logger.info("Run normal be cumu show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + // check rowsets + logger.info("run inject be cumu show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run inject be cumu show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + } + + try { + DebugPoint.enableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, cumuInjectName) + DebugPoint.enableDebugPoint(normalBe.Host, normalBe.HttpPort.toInteger(), NodeType.BE, cumuNormalName) + + test_cumu_compaction_global_lock() + + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, cumuInjectName) + DebugPoint.disableDebugPoint(normalBe.Host, normalBe.HttpPort.toInteger(), NodeType.BE, cumuNormalName) + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org