This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 734f09060a0 branch-3.0: [fix](cloud) commit txn with sub txns should 
consider lazy txn committer (#54653) (#55351)
734f09060a0 is described below

commit 734f09060a024fddfb80c51155271bfef3582f85
Author: meiyi <[email protected]>
AuthorDate: Thu Aug 28 17:47:38 2025 +0800

    branch-3.0: [fix](cloud) commit txn with sub txns should consider lazy txn 
committer (#54653) (#55351)
    
    pick https://github.com/apache/doris/pull/54653
---
 cloud/src/meta-service/meta_service_txn.cpp   | 881 ++++++++++++++------------
 cloud/src/meta-service/txn_lazy_committer.cpp |   1 +
 cloud/test/txn_lazy_commit_test.cpp           | 279 ++++++++
 3 files changed, 745 insertions(+), 416 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 2f3f0d15545..a800b41f83e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -2005,11 +2005,14 @@ void commit_txn_eventually(
  *    t2: t2_p3(4), t2_p4(4)
  */
 void commit_txn_with_sub_txn(const CommitTxnRequest* request, 
CommitTxnResponse* response,
-                             std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& 
code,
-                             std::string& msg, const std::string& instance_id, 
KVStats& stats) {
+                             std::shared_ptr<TxnKv>& txn_kv,
+                             std::shared_ptr<TxnLazyCommitter>& 
txn_lazy_committer,
+                             MetaServiceCode& code, std::string& msg,
+                             const std::string& instance_id, KVStats& stats) {
     std::stringstream ss;
     int64_t txn_id = request->txn_id();
     auto sub_txn_infos = request->sub_txn_infos();
+
     // Create a readonly txn for scan tmp rowset
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv->create_txn(&txn);
@@ -2020,15 +2023,6 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* 
request, CommitTxnResponse*
         LOG(WARNING) << msg;
         return;
     }
-    DORIS_CLOUD_DEFER {
-        if (txn == nullptr) return;
-        stats.get_bytes += txn->get_bytes();
-        stats.put_bytes += txn->put_bytes();
-        stats.del_bytes += txn->delete_bytes();
-        stats.get_counter += txn->num_get_keys();
-        stats.put_counter += txn->num_put_keys();
-        stats.del_counter += txn->num_del_keys();
-    };
 
     // Get db id with txn id
     std::string index_val;
@@ -2122,468 +2116,522 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* 
request, CommitTxnResponse*
     }
     stats.get_bytes += txn->get_bytes();
     stats.get_counter += txn->num_get_keys();
-    // Create a read/write txn for guarantee consistency
     txn.reset();
-    err = txn_kv->create_txn(&txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::CREATE>(err);
-        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
-
-    // Get txn info with db_id and txn_id
-    std::string info_val; // Will be reused when saving updated txn
-    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
-    err = txn->get(info_key, &info_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                      : 
cast_as<ErrCategory::READ>(err);
-        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
-
-    TxnInfoPB txn_info;
-    if (!txn_info.ParseFromString(info_val)) {
-        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
 
-    // TODO: do more check like txn state
-    DCHECK(txn_info.txn_id() == txn_id);
-    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-        code = MetaServiceCode::TXN_ALREADY_ABORTED;
-        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
-
-    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
-        code = MetaServiceCode::OK;
-        ss << "transaction is already visible: db_id=" << db_id << " txn_id=" 
<< txn_id;
-        msg = ss.str();
-        LOG(INFO) << msg;
-        response->mutable_txn_info()->CopyFrom(txn_info);
-        return;
-    }
-
-    LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
+    do {
+        TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn:begin", &txn_id);
+        // Create a read/write txn for guarantee consistency
+        err = txn_kv->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+        DORIS_CLOUD_DEFER {
+            if (txn == nullptr) return;
+            stats.get_bytes += txn->get_bytes();
+            stats.put_bytes += txn->put_bytes();
+            stats.del_bytes += txn->delete_bytes();
+            stats.get_counter += txn->num_get_keys();
+            stats.put_counter += txn->num_put_keys();
+            stats.del_counter += txn->num_del_keys();
+        };
 
-    // Prepare rowset meta and new_versions
-    // Read tablet indexes in batch.
-    std::map<int64_t, int64_t> tablet_id_to_idx;
-    std::vector<std::string> tablet_idx_keys;
-    std::vector<int64_t> partition_ids;
-    auto idx = 0;
-    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
-        for (auto& [_, i] : tmp_rowsets_meta) {
-            auto tablet_id = i.tablet_id();
-            if (tablet_id_to_idx.count(tablet_id) == 0) {
-                tablet_id_to_idx.emplace(tablet_id, idx);
-                tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, 
i.tablet_id()}));
-                partition_ids.push_back(i.partition_id());
-                idx++;
+        // Get txn info with db_id and txn_id
+        std::string info_val; // Will be reused when saving updated txn
+        const std::string info_key = txn_info_key({instance_id, db_id, 
txn_id});
+        err = txn->get(info_key, &info_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                          : 
cast_as<ErrCategory::READ>(err);
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                ss << "transaction [" << txn_id << "] not found, db_id=" << 
db_id;
+            } else {
+                ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" 
<< txn_id
+                   << " err=" << err;
             }
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
         }
-    }
-    std::vector<std::optional<std::string>> tablet_idx_values;
-    err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, 
Transaction::BatchGetOptions(false));
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::READ>(err);
-        ss << "failed to get tablet table index ids, err=" << err;
-        msg = ss.str();
-        LOG(WARNING) << msg << " txn_id=" << txn_id;
-        return;
-    }
 
-    // tablet_id -> {table/index/partition}_id
-    std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
-    // table_id -> tablets_ids
-    std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
-    for (auto [tablet_id, i] : tablet_id_to_idx) {
-        if (!tablet_idx_values[i].has_value()) [[unlikely]] {
-            // The value must existed
-            code = MetaServiceCode::KV_TXN_GET_ERR;
-            ss << "failed to get tablet table index ids, err=not found"
-               << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
+        TxnInfoPB txn_info;
+        if (!txn_info.ParseFromString(info_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
             msg = ss.str();
-            LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+            LOG(WARNING) << msg;
             return;
         }
-        if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
-            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            ss << "malformed tablet index value tablet_id=" << tablet_id << " 
txn_id=" << txn_id;
+
+        // TODO: do more check like txn state
+        DCHECK(txn_info.txn_id() == txn_id);
+        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+            code = MetaServiceCode::TXN_ALREADY_ABORTED;
+            ss << "transaction is already aborted: db_id=" << db_id << " 
txn_id=" << txn_id;
             msg = ss.str();
             LOG(WARNING) << msg;
             return;
         }
-        
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
-        VLOG_DEBUG << "tablet_id:" << tablet_id
-                   << " value:" << tablet_ids[tablet_id].ShortDebugString();
-    }
 
-    tablet_idx_keys.clear();
-    tablet_idx_values.clear();
+        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+            code = MetaServiceCode::OK;
+            ss << "transaction is already visible: db_id=" << db_id << " 
txn_id=" << txn_id;
+            msg = ss.str();
+            LOG(INFO) << msg;
+            response->mutable_txn_info()->CopyFrom(txn_info);
+            return;
+        }
 
-    // {table/partition} -> version
-    std::unordered_map<std::string, uint64_t> new_versions;
-    std::vector<std::string> version_keys;
-    for (auto& [tablet_id, i] : tablet_id_to_idx) {
-        int64_t table_id = tablet_ids[tablet_id].table_id();
-        int64_t partition_id = partition_ids[i];
-        std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
-        if (new_versions.count(ver_key) == 0) {
-            new_versions.insert({ver_key, 0});
-            LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << 
" txn_id=" << txn_id
-                      << ", db_id=" << db_id << ", table_id=" << table_id
-                      << ", partition_id=" << partition_id;
-            version_keys.push_back(std::move(ver_key));
-        }
-    }
-    std::vector<std::optional<std::string>> version_values;
-    err = txn->batch_get(&version_values, version_keys);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::READ>(err);
-        ss << "failed to get partition versions, err=" << err;
-        msg = ss.str();
-        LOG(WARNING) << msg << " txn_id=" << txn_id;
-        return;
-    }
-    size_t total_versions = version_keys.size();
-    for (size_t i = 0; i < total_versions; i++) {
-        int64_t version;
-        if (version_values[i].has_value()) {
-            VersionPB version_pb;
-            if (!version_pb.ParseFromString(version_values[i].value())) {
+        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
+
+        // Prepare rowset meta and new_versions
+        // Read tablet indexes in batch.
+        std::map<int64_t, int64_t> tablet_id_to_idx;
+        std::vector<std::string> tablet_idx_keys;
+        std::vector<int64_t> partition_ids;
+        auto idx = 0;
+        for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+            for (auto& [_, i] : tmp_rowsets_meta) {
+                auto tablet_id = i.tablet_id();
+                if (tablet_id_to_idx.count(tablet_id) == 0) {
+                    tablet_id_to_idx.emplace(tablet_id, idx);
+                    
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
+                    partition_ids.push_back(i.partition_id());
+                    idx++;
+                }
+            }
+        }
+        std::vector<std::optional<std::string>> tablet_idx_values;
+        err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
+                             Transaction::BatchGetOptions(false));
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get tablet table index ids, err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
+            return;
+        }
+
+        // tablet_id -> {table/index/partition}_id
+        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+        // table_id -> tablets_ids
+        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+        for (auto [tablet_id, i] : tablet_id_to_idx) {
+            if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+                // The value must existed
+                code = MetaServiceCode::KV_TXN_GET_ERR;
+                ss << "failed to get tablet table index ids, err=not found"
+                   << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
+                msg = ss.str();
+                LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+                return;
+            }
+            if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse version pb txn_id=" << txn_id
-                   << " key=" << hex(version_keys[i]);
+                ss << "malformed tablet index value tablet_id=" << tablet_id
+                   << " txn_id=" << txn_id;
                 msg = ss.str();
+                LOG(WARNING) << msg;
                 return;
             }
-            version = version_pb.version();
-        } else {
-            version = 1;
+            
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+            VLOG_DEBUG << "tablet_id:" << tablet_id
+                       << " value:" << 
tablet_ids[tablet_id].ShortDebugString();
         }
-        new_versions[version_keys[i]] = version;
-        LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
-                  << " version:" << version << " txn_id=" << txn_id;
-    }
-    version_keys.clear();
-    version_values.clear();
 
-    std::vector<std::pair<std::string, std::string>> rowsets;
-    std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
-    for (const auto& sub_txn_info : sub_txn_infos) {
-        auto sub_txn_id = sub_txn_info.sub_txn_id();
-        auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
-        std::unordered_map<int64_t, int64_t> partition_id_to_version;
-        for (auto& [_, i] : tmp_rowsets_meta) {
-            int64_t tablet_id = i.tablet_id();
+        tablet_idx_keys.clear();
+        tablet_idx_values.clear();
+
+        // {table/partition} -> version
+        std::unordered_map<std::string, uint64_t> new_versions;
+        std::vector<std::string> version_keys;
+        for (auto& [tablet_id, i] : tablet_id_to_idx) {
             int64_t table_id = tablet_ids[tablet_id].table_id();
-            int64_t partition_id = i.partition_id();
+            int64_t partition_id = partition_ids[i];
             std::string ver_key =
                     partition_version_key({instance_id, db_id, table_id, 
partition_id});
-            if (new_versions.count(ver_key) == 0) [[unlikely]] {
-                // it is impossible.
-                code = MetaServiceCode::UNDEFINED_ERR;
-                ss << "failed to get partition version key, the target version 
not exists in "
-                      "new_versions."
-                   << " txn_id=" << txn_id << ", db_id=" << db_id << ", 
table_id=" << table_id
-                   << ", partition_id=" << partition_id;
-                msg = ss.str();
-                LOG(ERROR) << msg;
-                return;
+            if (new_versions.count(ver_key) == 0) {
+                new_versions.insert({ver_key, 0});
+                LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key)
+                          << " txn_id=" << txn_id << ", db_id=" << db_id
+                          << ", table_id=" << table_id << ", partition_id=" << 
partition_id;
+                version_keys.push_back(std::move(ver_key));
             }
-
-            // Update rowset version
-            int64_t new_version = new_versions[ver_key];
-            if (partition_id_to_version.count(partition_id) == 0) {
-                new_versions[ver_key] = new_version + 1;
-                new_version = new_versions[ver_key];
-                partition_id_to_version[partition_id] = new_version;
+        }
+        std::vector<std::optional<std::string>> version_values;
+        err = txn->batch_get(&version_values, version_keys);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get partition versions, err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
+            return;
+        }
+        size_t total_versions = version_keys.size();
+        int64_t last_pending_txn_id = 0;
+        for (size_t i = 0; i < total_versions; i++) {
+            int64_t version;
+            if (version_values[i].has_value()) {
+                VersionPB version_pb;
+                if (!version_pb.ParseFromString(version_values[i].value())) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "failed to parse version pb txn_id=" << txn_id
+                       << " key=" << hex(version_keys[i]);
+                    msg = ss.str();
+                    return;
+                }
+                if (version_pb.pending_txn_ids_size() > 0) {
+                    DCHECK(version_pb.pending_txn_ids_size() == 1);
+                    last_pending_txn_id = version_pb.pending_txn_ids(0);
+                    DCHECK(last_pending_txn_id > 0);
+                    break;
+                }
+                version = version_pb.version();
+            } else {
+                version = 1;
             }
-            i.set_start_version(new_version);
-            i.set_end_version(new_version);
-            LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
-                      << ", sub_txn_id=" << sub_txn_id << ", table_id=" << 
table_id
-                      << ", partition_id=" << partition_id << ", tablet_id=" 
<< tablet_id
-                      << ", new_version=" << new_version;
+            new_versions[version_keys[i]] = version;
+            last_pending_txn_id = 0;
+            LOG(INFO) << "xxx get partition_version_key=" << 
hex(version_keys[i])
+                      << " version:" << version << " txn_id=" << txn_id;
+        }
+        version_keys.clear();
+        version_values.clear();
 
-            std::string key = meta_rowset_key({instance_id, tablet_id, 
i.end_version()});
-            std::string val;
-            if (!i.SerializeToString(&val)) {
-                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-                ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
-                msg = ss.str();
+        if (last_pending_txn_id > 0) {
+            stats.get_bytes += txn->get_bytes();
+            stats.get_counter += txn->num_get_keys();
+            txn.reset();
+            
TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::advance_last_pending_txn_id",
+                                     &last_pending_txn_id);
+            std::shared_ptr<TxnLazyCommitTask> task =
+                    txn_lazy_committer->submit(instance_id, 
last_pending_txn_id);
+
+            std::tie(code, msg) = task->wait();
+            if (code != MetaServiceCode::OK) {
+                LOG(WARNING) << "advance_last_txn failed last_txn=" << 
last_pending_txn_id
+                             << " code=" << code << " msg=" << msg;
                 return;
             }
-            rowsets.emplace_back(std::move(key), std::move(val));
+            last_pending_txn_id = 0;
+            continue;
+        }
 
-            // Accumulate affected rows
-            auto& stats = tablet_stats[tablet_id];
-            stats.data_size += i.total_disk_size();
-            stats.num_rows += i.num_rows();
-            ++stats.num_rowsets;
-            stats.num_segs += i.num_segments();
-            stats.index_size += i.index_disk_size();
-            stats.segment_size += i.data_disk_size();
-        } // for tmp_rowsets_meta
-    }
+        std::vector<std::pair<std::string, std::string>> rowsets;
+        std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
+        for (const auto& sub_txn_info : sub_txn_infos) {
+            auto sub_txn_id = sub_txn_info.sub_txn_id();
+            auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
+            std::unordered_map<int64_t, int64_t> partition_id_to_version;
+            for (auto& [_, i] : tmp_rowsets_meta) {
+                int64_t tablet_id = i.tablet_id();
+                int64_t table_id = tablet_ids[tablet_id].table_id();
+                int64_t partition_id = i.partition_id();
+                std::string ver_key =
+                        partition_version_key({instance_id, db_id, table_id, 
partition_id});
+                if (new_versions.count(ver_key) == 0) [[unlikely]] {
+                    // it is impossible.
+                    code = MetaServiceCode::UNDEFINED_ERR;
+                    ss << "failed to get partition version key, the target 
version not exists in "
+                          "new_versions."
+                       << " txn_id=" << txn_id << ", db_id=" << db_id << ", 
table_id=" << table_id
+                       << ", partition_id=" << partition_id;
+                    msg = ss.str();
+                    LOG(ERROR) << msg;
+                    return;
+                }
 
-    // Save rowset meta
-    for (auto& i : rowsets) {
-        size_t rowset_size = i.first.size() + i.second.size();
-        txn->put(i.first, i.second);
-        LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << 
txn_id
-                  << " rowset_size=" << rowset_size;
-    }
+                // Update rowset version
+                int64_t new_version = new_versions[ver_key];
+                if (partition_id_to_version.count(partition_id) == 0) {
+                    new_versions[ver_key] = new_version + 1;
+                    new_version = new_versions[ver_key];
+                    partition_id_to_version[partition_id] = new_version;
+                }
+                i.set_start_version(new_version);
+                i.set_end_version(new_version);
+                LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
+                          << ", sub_txn_id=" << sub_txn_id << ", table_id=" << 
table_id
+                          << ", partition_id=" << partition_id << ", 
tablet_id=" << tablet_id
+                          << ", new_version=" << new_version;
+
+                std::string key = meta_rowset_key({instance_id, tablet_id, 
i.end_version()});
+                std::string val;
+                if (!i.SerializeToString(&val)) {
+                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+                    ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
+                    msg = ss.str();
+                    return;
+                }
+                rowsets.emplace_back(std::move(key), std::move(val));
 
-    // Save versions
-    for (auto& i : new_versions) {
-        std::string ver_val;
-        VersionPB version_pb;
-        version_pb.set_version(i.second);
-        if (!version_pb.SerializeToString(&ver_val)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            ss << "failed to serialize version_pb when saving, txn_id=" << 
txn_id;
-            msg = ss.str();
-            return;
+                // Accumulate affected rows
+                auto& stats = tablet_stats[tablet_id];
+                stats.data_size += i.total_disk_size();
+                stats.num_rows += i.num_rows();
+                ++stats.num_rowsets;
+                stats.num_segs += i.num_segments();
+                stats.index_size += i.index_disk_size();
+                stats.segment_size += i.data_disk_size();
+            } // for tmp_rowsets_meta
         }
 
-        txn->put(i.first, ver_val);
-        LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " 
version:" << i.second
-                  << " txn_id=" << txn_id;
-
-        std::string_view ver_key = i.first;
-        ver_key.remove_prefix(1); // Remove key space
-        // PartitionVersionKeyInfo  {instance_id, db_id, table_id, 
partition_id}
-        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
-        int ret = decode_key(&ver_key, &out);
-        if (ret != 0) [[unlikely]] {
-            // decode version key error means this is something wrong,
-            // we can not continue this txn
-            LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << 
hex(ver_key);
-            code = MetaServiceCode::UNDEFINED_ERR;
-            msg = "decode version key error";
-            return;
+        // Save rowset meta
+        for (auto& i : rowsets) {
+            size_t rowset_size = i.first.size() + i.second.size();
+            txn->put(i.first, i.second);
+            LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" 
<< txn_id
+                      << " rowset_size=" << rowset_size;
         }
 
-        int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
-        int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
-        VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
-                   << " partition_id=" << partition_id << " version=" << 
i.second;
+        // Save versions
+        for (auto& i : new_versions) {
+            std::string ver_val;
+            VersionPB version_pb;
+            version_pb.set_version(i.second);
+            if (!version_pb.SerializeToString(&ver_val)) {
+                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+                ss << "failed to serialize version_pb when saving, txn_id=" << 
txn_id;
+                msg = ss.str();
+                return;
+            }
 
-        response->add_table_ids(table_id);
-        response->add_partition_ids(partition_id);
-        response->add_versions(i.second);
-    }
+            txn->put(i.first, ver_val);
+            LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " 
version:" << i.second
+                      << " txn_id=" << txn_id;
 
-    // Save table versions
-    for (auto& i : table_id_tablet_ids) {
-        std::string ver_key = table_version_key({instance_id, db_id, i.first});
-        txn->atomic_add(ver_key, 1);
-        LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " 
txn_id=" << txn_id;
-    }
+            std::string_view ver_key = i.first;
+            ver_key.remove_prefix(1); // Remove key space
+            // PartitionVersionKeyInfo  {instance_id, db_id, table_id, 
partition_id}
+            std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>> out;
+            int ret = decode_key(&ver_key, &out);
+            if (ret != 0) [[unlikely]] {
+                // decode version key error means this is something wrong,
+                // we can not continue this txn
+                LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" 
<< hex(ver_key);
+                code = MetaServiceCode::UNDEFINED_ERR;
+                msg = "decode version key error";
+                return;
+            }
 
-    LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+            int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
+            int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
+            VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
+                       << " partition_id=" << partition_id << " version=" << 
i.second;
 
-    // Update txn_info
-    txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+            response->add_table_ids(table_id);
+            response->add_partition_ids(partition_id);
+            response->add_versions(i.second);
+        }
 
-    auto now_time = system_clock::now();
-    uint64_t commit_time = 
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
-    if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
-        code = MetaServiceCode::UNDEFINED_ERR;
-        msg = fmt::format("txn is expired, not allow to commit txn_id={}", 
txn_id);
-        LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
-                  << " timeout_ms=" << txn_info.timeout_ms() << " 
commit_time=" << commit_time;
-        return;
-    }
-    txn_info.set_commit_time(commit_time);
-    txn_info.set_finish_time(commit_time);
-    if (request->has_commit_attachment()) {
-        
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
-    }
-    LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
-    info_val.clear();
-    if (!txn_info.SerializeToString(&info_val)) {
-        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
-        msg = ss.str();
-        return;
-    }
-    txn->put(info_key, info_val);
-    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
+        // Save table versions
+        for (auto& i : table_id_tablet_ids) {
+            std::string ver_key = table_version_key({instance_id, db_id, 
i.first});
+            txn->atomic_add(ver_key, 1);
+            LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key)
+                      << " txn_id=" << txn_id;
+        }
 
-    // Update stats of affected tablet
-    std::deque<std::string> kv_pool;
-    std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> 
update_tablet_stats;
-    if (config::split_tablet_stats) {
-        update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
-            if (stats.num_segs > 0) {
-                auto& data_size_key = kv_pool.emplace_back();
-                stats_tablet_data_size_key(info, &data_size_key);
-                txn->atomic_add(data_size_key, stats.data_size);
-                auto& num_rows_key = kv_pool.emplace_back();
-                stats_tablet_num_rows_key(info, &num_rows_key);
-                txn->atomic_add(num_rows_key, stats.num_rows);
-                auto& num_segs_key = kv_pool.emplace_back();
-                stats_tablet_num_segs_key(info, &num_segs_key);
-                txn->atomic_add(num_segs_key, stats.num_segs);
-                auto& index_size_key = kv_pool.emplace_back();
-                stats_tablet_index_size_key(info, &index_size_key);
-                txn->atomic_add(index_size_key, stats.index_size);
-                auto& segment_size_key = kv_pool.emplace_back();
-                stats_tablet_segment_size_key(info, &segment_size_key);
-                txn->atomic_add(segment_size_key, stats.segment_size);
-            }
-            auto& num_rowsets_key = kv_pool.emplace_back();
-            stats_tablet_num_rowsets_key(info, &num_rowsets_key);
-            txn->atomic_add(num_rowsets_key, stats.num_rowsets);
-        };
-    } else {
-        update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
-            auto& key = kv_pool.emplace_back();
-            stats_tablet_key(info, &key);
-            auto& val = kv_pool.emplace_back();
-            TxnErrorCode err = txn->get(key, &val);
-            if (err != TxnErrorCode::TXN_OK) {
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                msg = fmt::format("failed to get tablet stats, err={} 
tablet_id={}", err,
-                                  std::get<4>(info));
-                return;
-            }
-            TabletStatsPB stats_pb;
-            if (!stats_pb.ParseFromString(val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                msg = fmt::format("malformed tablet stats value, key={}", 
hex(key));
-                return;
+        LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+
+        // Update txn_info
+        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+
+        auto now_time = system_clock::now();
+        uint64_t commit_time = 
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
+        if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
+            code = MetaServiceCode::UNDEFINED_ERR;
+            msg = fmt::format("txn is expired, not allow to commit txn_id={}", 
txn_id);
+            LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
+                      << " timeout_ms=" << txn_info.timeout_ms() << " 
commit_time=" << commit_time;
+            return;
+        }
+        txn_info.set_commit_time(commit_time);
+        txn_info.set_finish_time(commit_time);
+        if (request->has_commit_attachment()) {
+            
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
+        }
+        LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
+        info_val.clear();
+        if (!txn_info.SerializeToString(&info_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize txn_info when saving, txn_id=" << 
txn_id;
+            msg = ss.str();
+            return;
+        }
+        txn->put(info_key, info_val);
+        LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << 
txn_id;
+
+        // Update stats of affected tablet
+        std::deque<std::string> kv_pool;
+        std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> 
update_tablet_stats;
+        if (config::split_tablet_stats) {
+            update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
+                if (stats.num_segs > 0) {
+                    auto& data_size_key = kv_pool.emplace_back();
+                    stats_tablet_data_size_key(info, &data_size_key);
+                    txn->atomic_add(data_size_key, stats.data_size);
+                    auto& num_rows_key = kv_pool.emplace_back();
+                    stats_tablet_num_rows_key(info, &num_rows_key);
+                    txn->atomic_add(num_rows_key, stats.num_rows);
+                    auto& num_segs_key = kv_pool.emplace_back();
+                    stats_tablet_num_segs_key(info, &num_segs_key);
+                    txn->atomic_add(num_segs_key, stats.num_segs);
+                    auto& index_size_key = kv_pool.emplace_back();
+                    stats_tablet_index_size_key(info, &index_size_key);
+                    txn->atomic_add(index_size_key, stats.index_size);
+                    auto& segment_size_key = kv_pool.emplace_back();
+                    stats_tablet_segment_size_key(info, &segment_size_key);
+                    txn->atomic_add(segment_size_key, stats.segment_size);
+                }
+                auto& num_rowsets_key = kv_pool.emplace_back();
+                stats_tablet_num_rowsets_key(info, &num_rowsets_key);
+                txn->atomic_add(num_rowsets_key, stats.num_rowsets);
+            };
+        } else {
+            update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
+                auto& key = kv_pool.emplace_back();
+                stats_tablet_key(info, &key);
+                auto& val = kv_pool.emplace_back();
+                TxnErrorCode err = txn->get(key, &val);
+                if (err != TxnErrorCode::TXN_OK) {
+                    code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                   ? MetaServiceCode::TABLET_NOT_FOUND
+                                   : cast_as<ErrCategory::READ>(err);
+                    msg = fmt::format("failed to get tablet stats, err={} 
tablet_id={}", err,
+                                      std::get<4>(info));
+                    return;
+                }
+                TabletStatsPB stats_pb;
+                if (!stats_pb.ParseFromString(val)) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    msg = fmt::format("malformed tablet stats value, key={}", 
hex(key));
+                    return;
+                }
+                stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
+                stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
+                stats_pb.set_num_rowsets(stats_pb.num_rowsets() + 
stats.num_rowsets);
+                stats_pb.set_num_segments(stats_pb.num_segments() + 
stats.num_segs);
+                stats_pb.set_index_size(stats_pb.index_size() + 
stats.index_size);
+                stats_pb.set_segment_size(stats_pb.segment_size() + 
stats.segment_size);
+                stats_pb.SerializeToString(&val);
+                txn->put(key, val);
+                LOG(INFO) << "put stats_tablet_key, key=" << hex(key);
+            };
+        }
+        for (auto& [tablet_id, stats] : tablet_stats) {
+            DCHECK(tablet_ids.count(tablet_id));
+            auto& tablet_idx = tablet_ids[tablet_id];
+            StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
+                                     tablet_idx.partition_id(), tablet_id};
+            update_tablet_stats(info, stats);
+            if (code != MetaServiceCode::OK) return;
+        }
+        // Remove tmp rowset meta
+        for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+            for (auto& [k, _] : tmp_rowsets_meta) {
+                txn->remove(k);
+                LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " 
txn_id=" << txn_id;
             }
-            stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
-            stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
-            stats_pb.set_num_rowsets(stats_pb.num_rowsets() + 
stats.num_rowsets);
-            stats_pb.set_num_segments(stats_pb.num_segments() + 
stats.num_segs);
-            stats_pb.set_index_size(stats_pb.index_size() + stats.index_size);
-            stats_pb.set_segment_size(stats_pb.segment_size() + 
stats.segment_size);
-            stats_pb.SerializeToString(&val);
-            txn->put(key, val);
-            LOG(INFO) << "put stats_tablet_key, key=" << hex(key);
-        };
-    }
-    for (auto& [tablet_id, stats] : tablet_stats) {
-        DCHECK(tablet_ids.count(tablet_id));
-        auto& tablet_idx = tablet_ids[tablet_id];
-        StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
-                                 tablet_idx.partition_id(), tablet_id};
-        update_tablet_stats(info, stats);
-        if (code != MetaServiceCode::OK) return;
-    }
-    // Remove tmp rowset meta
-    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
-        for (auto& [k, _] : tmp_rowsets_meta) {
-            txn->remove(k);
-            LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" 
<< txn_id;
         }
-    }
 
-    const std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
-    LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" 
<< txn_id;
-    txn->remove(running_key);
+        const std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
+        LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " 
txn_id=" << txn_id;
+        txn->remove(running_key);
 
-    std::string recycle_val;
-    std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
-    RecycleTxnPB recycle_pb;
-    recycle_pb.set_creation_time(commit_time);
-    recycle_pb.set_label(txn_info.label());
+        std::string recycle_val;
+        std::string recycle_key = recycle_txn_key({instance_id, db_id, 
txn_id});
+        RecycleTxnPB recycle_pb;
+        recycle_pb.set_creation_time(commit_time);
+        recycle_pb.set_label(txn_info.label());
 
-    if (!recycle_pb.SerializeToString(&recycle_val)) {
-        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-        ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
-        msg = ss.str();
-        return;
-    }
-    txn->put(recycle_key, recycle_val);
-    LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key)
-              << " txn_id=" << txn_id;
+        if (!recycle_pb.SerializeToString(&recycle_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
+            msg = ss.str();
+            return;
+        }
+        txn->put(recycle_key, recycle_val);
+        LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << 
hex(recycle_key)
+                  << " txn_id=" << txn_id;
 
-    LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()
-              << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" 
<< txn->num_del_keys()
-              << " txn_size=" << txn->approximate_bytes() << " txn_id=" << 
txn_id;
+        LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
+                  << " del_size=" << txn->delete_bytes() << " num_put_keys=" 
<< txn->num_put_keys()
+                  << " num_del_keys=" << txn->num_del_keys()
+                  << " txn_size=" << txn->approximate_bytes() << " txn_id=" << 
txn_id;
 
-    // Finally we are done...
-    err = txn->commit();
-    if (err != TxnErrorCode::TXN_OK) {
-        if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE || err == 
TxnErrorCode::TXN_BYTES_TOO_LARGE) {
-            size_t max_size = 0, max_num_segments = 0,
-                   min_num_segments = std::numeric_limits<size_t>::max(), 
avg_num_segments = 0;
-            std::pair<std::string, RowsetMetaCloudPB>* max_rowset_meta = 
nullptr;
-            for (auto& sub_txn : sub_txn_infos) {
-                auto it = 
sub_txn_to_tmp_rowsets_meta.find(sub_txn.sub_txn_id());
-                if (it == sub_txn_to_tmp_rowsets_meta.end()) {
-                    continue;
-                }
-                for (auto& rowset_meta : it->second) {
-                    if (rowset_meta.second.ByteSizeLong() > max_size) {
-                        max_size = rowset_meta.second.ByteSizeLong();
-                        max_rowset_meta = &rowset_meta;
+        
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_with_sub_txn::before_commit", 
&err, &code);
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
+            if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE ||
+                err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+                size_t max_size = 0, max_num_segments = 0,
+                       min_num_segments = std::numeric_limits<size_t>::max(), 
avg_num_segments = 0;
+                std::pair<std::string, RowsetMetaCloudPB>* max_rowset_meta = 
nullptr;
+                for (auto& sub_txn : sub_txn_infos) {
+                    auto it = 
sub_txn_to_tmp_rowsets_meta.find(sub_txn.sub_txn_id());
+                    if (it == sub_txn_to_tmp_rowsets_meta.end()) {
+                        continue;
                     }
-                    if (rowset_meta.second.num_segments() > max_num_segments) {
-                        max_num_segments = rowset_meta.second.num_segments();
+                    for (auto& rowset_meta : it->second) {
+                        if (rowset_meta.second.ByteSizeLong() > max_size) {
+                            max_size = rowset_meta.second.ByteSizeLong();
+                            max_rowset_meta = &rowset_meta;
+                        }
+                        if (rowset_meta.second.num_segments() > 
max_num_segments) {
+                            max_num_segments = 
rowset_meta.second.num_segments();
+                        }
+                        if (rowset_meta.second.num_segments() < 
min_num_segments) {
+                            min_num_segments = 
rowset_meta.second.num_segments();
+                        }
+                        avg_num_segments += rowset_meta.second.num_segments();
                     }
-                    if (rowset_meta.second.num_segments() < min_num_segments) {
-                        min_num_segments = rowset_meta.second.num_segments();
+                    if (!it->second.empty()) {
+                        avg_num_segments /= it->second.size();
                     }
-                    avg_num_segments += rowset_meta.second.num_segments();
                 }
-                if (!it->second.empty()) {
-                    avg_num_segments /= it->second.size();
+                if (max_rowset_meta) {
+                    LOG(WARNING) << "failed to commit kv txn with sub txn"
+                                 << ", err=" << err << ", txn_id=" << txn_id
+                                 << ", total_rowsets=" << rowsets.size()
+                                 << ", avg_num_segments=" << avg_num_segments
+                                 << ", min_num_segments=" << min_num_segments
+                                 << ", max_num_segments=" << max_num_segments
+                                 << ", largest_rowset_size=" << max_size
+                                 << ", largest_rowset_key=" << 
hex(max_rowset_meta->first)
+                                 << ", largest_rowset_value="
+                                 << max_rowset_meta->second.ShortDebugString();
                 }
             }
-            if (max_rowset_meta) {
-                LOG(WARNING) << "failed to commit kv txn with sub txn"
-                             << ", err=" << err << ", txn_id=" << txn_id
-                             << ", total_rowsets=" << rowsets.size()
-                             << ", avg_num_segments=" << avg_num_segments
-                             << ", min_num_segments=" << min_num_segments
-                             << ", max_num_segments=" << max_num_segments
-                             << ", largest_rowset_size=" << max_size
-                             << ", largest_rowset_key=" << 
hex(max_rowset_meta->first)
-                             << ", largest_rowset_value="
-                             << max_rowset_meta->second.ShortDebugString();
-            }
+            code = cast_as<ErrCategory::COMMIT>(err);
+            ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << 
" err=" << err;
+            msg = ss.str();
+            return;
         }
-        code = cast_as<ErrCategory::COMMIT>(err);
-        ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << " 
err=" << err;
-        msg = ss.str();
-        return;
-    }
 
-    // calculate table stats from tablets stats
-    std::map<int64_t /*table_id*/, TableStats> table_stats;
-    std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
-                                         request->base_tablet_ids().end());
-    calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
-    for (const auto& pair : table_stats) {
-        TableStatsPB* stats_pb = response->add_table_stats();
-        auto table_id = pair.first;
-        auto stats = pair.second;
-        get_pb_from_tablestats(stats, stats_pb);
-        stats_pb->set_table_id(table_id);
-        VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
-                   << " table_id=" << table_id
-                   << " updated_row_count=" << stats_pb->updated_row_count();
-    }
+        // calculate table stats from tablets stats
+        std::map<int64_t /*table_id*/, TableStats> table_stats;
+        std::vector<int64_t> 
base_tablet_ids(request->base_tablet_ids().begin(),
+                                             request->base_tablet_ids().end());
+        calc_table_stats(tablet_ids, tablet_stats, table_stats, 
base_tablet_ids);
+        for (const auto& pair : table_stats) {
+            TableStatsPB* stats_pb = response->add_table_stats();
+            auto table_id = pair.first;
+            auto stats = pair.second;
+            get_pb_from_tablestats(stats, stats_pb);
+            stats_pb->set_table_id(table_id);
+            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << 
txn_id
+                       << " table_id=" << table_id
+                       << " updated_row_count=" << 
stats_pb->updated_row_count();
+        }
 
-    response->mutable_txn_info()->CopyFrom(txn_info);
+        response->mutable_txn_info()->CopyFrom(txn_info);
+        TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::finish", &code);
+        break;
+    } while (true);
 } // end commit_txn_with_sub_txn
 
 static bool force_txn_lazy_commit() {
@@ -2614,7 +2662,8 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     RPC_RATE_LIMIT(commit_txn)
 
     if (request->has_is_txn_load() && request->is_txn_load()) {
-        commit_txn_with_sub_txn(request, response, txn_kv_, code, msg, 
instance_id, stats);
+        commit_txn_with_sub_txn(request, response, txn_kv_, 
txn_lazy_committer_, code, msg,
+                                instance_id, stats);
         return;
     }
 
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index c4e67b2ef01..795f2f21cb7 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -451,6 +451,7 @@ void TxnLazyCommitTask::commit() {
                                   << " txn_id=" << txn_id_;
                     }
 
+                    TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
                     err = txn->commit();
                     if (err != TxnErrorCode::TXN_OK) {
                         code_ = cast_as<ErrCategory::COMMIT>(err);
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 2e018839e4e..62f959167e6 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -1825,6 +1825,285 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase4Test) {
     ASSERT_EQ(txn_id, txn_info_pb.txn_id());
 }
 
+TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) {
+    // 
===========================================================================
+    // A lazy commit (thread1) races a commit with sub txns (thread2); flow:
+    //
+    //           thread1                           thread2
+    //              |                                 |
+    //   commit_txn_eventually begin      commit_txn_with_sub_txn begin
+    //              |                                 |
+    //       lazy commit wait                         |
+    //              |                                 |
+    //              |                         advance last txn
+    //              |                                 |
+    //              |                               finish
+    //              |                                 |
+    //           finish                               |
+    //              |                                 |
+    //              |                                 |
+    //              v                                 v
+
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 134179142;
+    int64_t table_id = 3243264;
+    int64_t index_id = 8098394;
+    int64_t partition_id = 32895361;
+
+    std::mutex go_mutex;
+    std::condition_variable go_cv;
+    bool go = false;
+
+    std::atomic<int32_t> commit_txn_immediately_begin_count = {0};
+    std::atomic<int32_t> last_pending_txn_id_count = {0};
+    std::atomic<int32_t> txn_lazy_committer_wait_count = {0};
+    std::atomic<int32_t> immediately_finish_count = {0};
+    std::atomic<int32_t> eventually_finish_count = {0};
+
+    auto sp = SyncPoint::get_instance();
+
+    int64_t first_txn_id = 0;
+    sp->set_call_back("commit_txn_with_sub_txn:begin", [&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        commit_txn_immediately_begin_count++;
+        if (commit_txn_immediately_begin_count == 1) {
+            {
+                first_txn_id = *try_any_cast<int64_t*>(args[0]);
+                go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count 
== 1; });
+                go_cv.notify_all();
+            }
+        }
+    });
+
+    int64_t second_txn_id = 0;
+    sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", 
[&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        txn_lazy_committer_wait_count++;
+        if (txn_lazy_committer_wait_count == 1) {
+            int64_t txn_id = *try_any_cast<int64_t*>(args[0]);
+            second_txn_id = txn_id;
+            go_cv.notify_all();
+        }
+    });
+
+    sp->set_call_back("commit_txn_with_sub_txn::advance_last_pending_txn_id", 
[&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        last_pending_txn_id_count++;
+        if (last_pending_txn_id_count == 1) {
+            int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]);
+            ASSERT_EQ(last_pending_txn_id, second_txn_id);
+        }
+        go_cv.notify_all();
+    });
+
+    sp->set_call_back("commit_txn_with_sub_txn::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        immediately_finish_count++;
+        if (immediately_finish_count == 1) {
+            go_cv.notify_all();
+        }
+    });
+
+    sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        eventually_finish_count++;
+    });
+
+    sp->set_call_back("TxnLazyCommitter::commit", [&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex); // block until the advance_last_pending_txn_id callback has fired
+        go_cv.wait(_lock, [&] { return last_pending_txn_id_count == 1; });
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 1908562;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+
+    int64_t txn_id1 = 0;
+    std::thread thread1([&] {
+        {
+            std::unique_lock<std::mutex> _lock(go_mutex);
+            go_cv.wait(_lock, [&] { return go; });
+        }
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            
txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3442");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id1 = res.txn_id();
+            ASSERT_GT(txn_id1, 0);
+        }
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset =
+                        create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id1);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    });
+
+    int64_t txn_id2 = 0;
+    std::thread thread2([&] {
+        {
+            std::unique_lock<std::mutex> _lock(go_mutex);
+            go_cv.wait(_lock, [&] { return go; });
+        }
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            
txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually5");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id2 = res.txn_id();
+            ASSERT_GT(txn_id2, 0);
+        }
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset =
+                        create_rowset(txn_id2, tablet_id_base + i, index_id, 
partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+        int64_t sub_txn_id1 = txn_id2; // the first sub txn reuses the parent txn id
+
+        // begin the second sub txn (sub_txn_id2)
+        int64_t sub_txn_id2 = -1;
+        {
+            brpc::Controller cntl;
+            BeginSubTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id2);
+            req.set_sub_txn_num(0);
+            req.set_db_id(db_id);
+            req.set_label("test_label_concurrent_commit_txn_eventually5_sub");
+            req.mutable_table_ids()->Add(table_id);
+            req.mutable_table_ids()->Add(table_id);
+            BeginSubTxnResponse res;
+            
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+            ASSERT_TRUE(res.has_sub_txn_id());
+            sub_txn_id2 = res.sub_txn_id();
+            ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+        }
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset =
+                        create_rowset(sub_txn_id2, tablet_id_base + i, 
index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id2);
+            req.set_is_txn_load(true);
+
+            SubTxnInfo sub_txn_info1;
+            sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+            sub_txn_info1.set_table_id(table_id);
+            for (int i = 0; i < 10; ++i) {
+                sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + 
i);
+            }
+
+            SubTxnInfo sub_txn_info2;
+            sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+            sub_txn_info2.set_table_id(table_id);
+            for (int i = 0; i < 10; ++i) {
+                sub_txn_info2.mutable_base_tablet_ids()->Add(tablet_id_base + 
i);
+            }
+
+            req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
+            req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    });
+
+    std::unique_lock<std::mutex> go_lock(go_mutex);
+    go = true;
+    go_lock.unlock();
+    go_cv.notify_all();
+
+    thread1.join();
+    thread2.join();
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+    ASSERT_EQ(commit_txn_immediately_begin_count, 2);
+    ASSERT_EQ(last_pending_txn_id_count, 1);
+    ASSERT_EQ(immediately_finish_count, 1);
+    ASSERT_EQ(eventually_finish_count, 1);
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 10; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+
+            check_tmp_rowset_not_exist(txn, tablet_id, first_txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+
+            check_tmp_rowset_not_exist(txn, tablet_id, second_txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 4);
+        }
+    }
+}
+
 TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
     auto txn_kv = get_mem_txn_kv();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to