This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bc3380f6cdc [feat](cloud) Repair tablet indexes versioned write (#54772)
bc3380f6cdc is described below

commit bc3380f6cdc0f0ec28996fa049a3e89419cd1a16
Author: walter <[email protected]>
AuthorDate: Fri Aug 15 16:24:53 2025 +0800

    [feat](cloud) Repair tablet indexes versioned write (#54772)
---
 cloud/src/meta-service/meta_service_txn.cpp |  23 ++++-
 cloud/test/txn_lazy_commit_test.cpp         | 144 +++++++++++++++++++++++++++-
 2 files changed, 160 insertions(+), 7 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 83f1bf909df..c0228c95315 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1604,7 +1604,8 @@ void MetaServiceImpl::commit_txn_immediately(
 void repair_tablet_index(
         std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& 
msg,
         const std::string& instance_id, int64_t db_id, int64_t txn_id,
-        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta) {
+        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta,
+        bool is_versioned_write) {
     std::stringstream ss;
     std::vector<std::string> tablet_idx_keys;
     for (auto& [_, i] : tmp_rowsets_meta) {
@@ -1673,9 +1674,24 @@ void repair_tablet_index(
                     return;
                 }
                 txn->put(sub_tablet_idx_keys[j], idx_val);
-                LOG(INFO) << " repaire tablet index txn_id=" << txn_id
+                LOG(INFO) << " repair tablet index txn_id=" << txn_id
                           << " tablet_idx_pb:" << 
tablet_idx_pb.ShortDebugString()
                           << " key=" << hex(sub_tablet_idx_keys[j]);
+                if (is_versioned_write) {
+                    std::string versioned_tablet_idx_key =
+                            versioned::tablet_index_key({instance_id, 
tablet_idx_pb.tablet_id()});
+                    std::string versioned_tablet_inverted_idx_key =
+                            versioned::tablet_inverted_index_key(
+                                    {instance_id, db_id, 
tablet_idx_pb.table_id(),
+                                     tablet_idx_pb.index_id(), 
tablet_idx_pb.partition_id(),
+                                     tablet_idx_pb.tablet_id()});
+                    txn->put(versioned_tablet_idx_key, idx_val);
+                    txn->put(versioned_tablet_inverted_idx_key, "");
+                    LOG(INFO) << "repair tablet index and inverted index, 
txn_id=" << txn_id
+                              << " tablet_id=" << tablet_idx_pb.tablet_id()
+                              << " index_key=" << hex(versioned_tablet_idx_key)
+                              << " inverted_index_key=" << 
hex(versioned_tablet_inverted_idx_key);
+                }
             }
         }
 
@@ -1767,7 +1783,8 @@ void MetaServiceImpl::commit_txn_eventually(
             stats.get_bytes += txn->get_bytes();
             stats.get_counter += txn->num_get_keys();
             txn.reset();
-            repair_tablet_index(txn_kv_, code, msg, instance_id, db_id, 
txn_id, tmp_rowsets_meta);
+            repair_tablet_index(txn_kv_, code, msg, instance_id, db_id, 
txn_id, tmp_rowsets_meta,
+                                is_versioned_write);
             if (code != MetaServiceCode::OK) {
                 LOG(WARNING) << "repair_tablet_index failed, txn_id=" << 
txn_id << " code=" << code;
                 return;
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index 0b6b04752d4..cc4da09d575 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -54,7 +54,8 @@ namespace doris::cloud {
 void repair_tablet_index(
         std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& 
msg,
         const std::string& instance_id, int64_t db_id, int64_t txn_id,
-        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta);
+        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta,
+        bool is_versioned_write);
 };
 
 static doris::cloud::RecyclerThreadPoolGroup thread_group;
@@ -344,11 +345,13 @@ static void check_txn_not_exist(std::unique_ptr<Transaction>& txn, int64_t db_id
 }
 
 // Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
-static void create_and_refresh_instance(MetaServiceProxy* service, std::string 
instance_id) {
+static void create_and_refresh_instance(
+        MetaServiceProxy* service, std::string instance_id,
+        MultiVersionStatus multi_version_status = MULTI_VERSION_READ_WRITE) {
     // write instance
     InstanceInfoPB instance_info;
     instance_info.set_instance_id(instance_id);
-    instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
+    instance_info.set_multi_version_status(multi_version_status);
     std::unique_ptr<Transaction> txn;
     ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->put(instance_key(instance_id), instance_info.SerializeAsString());
@@ -458,7 +461,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
 
     MetaServiceCode code = MetaServiceCode::UNDEFINED_ERR;
     std::string msg;
-    repair_tablet_index(txn_kv, code, msg, mock_instance, db_id, txn_id, 
tmp_rowsets_meta);
+    repair_tablet_index(txn_kv, code, msg, mock_instance, db_id, txn_id, 
tmp_rowsets_meta, false);
     ASSERT_EQ(code, MetaServiceCode::OK);
 
     {
@@ -866,6 +869,139 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
     sp->disable_processing();
 }
 
+TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) {
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 3131397513;
+    int64_t table_id = 3213867;
+    int64_t index_id = 123513;
+    int64_t partition_id = 113123;
+    bool commit_txn_eventually_finish_hit = false;
+    bool last_pending_txn_id_hit = false;
+    int repair_tablet_idx_count = 0;
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", 
[&](auto&& args) {
+        bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]);
+        if (repair_tablet_idx_count == 0) {
+            ASSERT_TRUE(need_repair_tablet_idx);
+            repair_tablet_idx_count++;
+        } else {
+            ASSERT_FALSE(need_repair_tablet_idx);
+        }
+    });
+
+    sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& 
args) {
+        int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]);
+        ASSERT_EQ(last_pending_txn_id, 0);
+        last_pending_txn_id_hit = true;
+    });
+
+    sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        commit_txn_eventually_finish_hit = true;
+    });
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, false);
+    std::string instance_id = "test_instance";
+    std::string cloud_unique_id = "1:test_instance:1";
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& 
args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    SyncPoint::get_instance()->enable_processing();
+    create_and_refresh_instance(meta_service.get(), instance_id, 
MULTI_VERSION_WRITE_ONLY);
+
+    int64_t txn_id = 0;
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label_commit_txn_eventually");
+        txn_info_pb.add_table_ids(table_id);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 1103;
+    for (int i = 0; i < 5; ++i) {
+        create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
+                                    tablet_id_base + i);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tmp_rowset_exist(txn, tablet_id, txn_id);
+        }
+    }
+
+    {
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            req.add_base_tablet_ids(tablet_id);
+        }
+        CommitTxnResponse res;
+        
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_GE(repair_tablet_idx_count, 0);
+        ASSERT_TRUE(last_pending_txn_id_hit);
+        ASSERT_TRUE(commit_txn_eventually_finish_hit);
+    }
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+
+            {
+                std::string mock_instance = "test_instance";
+                std::string key = versioned::tablet_index_key({mock_instance, 
tablet_id});
+                std::string val;
+                ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+                TabletIndexPB tablet_idx_pb;
+                tablet_idx_pb.ParseFromString(val);
+                ASSERT_EQ(tablet_idx_pb.db_id(), db_id);
+            }
+        }
+    }
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+
 TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
     auto txn_kv = get_mem_txn_kv();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to