This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a7316437604 [Fix](cloud-mow) Compaction should release delete bitmap 
lock when abort fail (#47963)
a7316437604 is described below

commit a7316437604c6d62d7810180ffe265932c8254b7
Author: huanghaibin <huanghai...@selectdb.com>
AuthorDate: Sat Feb 22 14:21:22 2025 +0800

    [Fix](cloud-mow) Compaction should release delete bitmap lock when abort 
fail (#47963)
    
    pick pr #47766
---
 be/src/cloud/cloud_base_compaction.cpp             |  19 +-
 be/src/cloud/cloud_base_compaction.h               |   3 +-
 be/src/cloud/cloud_cumulative_compaction.cpp       |  33 ++--
 be/src/cloud/cloud_cumulative_compaction.h         |   3 +-
 be/src/cloud/cloud_full_compaction.cpp             |  20 +--
 be/src/cloud/cloud_full_compaction.h               |   3 +-
 be/src/cloud/cloud_meta_mgr.cpp                    |  17 +-
 be/src/cloud/cloud_meta_mgr.h                      |   4 +-
 be/src/cloud/cloud_tablet.cpp                      |   2 +-
 be/src/cloud/config.cpp                            |   2 +-
 be/src/cloud/config.h                              |   2 +-
 be/src/olap/compaction.cpp                         |  31 +++-
 be/src/olap/compaction.h                           |  10 +-
 cloud/src/meta-service/meta_service.cpp            |  71 ++++++--
 cloud/src/meta-service/meta_service_job.cpp        |   4 +-
 .../test_compaction_fail_release_lock.out          | Bin 0 -> 230 bytes
 .../test_compaction_fail_release_lock.groovy       | 198 +++++++++++++++++++++
 17 files changed, 336 insertions(+), 86 deletions(-)

diff --git a/be/src/cloud/cloud_base_compaction.cpp 
b/be/src/cloud/cloud_base_compaction.cpp
index 3db54269cc3..fc6b13b564b 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -38,12 +38,7 @@ bvar::Adder<uint64_t> base_output_size("base_compaction", 
"output_size");
 
 CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, 
CloudTabletSPtr tablet)
         : CloudCompactionMixin(engine, tablet,
-                               "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {
-    auto uuid = UUIDGenerator::instance()->next_uuid();
-    std::stringstream ss;
-    ss << uuid;
-    _uuid = ss.str();
-}
+                               "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {}
 
 CloudBaseCompaction::~CloudBaseCompaction() = default;
 
@@ -330,8 +325,7 @@ Status CloudBaseCompaction::modify_rowsets() {
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
-        int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
-                            std::numeric_limits<int64_t>::max();
+        int64_t initiator = this->initiator();
         RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
                 _input_rowsets, _output_rowset, *_rowid_conversion, 
compaction_type(),
                 _stats.merged_rows, initiator, output_rowset_delete_bitmap,
@@ -403,8 +397,8 @@ Status CloudBaseCompaction::modify_rowsets() {
     return Status::OK();
 }
 
-void CloudBaseCompaction::garbage_collection() {
-    CloudCompactionMixin::garbage_collection();
+Status CloudBaseCompaction::garbage_collection() {
+    RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection());
     cloud::TabletJobInfoPB job;
     auto idx = job.mutable_idx();
     idx->set_tablet_id(_tablet->tablet_id());
@@ -418,9 +412,7 @@ void CloudBaseCompaction::garbage_collection() {
     compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
-        int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
-                            std::numeric_limits<int64_t>::max();
-        compaction_job->set_delete_bitmap_lock_initiator(initiator);
+        compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
     }
     auto st = _engine.meta_mgr().abort_tablet_job(job);
     if (!st.ok()) {
@@ -429,6 +421,7 @@ void CloudBaseCompaction::garbage_collection() {
                 .tag("tablet_id", _tablet->tablet_id())
                 .error(st);
     }
+    return st;
 }
 
 void CloudBaseCompaction::do_lease() {
diff --git a/be/src/cloud/cloud_base_compaction.h 
b/be/src/cloud/cloud_base_compaction.h
index 4240458f21b..b9f52922b8e 100644
--- a/be/src/cloud/cloud_base_compaction.h
+++ b/be/src/cloud/cloud_base_compaction.h
@@ -42,7 +42,7 @@ private:
 
     Status modify_rowsets() override;
 
-    void garbage_collection() override;
+    Status garbage_collection() override;
 
     void _filter_input_rowset();
 
@@ -50,7 +50,6 @@ private:
 
     ReaderType compaction_type() const override { return 
ReaderType::READER_BASE_COMPACTION; }
 
-    std::string _uuid;
     int64_t _input_segments = 0;
     int64_t _base_compaction_cnt = 0;
     int64_t _cumulative_compaction_cnt = 0;
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp 
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 7594ac84684..8addf7586f7 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -41,12 +41,7 @@ bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", 
"output_size");
 CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& 
engine,
                                                      CloudTabletSPtr tablet)
         : CloudCompactionMixin(engine, tablet,
-                               "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {
-    auto uuid = UUIDGenerator::instance()->next_uuid();
-    std::stringstream ss;
-    ss << uuid;
-    _uuid = ss.str();
-}
+                               "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {}
 
 CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
 
@@ -284,8 +279,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     });
 
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
-    int64_t initiator =
-            HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & 
std::numeric_limits<int64_t>::max();
+    int64_t initiator = this->initiator();
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
         RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
@@ -305,6 +299,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {
         compaction_job->set_delete_bitmap_lock_initiator(initiator);
     }
 
+    
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", 
{
+        LOG(INFO) << 
"CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id"
+                  << cloud_tablet()->tablet_id();
+        return Status::InternalError(
+                "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed 
for tablet_id {}",
+                cloud_tablet()->tablet_id());
+    });
     cloud::FinishTabletJobResponse resp;
     auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
     if (resp.has_alter_version()) {
@@ -440,8 +441,8 @@ Status 
CloudCumulativeCompaction::process_old_version_delete_bitmap() {
     return Status::OK();
 }
 
-void CloudCumulativeCompaction::garbage_collection() {
-    CloudCompactionMixin::garbage_collection();
+Status CloudCumulativeCompaction::garbage_collection() {
+    RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection());
     cloud::TabletJobInfoPB job;
     auto idx = job.mutable_idx();
     idx->set_tablet_id(_tablet->tablet_id());
@@ -455,10 +456,15 @@ void CloudCumulativeCompaction::garbage_collection() {
     compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE);
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
-        int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
-                            std::numeric_limits<int64_t>::max();
-        compaction_job->set_delete_bitmap_lock_initiator(initiator);
+        compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
     }
+    
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", 
{
+        LOG(INFO) << "CumulativeCompaction.modify_rowsets.abort_job_failed for 
tablet_id"
+                  << cloud_tablet()->tablet_id();
+        return Status::InternalError(
+                "CumulativeCompaction.modify_rowsets.abort_job_failed for 
tablet_id {}",
+                cloud_tablet()->tablet_id());
+    });
     auto st = _engine.meta_mgr().abort_tablet_job(job);
     if (!st.ok()) {
         LOG_WARNING("failed to abort compaction job")
@@ -466,6 +472,7 @@ void CloudCumulativeCompaction::garbage_collection() {
                 .tag("tablet_id", _tablet->tablet_id())
                 .error(st);
     }
+    return st;
 }
 
 Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
diff --git a/be/src/cloud/cloud_cumulative_compaction.h 
b/be/src/cloud/cloud_cumulative_compaction.h
index 87fc0b62c9c..1e50cd93215 100644
--- a/be/src/cloud/cloud_cumulative_compaction.h
+++ b/be/src/cloud/cloud_cumulative_compaction.h
@@ -44,7 +44,7 @@ private:
 
     Status modify_rowsets() override;
 
-    void garbage_collection() override;
+    Status garbage_collection() override;
 
     void update_cumulative_point();
 
@@ -52,7 +52,6 @@ private:
 
     ReaderType compaction_type() const override { return 
ReaderType::READER_CUMULATIVE_COMPACTION; }
 
-    std::string _uuid;
     int64_t _input_segments = 0;
     int64_t _max_conflict_version = 0;
     // Snapshot values when pick input rowsets
diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index f983e57ebe0..b7dde4266df 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -42,12 +42,7 @@ bvar::Adder<uint64_t> full_output_size("full_compaction", 
"output_size");
 
 CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, 
CloudTabletSPtr tablet)
         : CloudCompactionMixin(engine, tablet,
-                               "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {
-    auto uuid = UUIDGenerator::instance()->next_uuid();
-    std::stringstream ss;
-    ss << uuid;
-    _uuid = ss.str();
-}
+                               "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {}
 
 CloudFullCompaction::~CloudFullCompaction() = default;
 
@@ -227,10 +222,8 @@ Status CloudFullCompaction::modify_rowsets() {
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
-        int64_t initiator =
-                boost::hash_range(_uuid.begin(), _uuid.end()) & 
std::numeric_limits<int64_t>::max();
-        
RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(initiator));
-        compaction_job->set_delete_bitmap_lock_initiator(initiator);
+        
RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(this->initiator()));
+        compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
     }
 
     cloud::FinishTabletJobResponse resp;
@@ -271,7 +264,7 @@ Status CloudFullCompaction::modify_rowsets() {
     return Status::OK();
 }
 
-void CloudFullCompaction::garbage_collection() {
+Status CloudFullCompaction::garbage_collection() {
     //file_cache_garbage_collection();
     cloud::TabletJobInfoPB job;
     auto idx = job.mutable_idx();
@@ -286,9 +279,7 @@ void CloudFullCompaction::garbage_collection() {
     compaction_job->set_type(cloud::TabletCompactionJobPB::FULL);
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
-        int64_t initiator =
-                boost::hash_range(_uuid.begin(), _uuid.end()) & 
std::numeric_limits<int64_t>::max();
-        compaction_job->set_delete_bitmap_lock_initiator(initiator);
+        compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
     }
     auto st = _engine.meta_mgr().abort_tablet_job(job);
     if (!st.ok()) {
@@ -297,6 +288,7 @@ void CloudFullCompaction::garbage_collection() {
                 .tag("tablet_id", _tablet->tablet_id())
                 .error(st);
     }
+    return st;
 }
 
 void CloudFullCompaction::do_lease() {
diff --git a/be/src/cloud/cloud_full_compaction.h 
b/be/src/cloud/cloud_full_compaction.h
index 3cbc353f6dd..882fd752926 100644
--- a/be/src/cloud/cloud_full_compaction.h
+++ b/be/src/cloud/cloud_full_compaction.h
@@ -42,7 +42,7 @@ protected:
     std::string_view compaction_name() const override { return 
"CloudFullCompaction"; }
 
     Status modify_rowsets() override;
-    void garbage_collection() override;
+    Status garbage_collection() override;
 
 private:
     Status _cloud_full_compaction_update_delete_bitmap(int64_t initiator);
@@ -52,7 +52,6 @@ private:
 
     ReaderType compaction_type() const override { return 
ReaderType::READER_FULL_COMPACTION; }
 
-    std::string _uuid;
     int64_t _input_segments = 0;
     // Snapshot values when pick input rowsets
     int64_t _base_compaction_cnt = 0;
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 7922ac633fd..1b8a91abe3e 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1219,23 +1219,24 @@ Status 
CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in
     return st;
 }
 
-Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& 
tablet, int64_t lock_id,
-                                                      int64_t initiator) {
-    VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << 
tablet.tablet_id()
-               << ",lock_id:" << lock_id;
+void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t 
lock_id,
+                                                    int64_t initiator, int64_t 
tablet_id) {
+    LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id
+              << ",lock_id:" << lock_id << ",initiator:" << initiator << 
",tablet_id:" << tablet_id;
     RemoveDeleteBitmapUpdateLockRequest req;
     RemoveDeleteBitmapUpdateLockResponse res;
     req.set_cloud_unique_id(config::cloud_unique_id);
-    req.set_tablet_id(tablet.tablet_id());
+    req.set_table_id(table_id);
+    req.set_tablet_id(tablet_id);
     req.set_lock_id(lock_id);
     req.set_initiator(initiator);
     auto st = retry_rpc("remove delete bitmap update lock", req, &res,
                         &MetaService_Stub::remove_delete_bitmap_update_lock);
     if (!st.ok()) {
-        LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << 
tablet.tablet_id()
-                     << " lock_id=" << lock_id << " st=" << st.to_string();
+        LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << 
table_id
+                     << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id
+                     << ",st=" << st.to_string();
     }
-    return st;
 }
 
 Status CloudMetaMgr::remove_old_version_delete_bitmap(
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 913ef59489a..f0a1b1a6648 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -103,8 +103,8 @@ public:
     Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t 
lock_id,
                                          int64_t initiator);
 
-    Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t 
lock_id,
-                                            int64_t initiator);
+    void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, 
int64_t initiator,
+                                          int64_t tablet_id);
 
     Status remove_old_version_delete_bitmap(
             int64_t tablet_id,
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 1642fd9efd5..333fd281f13 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -36,6 +36,7 @@
 #include "common/logging.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
+#include "olap/compaction.h"
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
@@ -54,7 +55,6 @@ namespace doris {
 #include "common/compile_check_begin.h"
 using namespace ErrorCode;
 
-static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
 static constexpr int LOAD_INITIATOR_ID = -1;
 
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr 
tablet_meta)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 59a3c59b5d3..4dee3002e3a 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -66,7 +66,7 @@ DEFINE_mInt32(sync_load_for_tablets_thread, "32");
 
 DEFINE_mBool(enable_new_tablet_do_compaction, "false");
 
-DEFINE_Int32(delete_bitmap_lock_expiration_seconds, "10");
+DEFINE_mInt32(delete_bitmap_lock_expiration_seconds, "10");
 
 DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");
 
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 50f058bf8b0..f79038662ef 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -98,7 +98,7 @@ DECLARE_mBool(save_load_error_log_to_s3);
 // the theads which sync the datas which loaded in other clusters
 DECLARE_mInt32(sync_load_for_tablets_thread);
 
-DECLARE_Int32(delete_bitmap_lock_expiration_seconds);
+DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
 
 // enable large txn lazy commit in meta-service `commit_txn`
 DECLARE_mBool(enable_cloud_txn_lazy_commit);
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 1bc7fbfa2a3..ff02cdf174c 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1266,7 +1266,12 @@ int64_t CloudCompactionMixin::get_compaction_permits() {
 
 CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, 
CloudTabletSPtr tablet,
                                            const std::string& label)
-        : Compaction(tablet, label), _engine(engine) {}
+        : Compaction(tablet, label), _engine(engine) {
+    auto uuid = UUIDGenerator::instance()->next_uuid();
+    std::stringstream ss;
+    ss << uuid;
+    _uuid = ss.str();
+}
 
 Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
     OlapStopWatch watch;
@@ -1298,11 +1303,26 @@ Status 
CloudCompactionMixin::execute_compact_impl(int64_t permits) {
     return Status::OK();
 }
 
+int64_t CloudCompactionMixin::initiator() const {
+    return HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & 
std::numeric_limits<int64_t>::max();
+}
+
 Status CloudCompactionMixin::execute_compact() {
     TEST_INJECTION_POINT("Compaction::do_compaction");
     int64_t permits = get_compaction_permits();
-    HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits),
-                                        [&](const doris::Exception& ex) { 
garbage_collection(); });
+    HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
+            execute_compact_impl(permits), [&](const doris::Exception& ex) {
+                auto st = garbage_collection();
+                if (!st.ok() && initiator() != 
INVALID_COMPACTION_INITIATOR_ID) {
+                    // if compaction fail, be will try to abort compaction, 
and delete bitmap lock
+                    // will release if abort job successfully, but if abort 
failed, delete bitmap
+                    // lock will not release, in this situation, be need to 
send this rpc to ms
+                    // to try to release delete bitmap lock.
+                    _engine.meta_mgr().remove_delete_bitmap_update_lock(
+                            _tablet->table_id(), 
COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(),
+                            _tablet->tablet_id());
+                }
+            });
     _load_segment_to_cache();
     return Status::OK();
 }
@@ -1352,9 +1372,9 @@ Status 
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
     return Status::OK();
 }
 
-void CloudCompactionMixin::garbage_collection() {
+Status CloudCompactionMixin::garbage_collection() {
     if (!config::enable_file_cache) {
-        return;
+        return Status::OK();
     }
     if (_output_rs_writer) {
         auto* beta_rowset_writer = 
dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get());
@@ -1365,6 +1385,7 @@ void CloudCompactionMixin::garbage_collection() {
             file_cache->remove_if_cached_async(file_key);
         }
     }
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index b69e5ad8f45..47aa49aa57e 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -41,6 +41,8 @@ struct RowsetWriterContext;
 class StorageEngine;
 class CloudStorageEngine;
 
+static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
+static constexpr int64_t INVALID_COMPACTION_INITIATOR_ID = -100;
 // This class is a base class for compaction.
 // The entrance of this class is compact()
 // Any compaction should go through four procedures.
@@ -142,6 +144,8 @@ public:
 
     int64_t get_compaction_permits();
 
+    int64_t initiator() const { return INVALID_COMPACTION_INITIATOR_ID; }
+
 protected:
     // Convert `_tablet` from `BaseTablet` to `Tablet`
     Tablet* tablet();
@@ -176,13 +180,17 @@ public:
 
     Status execute_compact() override;
 
+    int64_t initiator() const;
+
 protected:
     CloudTablet* cloud_tablet() { return 
static_cast<CloudTablet*>(_tablet.get()); }
 
-    virtual void garbage_collection();
+    virtual Status garbage_collection();
 
     CloudStorageEngine& _engine;
 
+    std::string _uuid;
+
     int64_t _expiration = 0;
 
 private:
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index f0d4b9f4861..bc8af94496a 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1747,11 +1747,11 @@ void 
MetaServiceImpl::get_tablet_stats(::google::protobuf::RpcController* contro
 }
 
 static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, 
std::stringstream& ss,
-                                     std::unique_ptr<Transaction>& txn, 
std::string& instance_id,
-                                     int64_t table_id, int64_t lock_id, 
int64_t lock_initiator) {
-    std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1});
+                                     std::unique_ptr<Transaction>& txn, 
int64_t table_id,
+                                     int64_t lock_id, int64_t lock_initiator, 
std::string& lock_key,
+                                     DeleteBitmapUpdateLockPB& lock_info) {
     std::string lock_val;
-    DeleteBitmapUpdateLockPB lock_info;
+    LOG(INFO) << "check_delete_bitmap_lock, table_id=" << table_id << " key=" 
<< hex(lock_key);
     auto err = txn->get(lock_key, &lock_val);
     
TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.inject_get_lock_key_err", 
&err);
     if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
@@ -1865,8 +1865,10 @@ void 
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
     bool unlock = request->has_unlock() ? request->unlock() : false;
     if (!unlock) {
         // 1. Check whether the lock expires
-        if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, 
table_id, request->lock_id(),
-                                      request->initiator())) {
+        std::string lock_key = 
meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
+        DeleteBitmapUpdateLockPB lock_info;
+        if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, 
request->lock_id(),
+                                      request->initiator(), lock_key, 
lock_info)) {
             LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << 
table_id
                          << " request lock_id=" << request->lock_id()
                          << " request initiator=" << request->initiator() << " 
msg " << msg;
@@ -1961,8 +1963,11 @@ void 
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
                 return;
             }
             if (!unlock) {
-                if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, 
table_id,
-                                              request->lock_id(), 
request->initiator())) {
+                std::string lock_key =
+                        meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1});
+                DeleteBitmapUpdateLockPB lock_info;
+                if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, 
request->lock_id(),
+                                              request->initiator(), lock_key, 
lock_info)) {
                     LOG(WARNING) << "failed to check delete bitmap lock, 
table_id=" << table_id
                                  << " request lock_id=" << request->lock_id()
                                  << " request initiator=" << 
request->initiator() << " msg " << msg;
@@ -2344,8 +2349,9 @@ void 
MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
     LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction 
cnts cost={} ms",
                              request->tablet_indexes().size(), 
read_stats_sw.elapsed_us() / 1000);
 
-    if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, 
request->lock_id(),
-                                  request->initiator())) {
+    DeleteBitmapUpdateLockPB lock_info_tmp;
+    if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, 
request->lock_id(),
+                                  request->initiator(), lock_key, 
lock_info_tmp)) {
         LOG(WARNING) << "failed to check delete bitmap lock after get tablet 
stats, table_id="
                      << table_id << " request lock_id=" << request->lock_id()
                      << " request initiator=" << request->initiator() << " 
code=" << code
@@ -2381,17 +2387,48 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock(
         msg = "failed to init txn";
         return;
     }
-    if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, 
request->table_id(),
-                                  request->lock_id(), request->initiator())) {
+    std::string lock_key =
+            meta_delete_bitmap_update_lock_key({instance_id, 
request->table_id(), -1});
+    std::string lock_val;
+    DeleteBitmapUpdateLockPB lock_info;
+    if (!check_delete_bitmap_lock(code, msg, ss, txn, request->table_id(), 
request->lock_id(),
+                                  request->initiator(), lock_key, lock_info)) {
         LOG(WARNING) << "failed to check delete bitmap tablet lock"
                      << " table_id=" << request->table_id() << " tablet_id=" 
<< request->tablet_id()
                      << " request lock_id=" << request->lock_id()
                      << " request initiator=" << request->initiator() << " msg 
" << msg;
         return;
     }
-    std::string lock_key =
-            meta_delete_bitmap_update_lock_key({instance_id, 
request->table_id(), -1});
-    txn->remove(lock_key);
+    bool modify_initiators = false;
+    auto initiators = lock_info.mutable_initiators();
+    for (auto iter = initiators->begin(); iter != initiators->end(); iter++) {
+        if (*iter == request->initiator()) {
+            initiators->erase(iter);
+            modify_initiators = true;
+            break;
+        }
+    }
+    if (!modify_initiators) {
+        LOG(INFO) << "initiators don't have initiator=" << request->initiator()
+                  << ",initiators_size=" << lock_info.initiators_size() << 
",just return";
+        return;
+    } else if (initiators->empty()) {
+        LOG(INFO) << "remove delete bitmap lock, table_id=" << 
request->table_id()
+                  << " lock_id=" << request->lock_id() << " key=" << 
hex(lock_key);
+        txn->remove(lock_key);
+    } else {
+        lock_info.SerializeToString(&lock_val);
+        if (lock_val.empty()) {
+            LOG(WARNING) << "failed to seiralize lock_info, table_id=" << 
request->table_id()
+                         << " key=" << hex(lock_key);
+            return;
+        }
+        LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << 
request->table_id()
+                  << ", key=" << hex(lock_key) << " lock_id=" << 
request->lock_id()
+                  << " initiator=" << request->initiator()
+                  << " initiators_size=" << lock_info.initiators_size();
+        txn->put(lock_key, lock_val);
+    }
     err = txn->commit();
     if (err != TxnErrorCode::TXN_OK) {
         code = cast_as<ErrCategory::COMMIT>(err);
@@ -2399,10 +2436,6 @@ void MetaServiceImpl::remove_delete_bitmap_update_lock(
         msg = ss.str();
         return;
     }
-
-    LOG(INFO) << "remove delete bitmap table lock table_id=" << 
request->table_id()
-              << " tablet_id=" << request->tablet_id() << " lock_id=" << 
request->lock_id()
-              << ", key=" << hex(lock_key) << ", initiator=" << 
request->initiator();
 }
 
 void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* 
controller,
diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 173c6834b5f..5299b85f41d 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -458,7 +458,7 @@ static bool 
check_and_remove_delete_bitmap_update_lock(MetaServiceCode& code, st
     std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1});
     std::string lock_val;
     TxnErrorCode err = txn->get(lock_key, &lock_val);
-    LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id
+    LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << 
table_id
               << " key=" << hex(lock_key) << " err=" << err;
     if (err != TxnErrorCode::TXN_OK) {
         ss << "failed to get delete bitmap update lock key, instance_id=" << 
instance_id
@@ -520,7 +520,7 @@ static void 
remove_delete_bitmap_update_lock(std::unique_ptr<Transaction>& txn,
     std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1});
     std::string lock_val;
     TxnErrorCode err = txn->get(lock_key, &lock_val);
-    LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id
+    LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << 
table_id
               << " key=" << hex(lock_key) << " err=" << err;
     if (err != TxnErrorCode::TXN_OK) {
         LOG(WARNING) << "failed to get delete bitmap update lock key, 
instance_id=" << instance_id
diff --git 
a/regression-test/data/compaction/test_compaction_fail_release_lock.out 
b/regression-test/data/compaction/test_compaction_fail_release_lock.out
new file mode 100644
index 00000000000..dcbfab2d353
Binary files /dev/null and 
b/regression-test/data/compaction/test_compaction_fail_release_lock.out differ
diff --git 
a/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy 
b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy
new file mode 100644
index 00000000000..e8763446b9d
--- /dev/null
+++ b/regression-test/suites/compaction/test_compaction_fail_release_lock.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_compaction_fail_release_lock", "nonConcurrent") {
+    // Backend id -> host and backend id -> HTTP port. Both maps are handed to
+    // getBackendIpHttpPort, which is defined outside this file (presumably it
+    // fills them in).
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    // Backend id -> (config name -> original value). Written by get_be_param and
+    // read by reset_be_param; starts empty instead of carrying a placeholder key.
+    def backendId_to_params = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+    // Sets BE config item `paramName` to `paramValue` on every backend listed in
+    // backendId_to_backendIP by POSTing to /api/update_config, and asserts that
+    // each response body contains "OK".
+    def set_be_param = { paramName, paramValue ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def url = String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)
+            def (code, out, err) = curl("POST", url)
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    // Restores BE config item `paramName` on every backend to the value that
+    // get_be_param stored in backendId_to_params, and asserts that each
+    // /api/update_config response body contains "OK".
+    def reset_be_param = { paramName ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // Recorded by an earlier get_be_param(paramName) call for this backend.
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def url = String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)
+            def (code, out, err) = curl("POST", url)
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    // Reads BE config item `paramName` from every backend through
+    // /api/show_config and records the reported value in
+    // backendId_to_params[id][paramName] so that reset_be_param can restore it.
+    def get_be_param = { paramName ->
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def url = String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)
+            def (code, out, err) = curl("GET", url)
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // The first element of the JSON response describes the item; it must
+            // have exactly four fields, and the value is taken from index 2.
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            paramValue = resultList[2]
+            // Create the per-backend map on first use, then remember the value.
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+
+    // Triggers a compaction of type `compact_type` ("cumulative" or "full") for
+    // `tablet_id` on the given backend, asserts that the helper's return code is
+    // 0 and returns the helper's output. Any other `compact_type` fails the test.
+    def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id ->
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            // Fail explicitly, with a readable message, for an unknown type.
+            assertTrue(false, "unsupported compact_type: " + compact_type)
+        }
+    }
+
+    // Waits one second, then fetches /api/compaction/show for `tablet_id` from
+    // the given backend by running curl, asserts that curl exited with 0 and
+    // returns the parsed JSON response.
+    def getTabletStatus = { be_host, be_http_port, tablet_id ->
+        Thread.sleep(1000)
+        StringBuilder sb = new StringBuilder()
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        // Declared with `def` so they stay local to this closure instead of
+        // being resolved outside it.
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status: code=" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    // Polls /api/compaction/run_status for `tablet_id` once per second until the
+    // response's `run_status` is false. Every poll asserts that curl exited with
+    // 0 and that the response `status` is "success" (case-insensitive).
+    def waitForCompaction = { be_host, be_http_port, tablet_id ->
+        boolean running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder()
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            // Declared with `def` so they stay local to this closure instead of
+            // being resolved outside it.
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.getText()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    // Remember the current lock expiration on every BE, then set it to 60 seconds.
+    get_be_param("delete_bitmap_lock_expiration_seconds")
+    set_be_param("delete_bitmap_lock_expiration_seconds", "60")
+
+    try {
+        def testTable = "test_compaction_fail_release_lock"
+        // Upper bound, in milliseconds, for the inserts issued after the failed
+        // compaction. It is well below the 60 s lock expiration set above, so the
+        // inserts presumably only finish in time if the lock was released rather
+        // than left to expire.
+        def timeout = 10000
+        sql """ DROP TABLE IF EXISTS ${testTable}"""
+        def testTableDDL = """
+        create table ${testTable}
+            (
+            `plan_id` bigint(20) NOT NULL,
+            `target_id` int(20) NOT NULL,
+            `target_name` varchar(255) NOT NULL
+            )
+            ENGINE=OLAP
+            UNIQUE KEY(`plan_id`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`plan_id`) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true"
+            );
+    """
+        sql testTableDDL
+
+        sql "sync"
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'3'),(3,3,'3'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'4'),(4,4,'4'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'5'),(5,5,'5'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'6'),(6,6,'6'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'7'),(7,7,'7'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'8'),(8,8,'8'); """
+
+        qt_sql "select * from ${testTable} order by plan_id"
+
+        GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed")
+
+        // trigger cumulative compaction; with the debug point on, its commit will fail
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
+            logger.info("tablet: " + tablet_info)
+            String trigger_backend_id = tablet.BackendId
+            def be_host = backendId_to_backendIP[trigger_backend_id]
+            def be_port = backendId_to_backendHttpPort[trigger_backend_id]
+            getTabletStatus(be_host, be_port, tablet_id)
+
+            assertTrue(triggerCompaction(be_host, be_port, "cumulative", tablet_id).contains("Success"))
+            waitForCompaction(be_host, be_port, tablet_id)
+            getTabletStatus(be_host, be_port, tablet_id)
+        }
+
+        def now = System.currentTimeMillis()
+
+        // the inserts must finish before the timeout
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'9'),(1,9,'9'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'10'),(1,10,'10'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'11'),(1,11,'11'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'12'),(1,12,'12'); """
+        sql """ INSERT INTO ${testTable} VALUES (0,0,'13'),(1,13,'13'); """
+
+        def time_diff = System.currentTimeMillis() - now
+        logger.info("time_diff:" + time_diff)
+        assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout")
+
+        qt_sql "select * from ${testTable} order by plan_id"
+
+    } finally {
+        // Nested so the debug point is disabled even if restoring the BE config
+        // fails; otherwise it would stay enabled for later suites.
+        try {
+            reset_be_param("delete_bitmap_lock_expiration_seconds")
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed")
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to