This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new a7316437604 [Fix](cloud-mow) Compaciton should release delete bitmap lock when abort fail (#47963) a7316437604 is described below commit a7316437604c6d62d7810180ffe265932c8254b7 Author: huanghaibin <huanghai...@selectdb.com> AuthorDate: Sat Feb 22 14:21:22 2025 +0800 [Fix](cloud-mow) Compaciton should release delete bitmap lock when abort fail (#47963) pick pr #47766 --- be/src/cloud/cloud_base_compaction.cpp | 19 +- be/src/cloud/cloud_base_compaction.h | 3 +- be/src/cloud/cloud_cumulative_compaction.cpp | 33 ++-- be/src/cloud/cloud_cumulative_compaction.h | 3 +- be/src/cloud/cloud_full_compaction.cpp | 20 +-- be/src/cloud/cloud_full_compaction.h | 3 +- be/src/cloud/cloud_meta_mgr.cpp | 17 +- be/src/cloud/cloud_meta_mgr.h | 4 +- be/src/cloud/cloud_tablet.cpp | 2 +- be/src/cloud/config.cpp | 2 +- be/src/cloud/config.h | 2 +- be/src/olap/compaction.cpp | 31 +++- be/src/olap/compaction.h | 10 +- cloud/src/meta-service/meta_service.cpp | 71 ++++++-- cloud/src/meta-service/meta_service_job.cpp | 4 +- .../test_compaction_fail_release_lock.out | Bin 0 -> 230 bytes .../test_compaction_fail_release_lock.groovy | 198 +++++++++++++++++++++ 17 files changed, 336 insertions(+), 86 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 3db54269cc3..fc6b13b564b 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -38,12 +38,7 @@ bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size"); CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) : CloudCompactionMixin(engine, tablet, - "BaseCompaction:" + std::to_string(tablet->tablet_id())) { - auto uuid = UUIDGenerator::instance()->next_uuid(); - std::stringstream ss; - ss << uuid; - _uuid = ss.str(); -} + "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} CloudBaseCompaction::~CloudBaseCompaction() = default; @@ -330,8 +325,7 @@ Status CloudBaseCompaction::modify_rowsets() { DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & - std::numeric_limits<int64_t>::max(); + int64_t initiator = this->initiator(); RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction( _input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(), _stats.merged_rows, initiator, output_rowset_delete_bitmap, @@ -403,8 +397,8 @@ Status CloudBaseCompaction::modify_rowsets() { return Status::OK(); } -void CloudBaseCompaction::garbage_collection() { - CloudCompactionMixin::garbage_collection(); +Status CloudBaseCompaction::garbage_collection() { + RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection()); cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); idx->set_tablet_id(_tablet->tablet_id()); @@ -418,9 +412,7 @@ void CloudBaseCompaction::garbage_collection() { compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & - std::numeric_limits<int64_t>::max(); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { @@ -429,6 +421,7 @@ void CloudBaseCompaction::garbage_collection() { .tag("tablet_id", _tablet->tablet_id()) .error(st); } + return st; } void CloudBaseCompaction::do_lease() { diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index 4240458f21b..b9f52922b8e 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -42,7 +42,7 @@ private: Status modify_rowsets() override; - void garbage_collection() override; + Status garbage_collection() override; void _filter_input_rowset(); @@ -50,7 +50,6 @@ private: ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } - std::string _uuid; int64_t _input_segments = 0; int64_t _base_compaction_cnt = 0; int64_t _cumulative_compaction_cnt = 0; diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 7594ac84684..8addf7586f7 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -41,12 +41,7 @@ bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size"); CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) : CloudCompactionMixin(engine, tablet, - "BaseCompaction:" + std::to_string(tablet->tablet_id())) { - auto uuid = UUIDGenerator::instance()->next_uuid(); - std::stringstream ss; - ss << uuid; - _uuid = ss.str(); -} + "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; @@ -284,8 +279,7 @@ Status CloudCumulativeCompaction::modify_rowsets() { }); DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; - int64_t initiator = - HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max(); + int64_t initiator = this->initiator(); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction( @@ -305,6 +299,13 @@ Status CloudCumulativeCompaction::modify_rowsets() { compaction_job->set_delete_bitmap_lock_initiator(initiator); } + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", { + LOG(INFO) << "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id" + << cloud_tablet()->tablet_id(); + return Status::InternalError( + "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id {}", + cloud_tablet()->tablet_id()); + }); cloud::FinishTabletJobResponse resp; auto st = _engine.meta_mgr().commit_tablet_job(job, &resp); if (resp.has_alter_version()) { @@ -440,8 +441,8 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { return Status::OK(); } -void CloudCumulativeCompaction::garbage_collection() { - CloudCompactionMixin::garbage_collection(); +Status CloudCumulativeCompaction::garbage_collection() { + RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection()); cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); idx->set_tablet_id(_tablet->tablet_id()); @@ -455,10 +456,15 @@ void CloudCumulativeCompaction::garbage_collection() { compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & - std::numeric_limits<int64_t>::max(); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", { + LOG(INFO) << "CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id" + << cloud_tablet()->tablet_id(); + return Status::InternalError( + "CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id {}", + cloud_tablet()->tablet_id()); + }); auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { LOG_WARNING("failed to abort compaction job") @@ -466,6 +472,7 @@ void CloudCumulativeCompaction::garbage_collection() { .tag("tablet_id", _tablet->tablet_id()) .error(st); } + return st; } Status CloudCumulativeCompaction::pick_rowsets_to_compact() { diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 87fc0b62c9c..1e50cd93215 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -44,7 +44,7 @@ private: Status modify_rowsets() override; - void garbage_collection() override; + Status garbage_collection() override; void update_cumulative_point(); @@ -52,7 +52,6 @@ private: ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } - std::string _uuid; int64_t _input_segments = 0; int64_t _max_conflict_version = 0; // Snapshot values when pick input rowsets diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index f983e57ebe0..b7dde4266df 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -42,12 +42,7 @@ bvar::Adder<uint64_t> full_output_size("full_compaction", "output_size"); CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) : CloudCompactionMixin(engine, tablet, - "BaseCompaction:" + std::to_string(tablet->tablet_id())) { - auto uuid = UUIDGenerator::instance()->next_uuid(); - std::stringstream ss; - ss << uuid; - _uuid = ss.str(); -} + "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} CloudFullCompaction::~CloudFullCompaction() = default; @@ -227,10 +222,8 @@ Status CloudFullCompaction::modify_rowsets() { DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = - boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max(); - RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(initiator)); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(this->initiator())); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } cloud::FinishTabletJobResponse resp; @@ -271,7 +264,7 @@ Status CloudFullCompaction::modify_rowsets() { return Status::OK(); } -void CloudFullCompaction::garbage_collection() { +Status CloudFullCompaction::garbage_collection() { //file_cache_garbage_collection(); cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -286,9 +279,7 @@ void CloudFullCompaction::garbage_collection() { compaction_job->set_type(cloud::TabletCompactionJobPB::FULL); if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - int64_t initiator = - boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max(); - compaction_job->set_delete_bitmap_lock_initiator(initiator); + compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); } auto st = _engine.meta_mgr().abort_tablet_job(job); if (!st.ok()) { @@ -297,6 +288,7 @@ void CloudFullCompaction::garbage_collection() { .tag("tablet_id", _tablet->tablet_id()) .error(st); } + return st; } void CloudFullCompaction::do_lease() { diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index 3cbc353f6dd..882fd752926 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -42,7 +42,7 @@ protected: std::string_view compaction_name() const override { return "CloudFullCompaction"; } Status modify_rowsets() override; - void garbage_collection() override; + Status garbage_collection() override; private: Status _cloud_full_compaction_update_delete_bitmap(int64_t initiator); @@ -52,7 +52,6 @@ private: ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; } - std::string _uuid; int64_t _input_segments = 0; // Snapshot values when pick input rowsets int64_t _base_compaction_cnt = 0; diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 7922ac633fd..1b8a91abe3e 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -1219,23 +1219,24 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in return st; } -Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, - int64_t initiator) { - VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() - << ",lock_id:" << lock_id; +void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, + int64_t initiator, int64_t tablet_id) { + LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id + << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id; RemoveDeleteBitmapUpdateLockRequest req; RemoveDeleteBitmapUpdateLockResponse res; req.set_cloud_unique_id(config::cloud_unique_id); - req.set_tablet_id(tablet.tablet_id()); + req.set_table_id(table_id); + req.set_tablet_id(tablet_id); req.set_lock_id(lock_id); req.set_initiator(initiator); auto st = retry_rpc("remove delete bitmap update lock", req, &res, &MetaService_Stub::remove_delete_bitmap_update_lock); if (!st.ok()) { - LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet.tablet_id() - << " lock_id=" << lock_id << " st=" << st.to_string(); + LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id + << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id + << ",st=" << st.to_string(); } - return st; } Status CloudMetaMgr::remove_old_version_delete_bitmap( diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 913ef59489a..f0a1b1a6648 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -103,8 +103,8 @@ public: Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, int64_t initiator); - Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, - int64_t initiator); + void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator, + int64_t tablet_id); Status remove_old_version_delete_bitmap( int64_t tablet_id, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 1642fd9efd5..333fd281f13 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -36,6 +36,7 @@ #include "common/logging.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/compaction.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" @@ -54,7 +55,6 @@ namespace doris { #include "common/compile_check_begin.h" using namespace ErrorCode; -static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1; static constexpr int LOAD_INITIATOR_ID = -1; CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta) diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 59a3c59b5d3..4dee3002e3a 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -66,7 +66,7 @@ DEFINE_mInt32(sync_load_for_tablets_thread, "32"); DEFINE_mBool(enable_new_tablet_do_compaction, "false"); -DEFINE_Int32(delete_bitmap_lock_expiration_seconds, "10"); +DEFINE_mInt32(delete_bitmap_lock_expiration_seconds, "10"); DEFINE_Bool(enable_cloud_txn_lazy_commit, "false"); diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 50f058bf8b0..f79038662ef 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -98,7 +98,7 @@ DECLARE_mBool(save_load_error_log_to_s3); // the theads which sync the datas which loaded in other clusters DECLARE_mInt32(sync_load_for_tablets_thread); -DECLARE_Int32(delete_bitmap_lock_expiration_seconds); +DECLARE_mInt32(delete_bitmap_lock_expiration_seconds); // enable large txn lazy commit in meta-service `commit_txn` DECLARE_mBool(enable_cloud_txn_lazy_commit); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 1bc7fbfa2a3..ff02cdf174c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1266,7 +1266,12 @@ int64_t CloudCompactionMixin::get_compaction_permits() { CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabletSPtr tablet, const std::string& label) - : Compaction(tablet, label), _engine(engine) {} + : Compaction(tablet, label), _engine(engine) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; @@ -1298,11 +1303,26 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { return Status::OK(); } +int64_t CloudCompactionMixin::initiator() const { + return HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max(); +} + Status CloudCompactionMixin::execute_compact() { TEST_INJECTION_POINT("Compaction::do_compaction"); int64_t permits = get_compaction_permits(); - HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits), - [&](const doris::Exception& ex) { garbage_collection(); }); + HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( + execute_compact_impl(permits), [&](const doris::Exception& ex) { + auto st = garbage_collection(); + if (!st.ok() && initiator() != INVALID_COMPACTION_INITIATOR_ID) { + // if compaction fail, be will try to abort compaction, and delete bitmap lock + // will release if abort job successfully, but if abort failed, delete bitmap + // lock will not release, in this situation, be need to send this rpc to ms + // to try to release delete bitmap lock. + _engine.meta_mgr().remove_delete_bitmap_update_lock( + _tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(), + _tablet->tablet_id()); + } + }); _load_segment_to_cache(); return Status::OK(); } @@ -1352,9 +1372,9 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& return Status::OK(); } -void CloudCompactionMixin::garbage_collection() { +Status CloudCompactionMixin::garbage_collection() { if (!config::enable_file_cache) { - return; + return Status::OK(); } if (_output_rs_writer) { auto* beta_rowset_writer = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get()); @@ -1365,6 +1385,7 @@ void CloudCompactionMixin::garbage_collection() { file_cache->remove_if_cached_async(file_key); } } + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index b69e5ad8f45..47aa49aa57e 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -41,6 +41,8 @@ struct RowsetWriterContext; class StorageEngine; class CloudStorageEngine; +static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1; +static constexpr int64_t INVALID_COMPACTION_INITIATOR_ID = -100; // This class is a base class for compaction. // The entrance of this class is compact() // Any compaction should go through four procedures. @@ -142,6 +144,8 @@ public: int64_t get_compaction_permits(); + int64_t initiator() const { return INVALID_COMPACTION_INITIATOR_ID; } + protected: // Convert `_tablet` from `BaseTablet` to `Tablet` Tablet* tablet(); @@ -176,13 +180,17 @@ public: Status execute_compact() override; + int64_t initiator() const; + protected: CloudTablet* cloud_tablet() { return static_cast<CloudTablet*>(_tablet.get()); } - virtual void garbage_collection(); + virtual Status garbage_collection(); CloudStorageEngine& _engine; + std::string _uuid; + int64_t _expiration = 0; private: diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index f0d4b9f4861..bc8af94496a 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1747,11 +1747,11 @@ void MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController* contro } static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, std::stringstream& ss, - std::unique_ptr<Transaction>& txn, std::string& instance_id, - int64_t table_id, int64_t lock_id, int64_t lock_initiator) { - std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + std::unique_ptr<Transaction>& txn, int64_t table_id, + int64_t lock_id, int64_t lock_initiator, std::string& lock_key, + DeleteBitmapUpdateLockPB& lock_info) { std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; + LOG(INFO) << "check_delete_bitmap_lock, table_id=" << table_id << " key=" << hex(lock_key); auto err = txn->get(lock_key, &lock_val); TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.inject_get_lock_key_err", &err); if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { @@ -1865,8 +1865,10 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont bool unlock = request->has_unlock() ? request->unlock() : false; if (!unlock) { // 1. Check whether the lock expires - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), - request->initiator())) { + std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + DeleteBitmapUpdateLockPB lock_info; + if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info)) { LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " msg " << msg; @@ -1961,8 +1963,11 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont return; } if (!unlock) { - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, - request->lock_id(), request->initiator())) { + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + DeleteBitmapUpdateLockPB lock_info; + if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info)) { LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " msg " << msg; @@ -2344,8 +2349,9 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms", request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000); - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), - request->initiator())) { + DeleteBitmapUpdateLockPB lock_info_tmp; + if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info_tmp)) { LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " code=" << code @@ -2381,17 +2387,48 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock( msg = "failed to init txn"; return; } - if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(), - request->lock_id(), request->initiator())) { + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + if (!check_delete_bitmap_lock(code, msg, ss, txn, request->table_id(), request->lock_id(), + request->initiator(), lock_key, lock_info)) { LOG(WARNING) << "failed to check delete bitmap tablet lock" << " table_id=" << request->table_id() << " tablet_id=" << request->tablet_id() << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " msg " << msg; return; } - std::string lock_key = - meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); - txn->remove(lock_key); + bool modify_initiators = false; + auto initiators = lock_info.mutable_initiators(); + for (auto iter = initiators->begin(); iter != initiators->end(); iter++) { + if (*iter == request->initiator()) { + initiators->erase(iter); + modify_initiators = true; + break; + } + } + if (!modify_initiators) { + LOG(INFO) << "initiators don't have initiator=" << request->initiator() + << ",initiators_size=" << lock_info.initiators_size() << ",just return"; + return; + } else if (initiators->empty()) { + LOG(INFO) << "remove delete bitmap lock, table_id=" << request->table_id() + << " lock_id=" << request->lock_id() << " key=" << hex(lock_key); + txn->remove(lock_key); + } else { + lock_info.SerializeToString(&lock_val); + if (lock_val.empty()) { + LOG(WARNING) << "failed to seiralize lock_info, table_id=" << request->table_id() + << " key=" << hex(lock_key); + return; + } + LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << request->table_id() + << ", key=" << hex(lock_key) << " lock_id=" << request->lock_id() + << " initiator=" << request->initiator() + << " initiators_size=" << lock_info.initiators_size(); + txn->put(lock_key, lock_val); + } err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as<ErrCategory::COMMIT>(err); @@ -2399,10 +2436,6 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock( msg = ss.str(); return; } - - LOG(INFO) << "remove delete bitmap table lock table_id=" << request->table_id() - << " tablet_id=" << request->tablet_id() << " lock_id=" << request->lock_id() - << ", key=" << hex(lock_key) << ", initiator=" << request->initiator(); } void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller, diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 173c6834b5f..5299b85f41d 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -458,7 +458,7 @@ static bool check_and_remove_delete_bitmap_update_lock(MetaServiceCode& code, st std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); std::string lock_val; TxnErrorCode err = txn->get(lock_key, &lock_val); - LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id + LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; if (err != TxnErrorCode::TXN_OK) { ss << "failed to get delete bitmap update lock key, instance_id=" << instance_id @@ -520,7 +520,7 @@ static void remove_delete_bitmap_update_lock(std::unique_ptr<Transaction>& txn, std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); std::string lock_val; TxnErrorCode err = txn->get(lock_key, &lock_val); - LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id + LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; if (err != TxnErrorCode::TXN_OK) { LOG(WARNING) << "failed to get delete bitmap update lock key, instance_id=" << instance_id diff --git a/regression-test/data/compaction/test_compaction_fail_release_lock.out b/regression-test/data/compaction/test_compaction_fail_release_lock.out new file mode 100644 index 00000000000..dcbfab2d353 Binary files /dev/null and b/regression-test/data/compaction/test_compaction_fail_release_lock.out differ diff --git a/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy new file mode 100644 index 00000000000..e8763446b9d --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_compaction_fail_release_lock", "nonConcurrent") { + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + // store the original value + get_be_param("delete_bitmap_lock_expiration_seconds") + set_be_param("delete_bitmap_lock_expiration_seconds", "60") + + try { + def testTable = "test_compaction_fail_release_lock" + def timeout = 10000 + sql """ DROP TABLE IF EXISTS ${testTable}""" + def testTableDDL = """ + create table ${testTable} + ( + `plan_id` bigint(20) NOT NULL, + `target_id` int(20) NOT NULL, + `target_name` varchar(255) NOT NULL + ) + ENGINE=OLAP + UNIQUE KEY(`plan_id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`plan_id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + sql testTableDDL + + sql "sync" + sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'3'),(3,3,'3'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'4'),(4,4,'4'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'5'),(5,5,'5'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'6'),(6,6,'6'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'7'),(7,7,'7'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'8'),(8,8,'8'); """ + + qt_sql "select * from ${testTable} order by plan_id" + + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed") + + // trigger cu compaction, compaction will commit fail + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + logger.info("tablets: " + tablets) + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + String trigger_backend_id = tablet.BackendId + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], + "cumulative", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + } + + def now = System.currentTimeMillis() + + // insert will done before timeout + sql """ INSERT INTO ${testTable} VALUES (0,0,'9'),(1,9,'9'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'10'),(1,10,'10'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'11'),(1,11,'11'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'12'),(1,12,'12'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'13'),(1,13,'13'); """ + + def time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") + + qt_sql "select * from ${testTable} order by plan_id" + + } finally { + reset_be_param("delete_bitmap_lock_expiration_seconds") + GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed") + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org