This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0983526722a branch-3.0-pick: [Fix](cloud-mow) Check partition's version to avoid wrongly update visible versions' delete bitmaps (#49710) (#49796) 0983526722a is described below commit 0983526722aaca0817a765433e2440d0fc39230a Author: bobhan1 <bao...@selectdb.com> AuthorDate: Mon Apr 7 12:04:28 2025 +0800 branch-3.0-pick: [Fix](cloud-mow) Check partition's version to avoid wrongly update visible versions' delete bitmaps (#49710) (#49796) pick https://github.com/apache/doris/pull/49710 --- .../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 3 +- be/src/cloud/cloud_meta_mgr.cpp | 11 +- be/src/cloud/cloud_meta_mgr.h | 3 +- be/src/cloud/cloud_tablet.cpp | 12 +- be/src/cloud/cloud_tablet.h | 5 +- be/src/olap/base_tablet.cpp | 3 +- be/src/olap/base_tablet.h | 4 +- be/src/olap/tablet.cpp | 3 +- be/src/olap/tablet.h | 3 +- cloud/src/meta-service/meta_service.cpp | 114 +++++- cloud/src/meta-service/meta_service_txn.cpp | 1 + cloud/test/meta_service_test.cpp | 384 ++++++++++++++++++++- gensrc/proto/cloud.proto | 7 + 13 files changed, 536 insertions(+), 17 deletions(-) diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 86b369b3db1..39c0575c8b1 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -259,7 +259,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { // we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps, // because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns - RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, _transaction_id, delete_bitmap)); + RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, _transaction_id, delete_bitmap, + _version)); LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id << ", 
publish_status=SUCCEED, not need to re-calculate delete_bitmaps."; diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index c92d5e9404e..6f6024f1912 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -1141,7 +1141,9 @@ Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema& } Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, - int64_t initiator, DeleteBitmap* delete_bitmap) { + int64_t initiator, DeleteBitmap* delete_bitmap, + int64_t txn_id, bool is_explicit_txn, + int64_t next_visible_version) { VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id(); UpdateDeleteBitmapRequest req; UpdateDeleteBitmapResponse res; @@ -1151,6 +1153,13 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc req.set_tablet_id(tablet.tablet_id()); req.set_lock_id(lock_id); req.set_initiator(initiator); + req.set_is_explicit_txn(is_explicit_txn); + if (txn_id > 0) { + req.set_txn_id(txn_id); + } + if (next_visible_version > 0) { + req.set_next_visible_version(next_visible_version); + } for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { req.add_rowset_ids(std::get<0>(key).to_string()); req.add_segment_ids(std::get<1>(key)); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index d06e55e69ad..a666a5e4d16 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -99,7 +99,8 @@ public: Status update_tablet_schema(int64_t tablet_id, const TabletSchema& tablet_schema); Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator, - DeleteBitmap* delete_bitmap); + DeleteBitmap* delete_bitmap, int64_t txn_id = -1, + bool is_explicit_txn = false, int64_t next_visible_version = -1); Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet, DeleteBitmap* delete_bitmap); diff --git a/be/src/cloud/cloud_tablet.cpp 
b/be/src/cloud/cloud_tablet.cpp index a1026c9518d..cf3fe051bec 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -699,7 +699,8 @@ CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() { Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, - const RowsetIdUnorderedSet& cur_rowset_ids) { + const RowsetIdUnorderedSet& cur_rowset_ids, + int64_t next_visible_version) { RowsetSharedPtr rowset = txn_info->rowset; int64_t cur_version = rowset->start_version(); // update delete bitmap info, in order to avoid recalculation when trying again @@ -715,7 +716,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta)); } - RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap)); + RETURN_IF_ERROR( + save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, next_visible_version)); // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do @@ -745,7 +747,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx } Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, - DeleteBitmapPtr delete_bitmap) { + DeleteBitmapPtr delete_bitmap, + int64_t next_visible_version) { DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); for (auto iter = delete_bitmap->delete_bitmap.begin(); iter != delete_bitmap->delete_bitmap.end(); ++iter) { @@ -758,7 +761,8 @@ Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id } RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, LOAD_INITIATOR_ID, - new_delete_bitmap.get())); + 
new_delete_bitmap.get(), txn_id, false, + next_visible_version)); return Status::OK(); } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index b2d6e8921b0..4226e26a0f5 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -170,10 +170,11 @@ public: Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, - const RowsetIdUnorderedSet& cur_rowset_ids) override; + const RowsetIdUnorderedSet& cur_rowset_ids, + int64_t next_visible_version = -1) override; Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, - DeleteBitmapPtr delete_bitmap); + DeleteBitmapPtr delete_bitmap, int64_t next_visible_version); Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset, diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 4250a9f09b8..c33043b3b64 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1352,7 +1352,8 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); auto t5 = watch.get_elapse_time_us(); RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap, - transient_rs_writer.get(), cur_rowset_ids)); + transient_rs_writer.get(), cur_rowset_ids, + cur_version)); // defensive check, check that the delete bitmap cache we wrote is correct RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id, delete_bitmap.get())); diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 693f08dcac7..40928e63729 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -228,10 +228,10 @@ public: static Status update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info, int64_t txn_id, int64_t txn_expiration = 0); - virtual Status save_delete_bitmap(const 
TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, - const RowsetIdUnorderedSet& cur_rowset_ids) = 0; + const RowsetIdUnorderedSet& cur_rowset_ids, + int64_t next_visible_version = -1) = 0; virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0; void calc_compaction_output_rowset_delete_bitmap( diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 78cb6f23844..a1011661fc0 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2483,7 +2483,8 @@ CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() { Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, - const RowsetIdUnorderedSet& cur_rowset_ids) { + const RowsetIdUnorderedSet& cur_rowset_ids, + int64_t next_visible_version) { RowsetSharedPtr rowset = txn_info->rowset; int64_t cur_version = rowset->start_version(); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 96bc5d87e3c..ff18ce50657 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -417,7 +417,8 @@ public: CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override; Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, - const RowsetIdUnorderedSet& cur_rowset_ids) override; + const RowsetIdUnorderedSet& cur_rowset_ids, + int64_t next_visible_version = -1) override; void merge_delete_bitmap(const DeleteBitmap& delete_bitmap); bool check_all_rowset_segment(); diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 6ceee180939..78daf880c54 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1829,6 +1829,106 @@ static bool process_pending_delete_bitmap(MetaServiceCode& code, std::string& ms return true; } +// When a load txn retries in publish phase with different version to 
publish, it will gain the delete bitmap lock +// many times. These locks are *different*, but they are treated as the same in the current implementation because they have +// the same lock_id and initiator and don't have version info. If some delete bitmap calculation task with version X +// on BE lasts long and tries to update delete bitmaps on MS when the txn gains the lock in later retries +// with version Y (Y > X) to publish, it may wrongly update version X's delete bitmaps because the lock doesn't have version info. +// +// This function checks whether the partition version is correct when updating the delete bitmap +// to avoid wrongly updating a visible version's delete bitmaps. +// 1. get the db id with txn id +// 2. get the partition version with db id, table id and partition id +// 3. check if the partition version matches the updating version +static bool check_partition_version_when_update_delete_bitmap( + MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>& txn, + std::string& instance_id, int64_t table_id, int64_t partition_id, int64_t tablet_id, + int64_t txn_id, int64_t next_visible_version) { + if (partition_id <= 0) { + LOG(WARNING) << fmt::format( + "invalid partition_id, skip to check partition version. 
txn={}, " + "table_id={}, partition_id={}, tablet_id={}", + txn_id, table_id, partition_id, tablet_id); + return true; + } + // Get db id with txn id + std::string index_val; + const std::string index_key = txn_index_key({instance_id, txn_id}); + auto err = txn->get(index_key, &index_val); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err); + LOG(WARNING) << msg; + return false; + } + + TxnIndexPB index_pb; + if (!index_pb.ParseFromString(index_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id); + LOG(WARNING) << msg; + return false; + } + + DCHECK(index_pb.has_tablet_index()) + << fmt::format("txn={}, table_id={}, partition_id={}, tablet_id={}, index_pb={}", + txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb)); + DCHECK(index_pb.tablet_index().has_db_id()) + << fmt::format("txn={}, table_id={}, partition_id={}, tablet_id={}, index_pb={}", + txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb)); + if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) { + LOG(WARNING) << fmt::format( + "has no db_id in TxnIndexPB, skip to check partition version. 
txn={}, " + "table_id={}, partition_id={}, tablet_id={}, index_pb={}", + txn_id, table_id, partition_id, tablet_id, proto_to_json(index_pb)); + return true; + } + int64_t db_id = index_pb.tablet_index().db_id(); + + std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); + std::string ver_val; + err = txn->get(ver_key, &ver_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get partition version, txn_id={}, tablet={}, err={}", txn_id, + tablet_id, err); + LOG(WARNING) << msg; + return false; + } + + int64_t cur_max_version {-1}; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + cur_max_version = 1; + } else { + VersionPB version_pb; + if (!version_pb.ParseFromString(ver_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("failed to parse version_pb, txn_id={}, tablet={}, key={}", txn_id, + tablet_id, hex(ver_key)); + LOG(WARNING) << msg; + return false; + } + DCHECK(version_pb.has_version()); + cur_max_version = version_pb.version(); + + if (version_pb.pending_txn_ids_size() > 0) { + DCHECK(version_pb.pending_txn_ids_size() == 1); + cur_max_version += version_pb.pending_txn_ids_size(); + } + } + + if (cur_max_version + 1 != next_visible_version) { + code = MetaServiceCode::VERSION_NOT_MATCH; + msg = fmt::format( + "check version failed when update_delete_bitmap, txn={}, table_id={}, " + "partition_id={}, tablet_id={}, found partition's max version is {}, but " + "request next_visible_version is {}", + txn_id, table_id, partition_id, tablet_id, cur_max_version, next_visible_version); + return false; + } + return true; +} + void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* controller, const UpdateDeleteBitmapRequest* request, UpdateDeleteBitmapResponse* response, @@ -1880,7 +1980,17 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont } } - // 3. 
store all pending delete bitmap for this txn + // 3. check if partition's version matches + if (request->lock_id() > 0 && request->has_txn_id() && request->has_partition_id() && + request->has_next_visible_version()) { + if (!check_partition_version_when_update_delete_bitmap( + code, msg, txn, instance_id, table_id, request->partition_id(), tablet_id, + request->txn_id(), request->next_visible_version())) { + return; + } + } + + // 4. store all pending delete bitmap for this txn PendingDeleteBitmapPB delete_bitmap_keys; for (size_t i = 0; i < request->rowset_ids_size(); ++i) { MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i), @@ -1919,7 +2029,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont } } - // 4. Update delete bitmap for curent txn + // 5. Update delete bitmap for curent txn size_t current_key_count = 0; size_t current_value_count = 0; size_t total_key_count = 0; diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 8bec142e13c..b251ec64ed0 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -3163,6 +3163,7 @@ void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle const std::string index_key = txn_index_key({instance_id, sub_txn_id}); std::string index_val; TxnIndexPB index_pb; + index_pb.mutable_tablet_index()->set_db_id(db_id); if (!index_pb.SerializeToString(&index_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; ss << "failed to serialize txn_index_pb " diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 10a5b3c6f18..0a1be69e1eb 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -154,6 +154,18 @@ static void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int6 ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; } +static void 
create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_id, + int64_t table_id, int64_t index_id, int64_t partition_id, + int64_t tablet_id) { + brpc::Controller cntl; + CreateTabletsRequest req; + CreateTabletsResponse res; + req.set_db_id(db_id); + add_tablet(req, table_id, index_id, partition_id, tablet_id); + meta_service->create_tablets(&cntl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; +} + static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label, int64_t table_id, int64_t& txn_id) { brpc::Controller cntl; @@ -2050,7 +2062,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) { index_key = txn_index_key({mock_instance, sub_txn_id3}); ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK); txn_index.ParseFromString(index_val); - ASSERT_FALSE(txn_index.has_tablet_index()); + ASSERT_TRUE(txn_index.has_tablet_index()); // txn_label std::string label_key = txn_label_key({mock_instance, db_id, label}); @@ -4923,6 +4935,376 @@ TEST(MetaServiceTest, UpdateDeleteBitmapWithBigKeys) { ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK); } +static void set_partition_version(MetaServiceProxy* meta_service, std::string_view instance_id, + int64_t db_id, int64_t table_id, int64_t partition_id, + int64_t version, std::vector<int64_t> pending_txn_ids = {}) { + std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); + std::string ver_val; + VersionPB version_pb; + version_pb.set_version(version); + if (!pending_txn_ids.empty()) { + for (auto txn_id : pending_txn_ids) { + version_pb.add_pending_txn_ids(txn_id); + } + } + ASSERT_TRUE(version_pb.SerializeToString(&ver_val)); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(ver_key, ver_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + +static void 
begin_txn_and_commit_rowset(MetaServiceProxy* meta_service, const std::string& label, + int64_t db_id, int64_t table_id, int64_t partition_id, + int64_t tablet_id, int64_t* txn_id) { + begin_txn(meta_service, db_id, label, table_id, *txn_id); + CreateRowsetResponse res; + auto rowset = create_rowset(*txn_id, tablet_id, partition_id); + prepare_rowset(meta_service, rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); + commit_rowset(meta_service, rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); +} + +static void get_delete_bitmap_update_lock(MetaServiceProxy* meta_service, int64_t table_id, + int64_t partition_id, int64_t lock_id, + int64_t initiator) { + brpc::Controller cntl; + GetDeleteBitmapUpdateLockRequest get_lock_req; + GetDeleteBitmapUpdateLockResponse get_lock_res; + get_lock_req.set_cloud_unique_id("test_cloud_unique_id"); + get_lock_req.set_table_id(table_id); + get_lock_req.add_partition_ids(partition_id); + get_lock_req.set_expiration(5); + get_lock_req.set_lock_id(lock_id); + get_lock_req.set_initiator(initiator); + meta_service->get_delete_bitmap_update_lock( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req, + &get_lock_res, nullptr); + ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK); +} + +static void update_delete_bitmap(MetaServiceProxy* meta_service, + UpdateDeleteBitmapRequest& update_delete_bitmap_req, + UpdateDeleteBitmapResponse& update_delete_bitmap_res, + int64_t table_id, int64_t partition_id, int64_t lock_id, + int64_t initiator, int64_t tablet_id, int64_t txn_id, + int64_t next_visible_version, std::string data = "1111") { + brpc::Controller cntl; + update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id"); + update_delete_bitmap_req.set_table_id(table_id); + update_delete_bitmap_req.set_partition_id(partition_id); + update_delete_bitmap_req.set_lock_id(lock_id); + update_delete_bitmap_req.set_initiator(initiator); + 
update_delete_bitmap_req.set_tablet_id(tablet_id); + update_delete_bitmap_req.set_txn_id(txn_id); + update_delete_bitmap_req.set_next_visible_version(next_visible_version); + update_delete_bitmap_req.add_rowset_ids("123"); + update_delete_bitmap_req.add_segment_ids(0); + update_delete_bitmap_req.add_versions(next_visible_version); + update_delete_bitmap_req.add_segment_delete_bitmaps(data); + meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl), + &update_delete_bitmap_req, &update_delete_bitmap_res, + nullptr); +} + +TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersion) { + auto meta_service = get_meta_service(); + brpc::Controller cntl; + + extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr, + const std::string& cloud_unique_id); + auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id"); + + { + // 1. normal path + // 1.1 has partition version and request version matches + int64_t db_id = 999; + int64_t table_id = 1001; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t cur_max_version = 100; + int64_t txn_id; + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, + index_id, t1p1, tablet_id)); + begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, table_id, t1p1, tablet_id, + &txn_id); + int64_t lock_id = txn_id; + + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + set_partition_version(meta_service.get(), instance_id, db_id, table_id, t1p1, + cur_max_version); + + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id, + cur_max_version + 1); + ASSERT_EQ(update_delete_bitmap_res.status().code(), 
MetaServiceCode::OK); + } + + { + // 1. normal path + // 1.2 does not have partition version KV and request version matches + int64_t db_id = 999; + int64_t table_id = 1002; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t txn_id; + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, + index_id, t1p1, tablet_id)); + begin_txn_and_commit_rowset(meta_service.get(), "label12", db_id, table_id, t1p1, tablet_id, + &txn_id); + int64_t lock_id = txn_id; + + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id, 2); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK); + } + + { + // 1. 
normal path + // 1.3 has partition version and pending txn, and request version matches + int64_t db_id = 999; + int64_t table_id = 1003; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t cur_max_version = 120; + int64_t txn_id; + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, + index_id, t1p1, tablet_id)); + begin_txn_and_commit_rowset(meta_service.get(), "label13", db_id, table_id, t1p1, tablet_id, + &txn_id); + int64_t lock_id = txn_id; + + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + set_partition_version(meta_service.get(), instance_id, db_id, table_id, t1p1, + cur_max_version, {12345}); + + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id, + cur_max_version + 2); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK); + } +} + +TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersionFail) { + auto meta_service = get_meta_service(); + brpc::Controller cntl; + + extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr, + const std::string& cloud_unique_id); + auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id"); + + { + // 2. 
abnormal path + // 2.1 has partition version but request version does not match + int64_t db_id = 999; + int64_t table_id = 2001; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t cur_max_version = 100; + int64_t txn_id; + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, + index_id, t1p1, tablet_id)); + begin_txn_and_commit_rowset(meta_service.get(), "label21", db_id, table_id, t1p1, tablet_id, + &txn_id); + int64_t lock_id = txn_id; + + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + set_partition_version(meta_service.get(), instance_id, db_id, table_id, t1p1, + cur_max_version); + + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + // wrong version + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id, + cur_max_version + 2); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::VERSION_NOT_MATCH); + } + + { + // 2. 
abnormal path + // 2.2 does not have partition version KV and request version does not match + int64_t db_id = 999; + int64_t table_id = 2002; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t txn_id; + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, + index_id, t1p1, tablet_id)); + begin_txn_and_commit_rowset(meta_service.get(), "label22", db_id, table_id, t1p1, tablet_id, + &txn_id); + int64_t lock_id = txn_id; + + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + // first load, wrong version + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id, 10); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::VERSION_NOT_MATCH); + } + + { + // 2. 
abnormal path + // 2.3 has partition version and pending txn, but request version does not match + int64_t db_id = 999; + int64_t table_id = 2003; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t cur_max_version = 120; + int64_t txn_id; + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, + index_id, t1p1, tablet_id)); + begin_txn_and_commit_rowset(meta_service.get(), "label23", db_id, table_id, t1p1, tablet_id, + &txn_id); + int64_t lock_id = txn_id; + + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + set_partition_version(meta_service.get(), instance_id, db_id, table_id, t1p1, + cur_max_version, {12345}); + + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + // wrong version: with one pending txn the check expects cur_max_version + 2 + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id, + cur_max_version + 1); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::VERSION_NOT_MATCH); + } +} + +TEST(MetaServiceTest, UpdateDeleteBitmapFailCase) { + // simulate the situation described in https://github.com/apache/doris/pull/49710 + auto meta_service = get_meta_service(); + brpc::Controller cntl; + extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr, + const std::string& cloud_unique_id); + auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id"); + + int64_t db_id = 1999; + int64_t table_id = 1001; + int64_t index_id = 4001; + int64_t t1p1 = 2001; + int64_t tablet_id = 3001; + int64_t initiator = -1; + int64_t cur_max_version = 100; + set_partition_version(meta_service.get(), instance_id, db_id, table_id, t1p1, cur_max_version); + ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, + t1p1, tablet_id)); + + // txn1 
begins + int64_t txn_id1; + begin_txn_and_commit_rowset(meta_service.get(), "label31", db_id, table_id, t1p1, tablet_id, + &txn_id1); + int64_t txn1_version_to_publish = cur_max_version + 1; + // txn1 gains the lock and try to publish with version 101 + int64_t lock_id = txn_id1; + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + + // txn1 failed due to calculation timeout and removes the delete bitmap lock + RemoveDeleteBitmapUpdateLockRequest remove_req; + RemoveDeleteBitmapUpdateLockResponse remove_res; + remove_req.set_cloud_unique_id("test_cloud_unique_id"); + remove_req.set_table_id(table_id); + remove_req.set_lock_id(lock_id); + remove_req.set_initiator(-1); + meta_service->remove_delete_bitmap_update_lock( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &remove_req, &remove_res, + nullptr); + ASSERT_EQ(remove_res.status().code(), MetaServiceCode::OK); + + // txn2 gains the lock and succeeds to publish with version 101 + int64_t txn_id2; + begin_txn_and_commit_rowset(meta_service.get(), "label32", db_id, table_id, t1p1, tablet_id, + &txn_id2); + lock_id = txn_id2; + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + + int64_t txn2_version_to_publish = cur_max_version + 1; + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + std::string data1 = "1234"; + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id2, + txn2_version_to_publish, data1); + + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(txn_id2); + req.add_mow_table_ids(table_id); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + 
std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string ver_key = partition_version_key({instance_id, db_id, table_id, t1p1}); + std::string ver_val; + VersionPB version_pb; + auto ret = txn->get(ver_key, &ver_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + ASSERT_TRUE(version_pb.ParseFromString(ver_val)); + ASSERT_EQ(version_pb.version(), cur_max_version + 1); + + std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + std::string lock_val; + ret = txn->get(lock_key, &lock_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_KEY_NOT_FOUND); + + // txn1 retries to publish and gains the lock, and tries to publish with version 102 + lock_id = txn_id1; + get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); + + // txn1's previous calculation task finishes and tries to update delete bitmap with version 101 + std::string data2 = "5678"; + update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, + table_id, t1p1, lock_id, initiator, tablet_id, txn_id1, + txn1_version_to_publish, data2); + // this should fail + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::VERSION_NOT_MATCH); + + GetDeleteBitmapRequest get_delete_bitmap_req; + GetDeleteBitmapResponse get_delete_bitmap_res; + get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id"); + get_delete_bitmap_req.set_tablet_id(tablet_id); + get_delete_bitmap_req.add_rowset_ids("123"); + get_delete_bitmap_req.add_begin_versions(0); + get_delete_bitmap_req.add_end_versions(cur_max_version + 1); + meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl), + &get_delete_bitmap_req, &get_delete_bitmap_res, nullptr); + ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), 1); + ASSERT_EQ(get_delete_bitmap_res.versions_size(), 1); + 
ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), 1); + ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), 1); + ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data1); +} + TEST(MetaServiceTest, UpdateDeleteBitmap) { auto meta_service = get_meta_service(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 0add57c3de0..c18b35ce15f 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1394,6 +1394,7 @@ enum MetaServiceCode { LOCK_EXPIRED = 8001; LOCK_CONFLICT = 8002; ROWSETS_EXPIRED = 8003; + VERSION_NOT_MATCH = 8004; // partial update ROWSET_META_NOT_FOUND = 9001; @@ -1419,6 +1420,12 @@ message UpdateDeleteBitmapRequest { // Serialized roaring bitmaps indexed with {rowset_id, segment_id, version} repeated bytes segment_delete_bitmaps = 10; optional bool unlock = 11; + // to determine whether this is in an explicit txn and whether it's the first sub txn + optional bool is_explicit_txn = 12; + optional int64 txn_id = 13; + + // for load txn only + optional int64 next_visible_version = 14; } message UpdateDeleteBitmapResponse { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org