This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0983526722a branch-3.0-pick: [Fix](cloud-mow) Check partition's 
version to avoid wrongly update visible versions' delete bitmaps (#49710) 
(#49796)
0983526722a is described below

commit 0983526722aaca0817a765433e2440d0fc39230a
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Mon Apr 7 12:04:28 2025 +0800

    branch-3.0-pick: [Fix](cloud-mow) Check partition's version to avoid 
wrongly update visible versions' delete bitmaps (#49710) (#49796)
    
    pick https://github.com/apache/doris/pull/49710
---
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |   3 +-
 be/src/cloud/cloud_meta_mgr.cpp                    |  11 +-
 be/src/cloud/cloud_meta_mgr.h                      |   3 +-
 be/src/cloud/cloud_tablet.cpp                      |  12 +-
 be/src/cloud/cloud_tablet.h                        |   5 +-
 be/src/olap/base_tablet.cpp                        |   3 +-
 be/src/olap/base_tablet.h                          |   4 +-
 be/src/olap/tablet.cpp                             |   3 +-
 be/src/olap/tablet.h                               |   3 +-
 cloud/src/meta-service/meta_service.cpp            | 114 +++++-
 cloud/src/meta-service/meta_service_txn.cpp        |   1 +
 cloud/test/meta_service_test.cpp                   | 384 ++++++++++++++++++++-
 gensrc/proto/cloud.proto                           |   7 +
 13 files changed, 536 insertions(+), 17 deletions(-)

diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 86b369b3db1..39c0575c8b1 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -259,7 +259,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
 
         // we still need to update delete bitmap KVs to MS when we skip to 
calcalate delete bitmaps,
         // because the pending delete bitmap KVs in MS we wrote before may 
have been removed and replaced by other txns
-        RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, 
_transaction_id, delete_bitmap));
+        RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, 
_transaction_id, delete_bitmap,
+                                                         _version));
 
         LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id
                   << ", publish_status=SUCCEED, not need to re-calculate 
delete_bitmaps.";
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c92d5e9404e..6f6024f1912 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1141,7 +1141,9 @@ Status CloudMetaMgr::update_tablet_schema(int64_t 
tablet_id, const TabletSchema&
 }
 
 Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t 
lock_id,
-                                          int64_t initiator, DeleteBitmap* 
delete_bitmap) {
+                                          int64_t initiator, DeleteBitmap* 
delete_bitmap,
+                                          int64_t txn_id, bool is_explicit_txn,
+                                          int64_t next_visible_version) {
     VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
     UpdateDeleteBitmapRequest req;
     UpdateDeleteBitmapResponse res;
@@ -1151,6 +1153,13 @@ Status CloudMetaMgr::update_delete_bitmap(const 
CloudTablet& tablet, int64_t loc
     req.set_tablet_id(tablet.tablet_id());
     req.set_lock_id(lock_id);
     req.set_initiator(initiator);
+    req.set_is_explicit_txn(is_explicit_txn);
+    if (txn_id > 0) {
+        req.set_txn_id(txn_id);
+    }
+    if (next_visible_version > 0) {
+        req.set_next_visible_version(next_visible_version);
+    }
     for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
         req.add_rowset_ids(std::get<0>(key).to_string());
         req.add_segment_ids(std::get<1>(key));
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index d06e55e69ad..a666a5e4d16 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -99,7 +99,8 @@ public:
     Status update_tablet_schema(int64_t tablet_id, const TabletSchema& 
tablet_schema);
 
     Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, 
int64_t initiator,
-                                DeleteBitmap* delete_bitmap);
+                                DeleteBitmap* delete_bitmap, int64_t txn_id = 
-1,
+                                bool is_explicit_txn = false, int64_t 
next_visible_version = -1);
 
     Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
                                                    DeleteBitmap* 
delete_bitmap);
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index a1026c9518d..cf3fe051bec 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -699,7 +699,8 @@ CalcDeleteBitmapExecutor* 
CloudTablet::calc_delete_bitmap_executor() {
 
 Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
                                        DeleteBitmapPtr delete_bitmap, 
RowsetWriter* rowset_writer,
-                                       const RowsetIdUnorderedSet& 
cur_rowset_ids) {
+                                       const RowsetIdUnorderedSet& 
cur_rowset_ids,
+                                       int64_t next_visible_version) {
     RowsetSharedPtr rowset = txn_info->rowset;
     int64_t cur_version = rowset->start_version();
     // update delete bitmap info, in order to avoid recalculation when trying 
again
@@ -715,7 +716,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* 
txn_info, int64_t tx
         RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
     }
 
-    RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, 
delete_bitmap));
+    RETURN_IF_ERROR(
+            save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, 
next_visible_version));
 
     // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache 
because if the txn is retried for some reason,
     // it will use the delete bitmap from txn_delete_bitmap_cache when 
re-calculating the delete bitmap, during which it will do
@@ -745,7 +747,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* 
txn_info, int64_t tx
 }
 
 Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t 
txn_id,
-                                             DeleteBitmapPtr delete_bitmap) {
+                                             DeleteBitmapPtr delete_bitmap,
+                                             int64_t next_visible_version) {
     DeleteBitmapPtr new_delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
     for (auto iter = delete_bitmap->delete_bitmap.begin();
          iter != delete_bitmap->delete_bitmap.end(); ++iter) {
@@ -758,7 +761,8 @@ Status CloudTablet::save_delete_bitmap_to_ms(int64_t 
cur_version, int64_t txn_id
     }
 
     RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, 
LOAD_INITIATOR_ID,
-                                                            
new_delete_bitmap.get()));
+                                                            
new_delete_bitmap.get(), txn_id, false,
+                                                            
next_visible_version));
     return Status::OK();
 }
 
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index b2d6e8921b0..4226e26a0f5 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -170,10 +170,11 @@ public:
 
     Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                               DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
-                              const RowsetIdUnorderedSet& cur_rowset_ids) 
override;
+                              const RowsetIdUnorderedSet& cur_rowset_ids,
+                              int64_t next_visible_version = -1) override;
 
     Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
-                                    DeleteBitmapPtr delete_bitmap);
+                                    DeleteBitmapPtr delete_bitmap, int64_t 
next_visible_version);
 
     Status calc_delete_bitmap_for_compaction(const 
std::vector<RowsetSharedPtr>& input_rowsets,
                                              const RowsetSharedPtr& 
output_rowset,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 4250a9f09b8..c33043b3b64 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1352,7 +1352,8 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
     auto t5 = watch.get_elapse_time_us();
     RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
-                                             transient_rs_writer.get(), 
cur_rowset_ids));
+                                             transient_rs_writer.get(), 
cur_rowset_ids,
+                                             cur_version));
 
     // defensive check, check that the delete bitmap cache we wrote is correct
     RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id, 
delete_bitmap.get()));
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 693f08dcac7..40928e63729 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -228,10 +228,10 @@ public:
 
     static Status update_delete_bitmap(const BaseTabletSPtr& self, 
TabletTxnInfo* txn_info,
                                        int64_t txn_id, int64_t txn_expiration 
= 0);
-
     virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
                                       DeleteBitmapPtr delete_bitmap, 
RowsetWriter* rowset_writer,
-                                      const RowsetIdUnorderedSet& 
cur_rowset_ids) = 0;
+                                      const RowsetIdUnorderedSet& 
cur_rowset_ids,
+                                      int64_t next_visible_version = -1) = 0;
     virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
 
     void calc_compaction_output_rowset_delete_bitmap(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 78cb6f23844..a1011661fc0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2483,7 +2483,8 @@ CalcDeleteBitmapExecutor* 
Tablet::calc_delete_bitmap_executor() {
 
 Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
                                   DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
-                                  const RowsetIdUnorderedSet& cur_rowset_ids) {
+                                  const RowsetIdUnorderedSet& cur_rowset_ids,
+                                  int64_t next_visible_version) {
     RowsetSharedPtr rowset = txn_info->rowset;
     int64_t cur_version = rowset->start_version();
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 96bc5d87e3c..ff18ce50657 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -417,7 +417,8 @@ public:
     CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
     Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                               DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
-                              const RowsetIdUnorderedSet& cur_rowset_ids) 
override;
+                              const RowsetIdUnorderedSet& cur_rowset_ids,
+                              int64_t next_visible_version = -1) override;
 
     void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
     bool check_all_rowset_segment();
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 6ceee180939..78daf880c54 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1829,6 +1829,106 @@ static bool 
process_pending_delete_bitmap(MetaServiceCode& code, std::string& ms
     return true;
 }
 
+// When a load txn retries in publish phase with a different version to publish, it will acquire the delete bitmap lock
+// many times. These locks are *different*, but they are the same in the current implementation because they have
+// the same lock_id and initiator and don't have version info. If some delete bitmap calculation task with version X
+// on BE lasts long and tries to update delete bitmaps on MS when the txn gains the lock in later retries
+// with version Y (Y > X) to publish, it may wrongly update version X's delete bitmaps because the lock doesn't have version info.
+//
+// This function checks whether the partition version is correct when updating the delete bitmap,
+// to avoid wrongly updating a visible version's delete bitmaps.
+// 1. get the db id with txn id
+// 2. get the partition version with db id, table id and partition id
+// 3. check if the partition version matches the updating version
+static bool check_partition_version_when_update_delete_bitmap(
+        MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>& 
txn,
+        std::string& instance_id, int64_t table_id, int64_t partition_id, 
int64_t tablet_id,
+        int64_t txn_id, int64_t next_visible_version) {
+    if (partition_id <= 0) {
+        LOG(WARNING) << fmt::format(
+                "invalid partition_id, skip to check partition version. 
txn={}, "
+                "table_id={}, partition_id={}, tablet_id={}",
+                txn_id, table_id, partition_id, tablet_id);
+        return true;
+    }
+    // Get db id with txn id
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    auto err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, 
err);
+        LOG(WARNING) << msg;
+        return false;
+    }
+
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
+        LOG(WARNING) << msg;
+        return false;
+    }
+
+    DCHECK(index_pb.has_tablet_index())
+            << fmt::format("txn={}, table_id={}, partition_id={}, 
tablet_id={}, index_pb={}",
+                           txn_id, table_id, partition_id, tablet_id, 
proto_to_json(index_pb));
+    DCHECK(index_pb.tablet_index().has_db_id())
+            << fmt::format("txn={}, table_id={}, partition_id={}, 
tablet_id={}, index_pb={}",
+                           txn_id, table_id, partition_id, tablet_id, 
proto_to_json(index_pb));
+    if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
+        LOG(WARNING) << fmt::format(
+                "has no db_id in TxnIndexPB, skip to check partition version. 
txn={}, "
+                "table_id={}, partition_id={}, tablet_id={}, index_pb={}",
+                txn_id, table_id, partition_id, tablet_id, 
proto_to_json(index_pb));
+        return true;
+    }
+    int64_t db_id = index_pb.tablet_index().db_id();
+
+    std::string ver_key = partition_version_key({instance_id, db_id, table_id, 
partition_id});
+    std::string ver_val;
+    err = txn->get(ver_key, &ver_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to get partition version, txn_id={}, 
tablet={}, err={}", txn_id,
+                          tablet_id, err);
+        LOG(WARNING) << msg;
+        return false;
+    }
+
+    int64_t cur_max_version {-1};
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        cur_max_version = 1;
+    } else {
+        VersionPB version_pb;
+        if (!version_pb.ParseFromString(ver_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = fmt::format("failed to parse version_pb, txn_id={}, 
tablet={}, key={}", txn_id,
+                              tablet_id, hex(ver_key));
+            LOG(WARNING) << msg;
+            return false;
+        }
+        DCHECK(version_pb.has_version());
+        cur_max_version = version_pb.version();
+
+        if (version_pb.pending_txn_ids_size() > 0) {
+            DCHECK(version_pb.pending_txn_ids_size() == 1);
+            cur_max_version += version_pb.pending_txn_ids_size();
+        }
+    }
+
+    if (cur_max_version + 1 != next_visible_version) {
+        code = MetaServiceCode::VERSION_NOT_MATCH;
+        msg = fmt::format(
+                "check version failed when update_delete_bitmap, txn={}, 
table_id={}, "
+                "partition_id={}, tablet_id={}, found partition's max version 
is {}, but "
+                "request next_visible_version is {}",
+                txn_id, table_id, partition_id, tablet_id, cur_max_version, 
next_visible_version);
+        return false;
+    }
+    return true;
+}
+
 void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* 
controller,
                                            const UpdateDeleteBitmapRequest* 
request,
                                            UpdateDeleteBitmapResponse* 
response,
@@ -1880,7 +1980,17 @@ void 
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
         }
     }
 
-    // 3. store all pending delete bitmap for this txn
+    // 3. check if partition's version matches
+    if (request->lock_id() > 0 && request->has_txn_id() && 
request->has_partition_id() &&
+        request->has_next_visible_version()) {
+        if (!check_partition_version_when_update_delete_bitmap(
+                    code, msg, txn, instance_id, table_id, 
request->partition_id(), tablet_id,
+                    request->txn_id(), request->next_visible_version())) {
+            return;
+        }
+    }
+
+    // 4. store all pending delete bitmap for this txn
     PendingDeleteBitmapPB delete_bitmap_keys;
     for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
         MetaDeleteBitmapInfo key_info {instance_id, tablet_id, 
request->rowset_ids(i),
@@ -1919,7 +2029,7 @@ void 
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
         }
     }
 
-    // 4. Update delete bitmap for curent txn
+    // 5. Update delete bitmap for current txn
     size_t current_key_count = 0;
     size_t current_value_count = 0;
     size_t total_key_count = 0;
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 8bec142e13c..b251ec64ed0 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3163,6 +3163,7 @@ void 
MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
     const std::string index_key = txn_index_key({instance_id, sub_txn_id});
     std::string index_val;
     TxnIndexPB index_pb;
+    index_pb.mutable_tablet_index()->set_db_id(db_id);
     if (!index_pb.SerializeToString(&index_val)) {
         code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
         ss << "failed to serialize txn_index_pb "
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 10a5b3c6f18..0a1be69e1eb 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -154,6 +154,18 @@ static void create_tablet(MetaServiceProxy* meta_service, 
int64_t table_id, int6
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
 }
 
+static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t 
db_id,
+                                     int64_t table_id, int64_t index_id, 
int64_t partition_id,
+                                     int64_t tablet_id) {
+    brpc::Controller cntl;
+    CreateTabletsRequest req;
+    CreateTabletsResponse res;
+    req.set_db_id(db_id);
+    add_tablet(req, table_id, index_id, partition_id, tablet_id);
+    meta_service->create_tablets(&cntl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
+}
+
 static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const 
std::string& label,
                       int64_t table_id, int64_t& txn_id) {
     brpc::Controller cntl;
@@ -2050,7 +2062,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
         index_key = txn_index_key({mock_instance, sub_txn_id3});
         ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK);
         txn_index.ParseFromString(index_val);
-        ASSERT_FALSE(txn_index.has_tablet_index());
+        ASSERT_TRUE(txn_index.has_tablet_index());
 
         // txn_label
         std::string label_key = txn_label_key({mock_instance, db_id, label});
@@ -4923,6 +4935,376 @@ TEST(MetaServiceTest, UpdateDeleteBitmapWithBigKeys) {
     ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);
 }
 
+static void set_partition_version(MetaServiceProxy* meta_service, 
std::string_view instance_id,
+                                  int64_t db_id, int64_t table_id, int64_t 
partition_id,
+                                  int64_t version, std::vector<int64_t> 
pending_txn_ids = {}) {
+    std::string ver_key = partition_version_key({instance_id, db_id, table_id, 
partition_id});
+    std::string ver_val;
+    VersionPB version_pb;
+    version_pb.set_version(version);
+    if (!pending_txn_ids.empty()) {
+        for (auto txn_id : pending_txn_ids) {
+            version_pb.add_pending_txn_ids(txn_id);
+        }
+    }
+    ASSERT_TRUE(version_pb.SerializeToString(&ver_val));
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+    txn->put(ver_key, ver_val);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+static void begin_txn_and_commit_rowset(MetaServiceProxy* meta_service, const 
std::string& label,
+                                        int64_t db_id, int64_t table_id, 
int64_t partition_id,
+                                        int64_t tablet_id, int64_t* txn_id) {
+    begin_txn(meta_service, db_id, label, table_id, *txn_id);
+    CreateRowsetResponse res;
+    auto rowset = create_rowset(*txn_id, tablet_id, partition_id);
+    prepare_rowset(meta_service, rowset, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    res.Clear();
+    commit_rowset(meta_service, rowset, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+}
+
+static void get_delete_bitmap_update_lock(MetaServiceProxy* meta_service, 
int64_t table_id,
+                                          int64_t partition_id, int64_t 
lock_id,
+                                          int64_t initiator) {
+    brpc::Controller cntl;
+    GetDeleteBitmapUpdateLockRequest get_lock_req;
+    GetDeleteBitmapUpdateLockResponse get_lock_res;
+    get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
+    get_lock_req.set_table_id(table_id);
+    get_lock_req.add_partition_ids(partition_id);
+    get_lock_req.set_expiration(5);
+    get_lock_req.set_lock_id(lock_id);
+    get_lock_req.set_initiator(initiator);
+    meta_service->get_delete_bitmap_update_lock(
+            reinterpret_cast<::google::protobuf::RpcController*>(&cntl), 
&get_lock_req,
+            &get_lock_res, nullptr);
+    ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);
+}
+
+static void update_delete_bitmap(MetaServiceProxy* meta_service,
+                                 UpdateDeleteBitmapRequest& 
update_delete_bitmap_req,
+                                 UpdateDeleteBitmapResponse& 
update_delete_bitmap_res,
+                                 int64_t table_id, int64_t partition_id, 
int64_t lock_id,
+                                 int64_t initiator, int64_t tablet_id, int64_t 
txn_id,
+                                 int64_t next_visible_version, std::string 
data = "1111") {
+    brpc::Controller cntl;
+    update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
+    update_delete_bitmap_req.set_table_id(table_id);
+    update_delete_bitmap_req.set_partition_id(partition_id);
+    update_delete_bitmap_req.set_lock_id(lock_id);
+    update_delete_bitmap_req.set_initiator(initiator);
+    update_delete_bitmap_req.set_tablet_id(tablet_id);
+    update_delete_bitmap_req.set_txn_id(txn_id);
+    update_delete_bitmap_req.set_next_visible_version(next_visible_version);
+    update_delete_bitmap_req.add_rowset_ids("123");
+    update_delete_bitmap_req.add_segment_ids(0);
+    update_delete_bitmap_req.add_versions(next_visible_version);
+    update_delete_bitmap_req.add_segment_delete_bitmaps(data);
+    
meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
+                                       &update_delete_bitmap_req, 
&update_delete_bitmap_res,
+                                       nullptr);
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersion) {
+    auto meta_service = get_meta_service();
+    brpc::Controller cntl;
+
+    extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& 
rc_mgr,
+                                       const std::string& cloud_unique_id);
+    auto instance_id = get_instance_id(meta_service->resource_mgr(), 
"test_cloud_unique_id");
+
+    {
+        // 1. normal path
+        // 1.1 has partition version and request version matches
+        int64_t db_id = 999;
+        int64_t table_id = 1001;
+        int64_t index_id = 4001;
+        int64_t t1p1 = 2001;
+        int64_t tablet_id = 3001;
+        int64_t initiator = -1;
+        int64_t cur_max_version = 100;
+        int64_t txn_id;
+        ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id,
+                                                         index_id, t1p1, 
tablet_id));
+        begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, 
table_id, t1p1, tablet_id,
+                                    &txn_id);
+        int64_t lock_id = txn_id;
+
+        get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, 
lock_id, initiator);
+        set_partition_version(meta_service.get(), instance_id, db_id, 
table_id, t1p1,
+                              cur_max_version);
+
+        UpdateDeleteBitmapRequest update_delete_bitmap_req;
+        UpdateDeleteBitmapResponse update_delete_bitmap_res;
+        update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                             table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id,
+                             cur_max_version + 1);
+        ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::OK);
+    }
+
+    {
+        // 1. normal path
+        // 1.2 does not have partition version KV and request version matches
+        int64_t db_id = 999;
+        int64_t table_id = 1002;
+        int64_t index_id = 4001;
+        int64_t t1p1 = 2001;
+        int64_t tablet_id = 3001;
+        int64_t initiator = -1;
+        int64_t txn_id;
+        ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id,
+                                                         index_id, t1p1, 
tablet_id));
+        begin_txn_and_commit_rowset(meta_service.get(), "label12", db_id, 
table_id, t1p1, tablet_id,
+                                    &txn_id);
+        int64_t lock_id = txn_id;
+
+        get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, 
lock_id, initiator);
+
+        UpdateDeleteBitmapRequest update_delete_bitmap_req;
+        UpdateDeleteBitmapResponse update_delete_bitmap_res;
+        update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                             table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id, 2);
+        ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::OK);
+    }
+
+    {
+        // 1. normal path
+        // 1.3 has partition version and pending txn, and request version 
matches
+        int64_t db_id = 999;
+        int64_t table_id = 1003;
+        int64_t index_id = 4001;
+        int64_t t1p1 = 2001;
+        int64_t tablet_id = 3001;
+        int64_t initiator = -1;
+        int64_t cur_max_version = 120;
+        int64_t txn_id;
+        ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id,
+                                                         index_id, t1p1, 
tablet_id));
+        begin_txn_and_commit_rowset(meta_service.get(), "label13", db_id, 
table_id, t1p1, tablet_id,
+                                    &txn_id);
+        int64_t lock_id = txn_id;
+
+        get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, 
lock_id, initiator);
+        set_partition_version(meta_service.get(), instance_id, db_id, 
table_id, t1p1,
+                              cur_max_version, {12345});
+
+        UpdateDeleteBitmapRequest update_delete_bitmap_req;
+        UpdateDeleteBitmapResponse update_delete_bitmap_res;
+        update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                             table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id,
+                             cur_max_version + 2);
+        ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::OK);
+    }
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersionFail) {
+    auto meta_service = get_meta_service();
+    brpc::Controller cntl;
+
+    extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& 
rc_mgr,
+                                       const std::string& cloud_unique_id);
+    auto instance_id = get_instance_id(meta_service->resource_mgr(), 
"test_cloud_unique_id");
+
+    {
+        // 2. abnormal path
+        // 2.1 has partition version but request version does not match
+        int64_t db_id = 999;
+        int64_t table_id = 2001;
+        int64_t index_id = 4001;
+        int64_t t1p1 = 2001;
+        int64_t tablet_id = 3001;
+        int64_t initiator = -1;
+        int64_t cur_max_version = 100;
+        int64_t txn_id;
+        ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id,
+                                                         index_id, t1p1, 
tablet_id));
+        begin_txn_and_commit_rowset(meta_service.get(), "label21", db_id, 
table_id, t1p1, tablet_id,
+                                    &txn_id);
+        int64_t lock_id = txn_id;
+
+        get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, 
lock_id, initiator);
+        set_partition_version(meta_service.get(), instance_id, db_id, 
table_id, t1p1,
+                              cur_max_version);
+
+        UpdateDeleteBitmapRequest update_delete_bitmap_req;
+        UpdateDeleteBitmapResponse update_delete_bitmap_res;
+        // wrong version
+        update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                             table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id,
+                             cur_max_version + 2);
+        ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::VERSION_NOT_MATCH);
+    }
+
+    {
+        // 2. abnormal path
+        // 2.2 does not have partition version KV and request version does not 
match
+        int64_t db_id = 999;
+        int64_t table_id = 2002;
+        int64_t index_id = 4001;
+        int64_t t1p1 = 2001;
+        int64_t tablet_id = 3001;
+        int64_t initiator = -1;
+        int64_t txn_id;
+        ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id,
+                                                         index_id, t1p1, 
tablet_id));
+        begin_txn_and_commit_rowset(meta_service.get(), "label22", db_id, 
table_id, t1p1, tablet_id,
+                                    &txn_id);
+        int64_t lock_id = txn_id;
+
+        get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, 
lock_id, initiator);
+
+        UpdateDeleteBitmapRequest update_delete_bitmap_req;
+        UpdateDeleteBitmapResponse update_delete_bitmap_res;
+        // first load, wrong version
+        update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                             table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id, 10);
+        ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::VERSION_NOT_MATCH);
+    }
+
+    {
+        // 2. abnormal path
+        // 2.3 has partition version and pending txn, and request version 
matches
+        int64_t db_id = 999;
+        int64_t table_id = 2003;
+        int64_t index_id = 4001;
+        int64_t t1p1 = 2001;
+        int64_t tablet_id = 3001;
+        int64_t initiator = -1;
+        int64_t cur_max_version = 120;
+        int64_t txn_id;
+        ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id,
+                                                         index_id, t1p1, 
tablet_id));
+        begin_txn_and_commit_rowset(meta_service.get(), "label23", db_id, 
table_id, t1p1, tablet_id,
+                                    &txn_id);
+        int64_t lock_id = txn_id;
+
+        get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, 
lock_id, initiator);
+        set_partition_version(meta_service.get(), instance_id, db_id, 
table_id, t1p1,
+                              cur_max_version, {12345});
+
+        UpdateDeleteBitmapRequest update_delete_bitmap_req;
+        UpdateDeleteBitmapResponse update_delete_bitmap_res;
+        // wrong version
+        update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                             table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id,
+                             cur_max_version + 1);
+        ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::VERSION_NOT_MATCH);
+    }
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapFailCase) {
+    // simulate the situation described in 
https://github.com/apache/doris/pull/49710
+    auto meta_service = get_meta_service();
+    brpc::Controller cntl;
+    extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& 
rc_mgr,
+                                       const std::string& cloud_unique_id);
+    auto instance_id = get_instance_id(meta_service->resource_mgr(), 
"test_cloud_unique_id");
+
+    int64_t db_id = 1999;
+    int64_t table_id = 1001;
+    int64_t index_id = 4001;
+    int64_t t1p1 = 2001;
+    int64_t tablet_id = 3001;
+    int64_t initiator = -1;
+    int64_t cur_max_version = 100;
+    set_partition_version(meta_service.get(), instance_id, db_id, table_id, 
t1p1, cur_max_version);
+    ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), 
db_id, table_id, index_id,
+                                                     t1p1, tablet_id));
+
+    // txn1 begins
+    int64_t txn_id1;
+    begin_txn_and_commit_rowset(meta_service.get(), "label31", db_id, 
table_id, t1p1, tablet_id,
+                                &txn_id1);
+    int64_t txn1_version_to_publish = cur_max_version + 1;
+    // txn1 gains the lock and tries to publish with version 101
+    int64_t lock_id = txn_id1;
+    get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, 
initiator);
+
+    // txn1 fails due to calculation timeout and removes the delete bitmap 
lock
+    RemoveDeleteBitmapUpdateLockRequest remove_req;
+    RemoveDeleteBitmapUpdateLockResponse remove_res;
+    remove_req.set_cloud_unique_id("test_cloud_unique_id");
+    remove_req.set_table_id(table_id);
+    remove_req.set_lock_id(lock_id);
+    remove_req.set_initiator(-1);
+    meta_service->remove_delete_bitmap_update_lock(
+            reinterpret_cast<::google::protobuf::RpcController*>(&cntl), 
&remove_req, &remove_res,
+            nullptr);
+    ASSERT_EQ(remove_res.status().code(), MetaServiceCode::OK);
+
+    // txn2 gains the lock and successfully publishes with version 101
+    int64_t txn_id2;
+    begin_txn_and_commit_rowset(meta_service.get(), "label32", db_id, 
table_id, t1p1, tablet_id,
+                                &txn_id2);
+    lock_id = txn_id2;
+    get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, 
initiator);
+
+    int64_t txn2_version_to_publish = cur_max_version + 1;
+    UpdateDeleteBitmapRequest update_delete_bitmap_req;
+    UpdateDeleteBitmapResponse update_delete_bitmap_res;
+    std::string data1 = "1234";
+    update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                         table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id2,
+                         txn2_version_to_publish, data1);
+
+    CommitTxnRequest req;
+    req.set_cloud_unique_id("test_cloud_unique_id");
+    req.set_db_id(db_id);
+    req.set_txn_id(txn_id2);
+    req.add_mow_table_ids(table_id);
+    CommitTxnResponse res;
+    
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                             &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string ver_key = partition_version_key({instance_id, db_id, table_id, 
t1p1});
+    std::string ver_val;
+    VersionPB version_pb;
+    auto ret = txn->get(ver_key, &ver_val);
+    ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
+    ASSERT_TRUE(version_pb.ParseFromString(ver_val));
+    ASSERT_EQ(version_pb.version(), cur_max_version + 1);
+
+    std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1});
+    std::string lock_val;
+    ret = txn->get(lock_key, &lock_val);
+    ASSERT_EQ(ret, TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+    // txn1 retries the publish and gains the lock, then tries to publish with version 
102
+    lock_id = txn_id1;
+    get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, 
initiator);
+
+    // txn1's previous calculation task finishes and tries to update delete 
bitmap with version 101
+    std::string data2 = "5678";
+    update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, 
update_delete_bitmap_res,
+                         table_id, t1p1, lock_id, initiator, tablet_id, 
txn_id1,
+                         txn1_version_to_publish, data2);
+    // this should fail
+    ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::VERSION_NOT_MATCH);
+
+    GetDeleteBitmapRequest get_delete_bitmap_req;
+    GetDeleteBitmapResponse get_delete_bitmap_res;
+    get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
+    get_delete_bitmap_req.set_tablet_id(tablet_id);
+    get_delete_bitmap_req.add_rowset_ids("123");
+    get_delete_bitmap_req.add_begin_versions(0);
+    get_delete_bitmap_req.add_end_versions(cur_max_version + 1);
+    
meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
+                                    &get_delete_bitmap_req, 
&get_delete_bitmap_res, nullptr);
+    ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
+    ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), 1);
+    ASSERT_EQ(get_delete_bitmap_res.versions_size(), 1);
+    ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), 1);
+    ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), 1);
+    ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data1);
+}
+
 TEST(MetaServiceTest, UpdateDeleteBitmap) {
     auto meta_service = get_meta_service();
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 0add57c3de0..c18b35ce15f 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1394,6 +1394,7 @@ enum MetaServiceCode {
     LOCK_EXPIRED = 8001;
     LOCK_CONFLICT = 8002;
     ROWSETS_EXPIRED = 8003;
+    VERSION_NOT_MATCH = 8004;
 
     // partial update
     ROWSET_META_NOT_FOUND = 9001;
@@ -1419,6 +1420,12 @@ message UpdateDeleteBitmapRequest {
     // Serialized roaring bitmaps indexed with {rowset_id, segment_id, version}
     repeated bytes segment_delete_bitmaps = 10;
     optional bool unlock = 11;
+    // to determine whether this is in an explicit txn and whether it's the 
first sub txn
+    optional bool is_explicit_txn = 12;
+    optional int64 txn_id = 13;
+
+    // for load txn only
+    optional int64 next_visible_version = 14;
 }
 
 message UpdateDeleteBitmapResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to