gavinchou commented on code in PR #34721:
URL: https://github.com/apache/doris/pull/34721#discussion_r1618605707


##########
gensrc/proto/cloud.proto:
##########
@@ -378,6 +378,8 @@ message TxnInfoPB {
     optional TxnStatusPB status = 15;
     optional TxnCommitAttachmentPB commit_attachment = 16;
     optional int64 listener_id = 17; //callback id
+    // for transaction load, used for recycler
+    repeated int64 sub_txn_ids = 18;

Review Comment:
   >>> 100 * 2 ** 10 / 8
   12800
   
   With the 100 KB value size limit and 8 bytes per int64 id, TxnInfoPB can hold at most about 12800 sub txn ids.



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -1612,6 +2190,307 @@ void 
MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
     response->set_current_max_txn_id(current_max_txn_id);
 }
 
+/**
+ * 1. Generate a sub_txn_id
+ *
+ * The following steps are done in a txn:
+ * 2. Put txn_index_key in sub_txn_id
+ * 3. Delete txn_label_key in sub_txn_id
+ * 4. Modify the txn state of the txn_id:
+ *    - Add the sub txn id to sub_txn_ids: recycler use it to recycle the 
txn_index_key
+ *    - Add the table id to table_ids
+ */
+void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* 
controller,
+                                    const BeginSubTxnRequest* request,
+                                    BeginSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(begin_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    if (txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid txn_id, it may be not given or set properly, txn_id=" 
<< txn_id;
+        msg = ss.str();
+        return;
+    }
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    int64_t table_id = request->has_table_id() ? request->table_id() : -1;
+    std::string label = request->has_label() ? request->label() : "";
+    if (label.empty() || db_id < 0 || table_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, label=" << label << " db_id=" << db_id
+           << ", table_id=" << table_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(begin_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    const std::string label_key = txn_label_key({instance_id, db_id, label});
+    std::string label_val;
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get failed(), err=" << err << " label=" << label;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // err == OK means this is a retry rpc?
+    if (err == TxnErrorCode::TXN_OK) {
+        label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
+    }
+
+    // ret > 0, means label not exist previously.
+    txn->atomic_set_ver_value(label_key, label_val);
+    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "txn->commit failed(), label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    // 2. Get sub txn id from version stamp
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn when get txn id, label=" << label << " 
err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    label_val.clear();
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get() failed, label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // Generated by TxnKv system
+    int64_t sub_txn_id = 0;
+    int ret =
+            get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
+                                           label_val.size() - 
VERSION_STAMP_LEN, label_val.size()),
+                                   &sub_txn_id);
+    if (ret != 0) {
+        code = MetaServiceCode::TXN_GEN_ID_ERR;
+        ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << 
ret;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" 
<< sub_txn_id
+              << " txn_id=" << txn_id << " label_val.size()=" << 
label_val.size();
+
+    // write txn_index_key
+    const std::string index_key = txn_index_key({instance_id, sub_txn_id});
+    std::string index_val;
+    TxnIndexPB index_pb;
+    if (!index_pb.SerializeToString(&index_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_index_pb "
+           << "label=" << label << " txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    txn_info.mutable_table_ids()->Add(table_id);
+    txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->remove(label_key);
+    txn->put(info_key, info_val);
+    txn->put(index_key, index_val);
+    LOG(INFO) << "xxx remove label_key=" << hex(label_key) << " txn_id=" << 
txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    LOG(INFO) << "xxx put index_key=" << hex(index_key) << " txn_id=" << txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+    response->set_sub_txn_id(sub_txn_id);
+    response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
+/**
+ * 1. Modify the txn state of the txn_id:
+ *    - Remove the table id from table_ids
+ */
+void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* 
controller,
+                                    const AbortSubTxnRequest* request,
+                                    AbortSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(abort_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    if (txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid txn_id, it may be not given or set properly, txn_id=" 
<< txn_id;
+        msg = ss.str();
+        return;
+    }
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    int64_t table_id = request->has_table_id() ? request->table_id() : -1;
+    int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : 
-1;
+    if (db_id < 0 || table_id < 0 || sub_txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, db_id=" << db_id << ", table_id=" << table_id
+           << ", sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(abort_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
+           << " sub_txn_id=" << sub_txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // remove table_id and does not need to remove sub_txn_id
+    auto it = txn_info.mutable_table_ids()->end() - 1;

Review Comment:
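   This needs to be idempotent: it removes the last entry of table_ids (`end() - 1`), so a retried abort_sub_txn could remove another table id.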



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -1235,161 +1239,735 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     response->mutable_txn_info()->CopyFrom(txn_info);
 } // end commit_txn
 
-void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
-                                const AbortTxnRequest* request, 
AbortTxnResponse* response,
-                                ::google::protobuf::Closure* done) {
-    RPC_PREPROCESS(abort_txn);
-    // Get txn id
-    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
-    std::string label = request->has_label() ? request->label() : "";
-    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
-    if (txn_id < 0 && (label.empty() || db_id < 0)) {
+/**
+ * This process is generally the same as commit_txn, the difference is that
+ * the partitions version will plus 1 in multi sub txns.
+ *
+ * One example:
+ *  Suppose the table, partition, tablet and version info is:
+ *  --------------------------------------------
+ *  | table | partition | tablet    | version |
+ *  --------------------------------------------
+ *  | t1    | t1_p1     | t1_p1.1   | 1       |
+ *  | t1    | t1_p1     | t1_p1.2   | 1       |
+ *  | t1    | t1_p2     | t1_p2.1   | 2       |
+ *  | t2    | t2_p3     | t2_p3.1   | 3       |
+ *  | t2    | t2_p4     | t2_p4.1   | 4       |
+ *  --------------------------------------------
+ *
+ *  Now we commit a txn with 3 sub txns and the tablets are:
+ *    sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
+ *    sub_txn2: t2_p3.1
+ *    sub_txn3: t1_p1.1, t1_p1.2
+ *  When commit, the partitions version will be:
+ *    sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
+ *    sub_txn2: t2_p3(3 -> 4)
+ *    sub_txn3: t1_p1(2 -> 3)
+ *  After commit, the partitions version will be:
+ *    t1: t1_p1(3), t1_p2(3)
+ *    t2: t2_p3(4), t2_p4(4)
+ */
+void 
MetaServiceImpl::commit_txn_with_sub_txn(::google::protobuf::RpcController* 
controller,
+                                              const CommitTxnRequest* request,
+                                              CommitTxnResponse* response,
+                                              ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(commit_txn);
+    if (!request->has_txn_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << 
txn_id
-           << " label=" << label;
-        msg = ss.str();
+        msg = "invalid argument, missing txn id";
         return;
     }
 
+    int64_t txn_id = request->txn_id();
+    auto sub_txn_infos = request->sub_txn_infos();
+
     std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
-    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
     if (instance_id.empty()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "cannot find instance_id with cloud_unique_id="
-           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " 
label=" << label
-           << " txn_id=" << txn_id;
-        msg = ss.str();
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " 
txn_id=" << txn_id;
         return;
     }
 
-    RPC_RATE_LIMIT(abort_txn);
+    RPC_RATE_LIMIT(commit_txn)
+
+    // Create a readonly txn for scan tmp rowset
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv_->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
         code = cast_as<ErrCategory::CREATE>(err);
-        ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" 
<< label
-           << " err=" << err;
+        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
-    std::string info_key; // Will be used when saving updated txn
-    std::string info_val; // Will be reused when saving updated txn
-    TxnInfoPB txn_info;
-    //TODO: split with two function.
-    //there two ways to abort txn:
-    //1. abort txn by txn id
-    //2. abort txn by label and db_id
-    if (txn_id > 0) {
-        VLOG_DEBUG << "abort_txn by txn_id";
-        //abort txn by txn id
-        // Get db id with txn id
+    // Get db id with txn id
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        std::string index_key;
-        std::string index_val;
-        //not provide db_id, we need read from disk.
-        if (!request->has_db_id()) {
-            index_key = txn_index_key({instance_id, txn_id});
-            err = txn->get(index_key, &index_val);
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    DCHECK(index_pb.has_tablet_index() == true);
+    DCHECK(index_pb.tablet_index().has_db_id() == true);
+    int64_t db_id = index_pb.tablet_index().db_id();
+
+    // Get temporary rowsets involved in the txn
+    std::map<int64_t, std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>>
+            sub_txn_to_tmp_rowsets_meta;
+    for (const auto& sub_txn_info : sub_txn_infos) {
+        auto sub_txn_id = sub_txn_info.sub_txn_id();
+        // This is a range scan
+        MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
+        MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
+        std::string rs_tmp_key0;
+        std::string rs_tmp_key1;
+        meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
+        meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
+        // Get rowset meta that should be commited
+        //                   tmp_rowset_key -> rowset_meta
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> 
tmp_rowsets_meta;
+
+        int num_rowsets = 0;
+        std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+                (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, 
&sub_txn_id](int*) {
+                    LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
+                              << ", sub_txn_id=" << sub_txn_id << " 
num_rowsets=" << num_rowsets
+                              << " range=[" << hex(rs_tmp_key0) << "," << 
hex(rs_tmp_key1) << ")";
+                });
+
+        std::unique_ptr<RangeGetIterator> it;
+        do {
+            err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+            if (err == TxnErrorCode::TXN_TOO_OLD) {
+                err = txn_kv_->create_txn(&txn);
+                if (err == TxnErrorCode::TXN_OK) {
+                    err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+                }
+            }
             if (err != TxnErrorCode::TXN_OK) {
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                ss << "failed to get db id, txn_id=" << txn_id << " err=" << 
err;
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "internal error, failed to get tmp rowset while 
committing, txn_id=" << txn_id
+                   << " err=" << err;
                 msg = ss.str();
+                LOG(WARNING) << msg;
                 return;
             }
 
-            TxnIndexPB index_pb;
-            if (!index_pb.ParseFromString(index_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse txn_index_val"
-                   << " txn_id=" << txn_id;
-                msg = ss.str();
-                return;
+            while (it->has_next()) {
+                auto [k, v] = it->next();
+                LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " 
txn_id=" << txn_id;
+                tmp_rowsets_meta.emplace_back();
+                if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), 
v.size())) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "malformed rowset meta, unable to initialize, 
txn_id=" << txn_id
+                       << " key=" << hex(k);
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                // Save keys that will be removed later
+                tmp_rowsets_meta.back().first = std::string(k.data(), 
k.size());
+                ++num_rowsets;
+                if (!it->has_next()) rs_tmp_key0 = k;
             }
-            DCHECK(index_pb.has_tablet_index() == true);
-            DCHECK(index_pb.tablet_index().has_db_id() == true);
-            db_id = index_pb.tablet_index().db_id();
-        } else {
-            db_id = request->db_id();
-        }
+            rs_tmp_key0.push_back('\x00'); // Update to next smallest key for 
iteration
+        } while (it->more());
 
-        // Get txn info with db_id and txn_id
-        info_key = txn_info_key({instance_id, db_id, txn_id});
-        err = txn->get(info_key, &info_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                          : 
cast_as<ErrCategory::READ>(err);
-            ss << "failed to get txn_info, db_id=" << db_id << "txn_id=" << 
txn_id << "err=" << err;
-            msg = ss.str();
-            return;
-        }
+        VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
+                   << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
+        sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, 
std::move(tmp_rowsets_meta));
+    }
 
-        if (!txn_info.ParseFromString(info_val)) {
-            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << 
txn_id;
-            msg = ss.str();
-            return;
-        }
+    // Create a read/write txn for guarantee consistency
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        DCHECK(txn_info.txn_id() == txn_id);
+    // Get txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        //check state is valid.
-        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-            code = MetaServiceCode::TXN_ALREADY_ABORTED;
-            ss << "transaction is already abort db_id=" << db_id << "txn_id=" 
<< txn_id;
-            msg = ss.str();
-            return;
-        }
-        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
-            code = MetaServiceCode::TXN_ALREADY_VISIBLE;
-            ss << "transaction is already visible db_id=" << db_id << 
"txn_id=" << txn_id;
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // TODO: do more check like txn state, 2PC etc.
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+        code = MetaServiceCode::TXN_ALREADY_ABORTED;
+        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+        code = MetaServiceCode::TXN_ALREADY_VISIBLE;
+        if (request->has_is_2pc() && request->is_2pc()) {
+            ss << "transaction [" << txn_id << "] is already visible, not 
pre-committed.";
             msg = ss.str();
+            response->mutable_txn_info()->CopyFrom(txn_info);
             return;
         }
-    } else {
-        VLOG_DEBUG << "abort_txn by db_id and txn label";
-        //abort txn by label.
-        std::string label_key = txn_label_key({instance_id, db_id, label});
-        std::string label_val;
-        err = txn->get(label_key, &label_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            code = cast_as<ErrCategory::READ>(err);
-            ss << "txn->get() failed, label=" << label << " err=" << err;
-            msg = ss.str();
-            return;
+        ss << "transaction is already visible: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        response->mutable_txn_info()->CopyFrom(txn_info);
+        return;
+    }
+
+    LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
+
+    // Prepare rowset meta and new_versions
+    // Read tablet indexes in batch.
+    std::map<int64_t, int64_t> tablet_id_to_idx;
+    std::vector<std::string> tablet_idx_keys;
+    std::vector<int64_t> partition_ids;
+    auto idx = 0;
+    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+        for (auto& [_, i] : tmp_rowsets_meta) {
+            auto tablet_id = i.tablet_id();
+            if (tablet_id_to_idx.count(tablet_id) == 0) {
+                tablet_id_to_idx.emplace(tablet_id, idx);
+                tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, 
i.tablet_id()}));
+                partition_ids.push_back(i.partition_id());
+                idx++;
+            }
         }
+    }
+    std::vector<std::optional<std::string>> tablet_idx_values;
+    err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, 
Transaction::BatchGetOptions(false));
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get tablet table index ids, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
 
-        //label index not exist
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            code = MetaServiceCode::TXN_LABEL_NOT_FOUND;
-            ss << "label not found, db_id=" << db_id << " label=" << label << 
" err=" << err;
+    // tablet_id -> {table/index/partition}_id
+    std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+    // table_id -> tablets_ids
+    std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+    for (auto [tablet_id, i] : tablet_id_to_idx) {
+        if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+            // The value must existed
+            code = MetaServiceCode::KV_TXN_GET_ERR;
+            ss << "failed to get tablet table index ids, err=not found"
+               << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
             msg = ss.str();
+            LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
             return;
         }
-
-        TxnLabelPB label_pb;
-        DCHECK(label_val.size() > VERSION_STAMP_LEN);
-        if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - 
VERSION_STAMP_LEN)) {
+        if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            ss << "txn_label_pb->ParseFromString() failed, label=" << label;
+            ss << "malformed tablet index value tablet_id=" << tablet_id << " 
txn_id=" << txn_id;
             msg = ss.str();
+            LOG(WARNING) << msg;
             return;
         }
+        
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+        VLOG_DEBUG << "tablet_id:" << tablet_id
+                   << " value:" << tablet_ids[tablet_id].ShortDebugString();
+    }
 
-        int64_t prepare_txn_id = 0;
-        //found prepare state txn for abort
-        for (auto& cur_txn_id : label_pb.txn_ids()) {
-            std::string cur_info_key = txn_info_key({instance_id, db_id, 
cur_txn_id});
-            std::string cur_info_val;
-            err = txn->get(cur_info_key, &cur_info_val);
-            if (err != TxnErrorCode::TXN_OK) {
-                code = cast_as<ErrCategory::READ>(err);
-                std::stringstream ss;
-                ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " 
err=" << err;
-                msg = ss.str();
-                return;
-            }
+    tablet_idx_keys.clear();
+    tablet_idx_values.clear();
+
+    // {table/partition} -> version
+    std::unordered_map<std::string, uint64_t> new_versions;
+    std::vector<std::string> version_keys;
+    for (auto& [tablet_id, i] : tablet_id_to_idx) {
+        int64_t table_id = tablet_ids[tablet_id].table_id();
+        int64_t partition_id = partition_ids[i];
+        std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
+        if (new_versions.count(ver_key) == 0) {
+            new_versions.insert({ver_key, 0});
+            LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << 
" txn_id=" << txn_id
+                      << ", db_id=" << db_id << ", table_id=" << table_id
+                      << ", partition_id=" << partition_id;
+            version_keys.push_back(std::move(ver_key));
+        }
+    }
+    std::vector<std::optional<std::string>> version_values;
+    err = txn->batch_get(&version_values, version_keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get partition versions, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+    size_t total_versions = version_keys.size();
+    for (size_t i = 0; i < total_versions; i++) {
+        int64_t version;
+        if (version_values[i].has_value()) {
+            VersionPB version_pb;
+            if (!version_pb.ParseFromString(version_values[i].value())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse version pb txn_id=" << txn_id
+                   << " key=" << hex(version_keys[i]);
+                msg = ss.str();
+                return;
+            }
+            version = version_pb.version();
+        } else {
+            version = 1;
+        }
+        new_versions[version_keys[i]] = version;
+        LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
+                  << " version:" << version << " txn_id=" << txn_id;
+    }
+    version_keys.clear();
+    version_values.clear();
+
+    std::vector<std::pair<std::string, std::string>> rowsets;
+    std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
+    for (const auto& sub_txn_info : sub_txn_infos) {

Review Comment:
   Add enough unit tests to verify that the versions in the rowset meta KVs are continuous across sub txns.



##########
gensrc/proto/cloud.proto:
##########
@@ -646,6 +648,15 @@ message CommitTxnRequest {
     // merge-on-write table ids
     repeated int64 mow_table_ids = 6;
     repeated int64 base_tablet_ids= 7; // all tablet from base tables 
(excluding mv)
+    // for transaction load
+    optional bool is_txn_load = 9;
+    repeated SubTxnInfo sub_txn_infos = 10;
+}
+
+message SubTxnInfo {
+    optional int64 sub_txn_id = 1;
+    optional int64 table_id = 2;
+    repeated int64 base_tablet_ids= 3;

Review Comment:
   Ensure the tablet ids are unique, i.e. that no tablet id appears more than once.



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -1235,161 +1239,735 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     response->mutable_txn_info()->CopyFrom(txn_info);
 } // end commit_txn
 
-void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
-                                const AbortTxnRequest* request, 
AbortTxnResponse* response,
-                                ::google::protobuf::Closure* done) {
-    RPC_PREPROCESS(abort_txn);
-    // Get txn id
-    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
-    std::string label = request->has_label() ? request->label() : "";
-    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
-    if (txn_id < 0 && (label.empty() || db_id < 0)) {
+/**
+ * This process is generally the same as commit_txn, the difference is that
+ * the partitions version will plus 1 in multi sub txns.
+ *
+ * One example:
+ *  Suppose the table, partition, tablet and version info is:
+ *  --------------------------------------------
+ *  | table | partition | tablet    | version |
+ *  --------------------------------------------
+ *  | t1    | t1_p1     | t1_p1.1   | 1       |
+ *  | t1    | t1_p1     | t1_p1.2   | 1       |
+ *  | t1    | t1_p2     | t1_p2.1   | 2       |
+ *  | t2    | t2_p3     | t2_p3.1   | 3       |
+ *  | t2    | t2_p4     | t2_p4.1   | 4       |
+ *  --------------------------------------------
+ *
+ *  Now we commit a txn with 3 sub txns and the tablets are:
+ *    sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
+ *    sub_txn2: t2_p3.1
+ *    sub_txn3: t1_p1.1, t1_p1.2
+ *  When commit, the partitions version will be:
+ *    sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
+ *    sub_txn2: t2_p3(3 -> 4)
+ *    sub_txn3: t1_p1(2 -> 3)
+ *  After commit, the partitions version will be:
+ *    t1: t1_p1(3), t1_p2(3)
+ *    t2: t2_p3(4), t2_p4(4)
+ */
+void 
MetaServiceImpl::commit_txn_with_sub_txn(::google::protobuf::RpcController* 
controller,
+                                              const CommitTxnRequest* request,
+                                              CommitTxnResponse* response,
+                                              ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(commit_txn);
+    if (!request->has_txn_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << 
txn_id
-           << " label=" << label;
-        msg = ss.str();
+        msg = "invalid argument, missing txn id";
         return;
     }
 
+    int64_t txn_id = request->txn_id();
+    auto sub_txn_infos = request->sub_txn_infos();
+
     std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
-    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
     if (instance_id.empty()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "cannot find instance_id with cloud_unique_id="
-           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " 
label=" << label
-           << " txn_id=" << txn_id;
-        msg = ss.str();
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " 
txn_id=" << txn_id;
         return;
     }
 
-    RPC_RATE_LIMIT(abort_txn);
+    RPC_RATE_LIMIT(commit_txn)
+
+    // Create a readonly txn for scan tmp rowset
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv_->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
         code = cast_as<ErrCategory::CREATE>(err);
-        ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" 
<< label
-           << " err=" << err;
+        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
-    std::string info_key; // Will be used when saving updated txn
-    std::string info_val; // Will be reused when saving updated txn
-    TxnInfoPB txn_info;
-    //TODO: split with two function.
-    //there two ways to abort txn:
-    //1. abort txn by txn id
-    //2. abort txn by label and db_id
-    if (txn_id > 0) {
-        VLOG_DEBUG << "abort_txn by txn_id";
-        //abort txn by txn id
-        // Get db id with txn id
+    // Get db id with txn id
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        std::string index_key;
-        std::string index_val;
-        //not provide db_id, we need read from disk.
-        if (!request->has_db_id()) {
-            index_key = txn_index_key({instance_id, txn_id});
-            err = txn->get(index_key, &index_val);
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    DCHECK(index_pb.has_tablet_index() == true);
+    DCHECK(index_pb.tablet_index().has_db_id() == true);
+    int64_t db_id = index_pb.tablet_index().db_id();
+
+    // Get temporary rowsets involved in the txn
+    std::map<int64_t, std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>>
+            sub_txn_to_tmp_rowsets_meta;
+    for (const auto& sub_txn_info : sub_txn_infos) {
+        auto sub_txn_id = sub_txn_info.sub_txn_id();
+        // This is a range scan
+        MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
+        MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
+        std::string rs_tmp_key0;
+        std::string rs_tmp_key1;
+        meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
+        meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
+        // Get rowset meta that should be commited
+        //                   tmp_rowset_key -> rowset_meta
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> 
tmp_rowsets_meta;
+
+        int num_rowsets = 0;
+        std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+                (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, 
&sub_txn_id](int*) {
+                    LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
+                              << ", sub_txn_id=" << sub_txn_id << " 
num_rowsets=" << num_rowsets
+                              << " range=[" << hex(rs_tmp_key0) << "," << 
hex(rs_tmp_key1) << ")";
+                });
+
+        std::unique_ptr<RangeGetIterator> it;
+        do {
+            err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+            if (err == TxnErrorCode::TXN_TOO_OLD) {
+                err = txn_kv_->create_txn(&txn);
+                if (err == TxnErrorCode::TXN_OK) {
+                    err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+                }
+            }
             if (err != TxnErrorCode::TXN_OK) {
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                ss << "failed to get db id, txn_id=" << txn_id << " err=" << 
err;
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "internal error, failed to get tmp rowset while 
committing, txn_id=" << txn_id
+                   << " err=" << err;
                 msg = ss.str();
+                LOG(WARNING) << msg;
                 return;
             }
 
-            TxnIndexPB index_pb;
-            if (!index_pb.ParseFromString(index_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse txn_index_val"
-                   << " txn_id=" << txn_id;
-                msg = ss.str();
-                return;
+            while (it->has_next()) {
+                auto [k, v] = it->next();
+                LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " 
txn_id=" << txn_id;
+                tmp_rowsets_meta.emplace_back();
+                if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), 
v.size())) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "malformed rowset meta, unable to initialize, 
txn_id=" << txn_id
+                       << " key=" << hex(k);
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                // Save keys that will be removed later
+                tmp_rowsets_meta.back().first = std::string(k.data(), 
k.size());
+                ++num_rowsets;
+                if (!it->has_next()) rs_tmp_key0 = k;
             }
-            DCHECK(index_pb.has_tablet_index() == true);
-            DCHECK(index_pb.tablet_index().has_db_id() == true);
-            db_id = index_pb.tablet_index().db_id();
-        } else {
-            db_id = request->db_id();
-        }
+            rs_tmp_key0.push_back('\x00'); // Update to next smallest key for 
iteration
+        } while (it->more());
 
-        // Get txn info with db_id and txn_id
-        info_key = txn_info_key({instance_id, db_id, txn_id});
-        err = txn->get(info_key, &info_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                          : 
cast_as<ErrCategory::READ>(err);
-            ss << "failed to get txn_info, db_id=" << db_id << "txn_id=" << 
txn_id << "err=" << err;
-            msg = ss.str();
-            return;
-        }
+        VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
+                   << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
+        sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, 
std::move(tmp_rowsets_meta));
+    }
 
-        if (!txn_info.ParseFromString(info_val)) {
-            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << 
txn_id;
-            msg = ss.str();
-            return;
-        }
+    // Create a read/write txn for guarantee consistency
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        DCHECK(txn_info.txn_id() == txn_id);
+    // Get txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        //check state is valid.
-        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-            code = MetaServiceCode::TXN_ALREADY_ABORTED;
-            ss << "transaction is already abort db_id=" << db_id << "txn_id=" 
<< txn_id;
-            msg = ss.str();
-            return;
-        }
-        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
-            code = MetaServiceCode::TXN_ALREADY_VISIBLE;
-            ss << "transaction is already visible db_id=" << db_id << 
"txn_id=" << txn_id;
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // TODO: do more check like txn state, 2PC etc.
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+        code = MetaServiceCode::TXN_ALREADY_ABORTED;
+        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+        code = MetaServiceCode::TXN_ALREADY_VISIBLE;
+        if (request->has_is_2pc() && request->is_2pc()) {

Review Comment:
   2PC is not supported by this commit path (commit with sub txns), so this 2PC-specific handling should not be here.



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -1612,6 +2190,307 @@ void 
MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
     response->set_current_max_txn_id(current_max_txn_id);
 }
 
+/**
+ * 1. Generate a sub_txn_id
+ *
+ * The following steps are done in a txn:
+ * 2. Put txn_index_key in sub_txn_id
+ * 3. Delete txn_label_key in sub_txn_id
+ * 4. Modify the txn state of the txn_id:
+ *    - Add the sub txn id to sub_txn_ids: recycler use it to recycle the 
txn_index_key
+ *    - Add the table id to table_ids
+ */
+void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* 
controller,
+                                    const BeginSubTxnRequest* request,
+                                    BeginSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(begin_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    if (txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid txn_id, it may be not given or set properly, txn_id=" 
<< txn_id;
+        msg = ss.str();
+        return;
+    }
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    int64_t table_id = request->has_table_id() ? request->table_id() : -1;
+    std::string label = request->has_label() ? request->label() : "";
+    if (label.empty() || db_id < 0 || table_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, label=" << label << " db_id=" << db_id
+           << ", table_id=" << table_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(begin_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    const std::string label_key = txn_label_key({instance_id, db_id, label});
+    std::string label_val;
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get failed(), err=" << err << " label=" << label;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // err == OK means this is a retry rpc?
+    if (err == TxnErrorCode::TXN_OK) {
+        label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
+    }
+
+    // ret > 0, means label not exist previously.
+    txn->atomic_set_ver_value(label_key, label_val);
+    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "txn->commit failed(), label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    // 2. Get sub txn id from version stamp
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn when get txn id, label=" << label << " 
err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    label_val.clear();
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get() failed, label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // Generated by TxnKv system
+    int64_t sub_txn_id = 0;
+    int ret =
+            get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
+                                           label_val.size() - 
VERSION_STAMP_LEN, label_val.size()),
+                                   &sub_txn_id);
+    if (ret != 0) {
+        code = MetaServiceCode::TXN_GEN_ID_ERR;
+        ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << 
ret;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" 
<< sub_txn_id
+              << " txn_id=" << txn_id << " label_val.size()=" << 
label_val.size();
+
+    // write txn_index_key
+    const std::string index_key = txn_index_key({instance_id, sub_txn_id});
+    std::string index_val;
+    TxnIndexPB index_pb;
+    if (!index_pb.SerializeToString(&index_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_index_pb "
+           << "label=" << label << " txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    txn_info.mutable_table_ids()->Add(table_id);
+    txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->remove(label_key);
+    txn->put(info_key, info_val);
+    txn->put(index_key, index_val);
+    LOG(INFO) << "xxx remove label_key=" << hex(label_key) << " txn_id=" << 
txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    LOG(INFO) << "xxx put index_key=" << hex(index_key) << " txn_id=" << txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+    response->set_sub_txn_id(sub_txn_id);
+    response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
+/**
+ * 1. Modify the txn state of the txn_id:
+ *    - Remove the table id from table_ids
+ */
+void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* 
controller,
+                                    const AbortSubTxnRequest* request,
+                                    AbortSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(abort_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    if (txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid txn_id, it may be not given or set properly, txn_id=" 
<< txn_id;
+        msg = ss.str();
+        return;
+    }
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    int64_t table_id = request->has_table_id() ? request->table_id() : -1;
+    int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : 
-1;
+    if (db_id < 0 || table_id < 0 || sub_txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, db_id=" << db_id << ", table_id=" << table_id
+           << ", sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(abort_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
+           << " sub_txn_id=" << sub_txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // remove table_id and does not need to remove sub_txn_id
+    auto it = txn_info.mutable_table_ids()->end() - 1;
+    if (*it == table_id) {

Review Comment:
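   This may core dump when `table_ids` is empty: `it` is computed as `end() - 1`, which then points before `begin()`, and dereferencing it is undefined behavior. Check that `table_ids` is non-empty before taking and dereferencing `end() - 1`.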



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -1235,161 +1239,735 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     response->mutable_txn_info()->CopyFrom(txn_info);
 } // end commit_txn
 
-void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
-                                const AbortTxnRequest* request, 
AbortTxnResponse* response,
-                                ::google::protobuf::Closure* done) {
-    RPC_PREPROCESS(abort_txn);
-    // Get txn id
-    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
-    std::string label = request->has_label() ? request->label() : "";
-    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
-    if (txn_id < 0 && (label.empty() || db_id < 0)) {
+/**
+ * This process is generally the same as commit_txn. The difference is that
+ * each sub txn increments, by 1, the version of every partition it touches.
+ *
+ * One example:
+ *  Suppose the table, partition, tablet and version info is:
+ *  --------------------------------------------
+ *  | table | partition | tablet    | version |
+ *  --------------------------------------------
+ *  | t1    | t1_p1     | t1_p1.1   | 1       |
+ *  | t1    | t1_p1     | t1_p1.2   | 1       |
+ *  | t1    | t1_p2     | t1_p2.1   | 2       |
+ *  | t2    | t2_p3     | t2_p3.1   | 3       |
+ *  | t2    | t2_p4     | t2_p4.1   | 4       |
+ *  --------------------------------------------
+ *
+ *  Now we commit a txn with 3 sub txns and the tablets are:
+ *    sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
+ *    sub_txn2: t2_p3.1
+ *    sub_txn3: t1_p1.1, t1_p1.2
+ *  When commit, the partitions version will be:
+ *    sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
+ *    sub_txn2: t2_p3(3 -> 4)
+ *    sub_txn3: t1_p1(2 -> 3)
+ *  After commit, the partitions version will be:
+ *    t1: t1_p1(3), t1_p2(3)
+ *    t2: t2_p3(4), t2_p4(4)
+ */
+void 
MetaServiceImpl::commit_txn_with_sub_txn(::google::protobuf::RpcController* 
controller,
+                                              const CommitTxnRequest* request,
+                                              CommitTxnResponse* response,
+                                              ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(commit_txn);
+    if (!request->has_txn_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << 
txn_id
-           << " label=" << label;
-        msg = ss.str();
+        msg = "invalid argument, missing txn id";
         return;
     }
 
+    int64_t txn_id = request->txn_id();
+    auto sub_txn_infos = request->sub_txn_infos();
+
     std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
-    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
     if (instance_id.empty()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "cannot find instance_id with cloud_unique_id="
-           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " 
label=" << label
-           << " txn_id=" << txn_id;
-        msg = ss.str();
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " 
txn_id=" << txn_id;
         return;
     }
 
-    RPC_RATE_LIMIT(abort_txn);
+    RPC_RATE_LIMIT(commit_txn)
+
+    // Create a readonly txn for scan tmp rowset
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv_->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
         code = cast_as<ErrCategory::CREATE>(err);
-        ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" 
<< label
-           << " err=" << err;
+        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
-    std::string info_key; // Will be used when saving updated txn
-    std::string info_val; // Will be reused when saving updated txn
-    TxnInfoPB txn_info;
-    //TODO: split with two function.
-    //there two ways to abort txn:
-    //1. abort txn by txn id
-    //2. abort txn by label and db_id
-    if (txn_id > 0) {
-        VLOG_DEBUG << "abort_txn by txn_id";
-        //abort txn by txn id
-        // Get db id with txn id
+    // Get db id with txn id
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        std::string index_key;
-        std::string index_val;
-        //not provide db_id, we need read from disk.
-        if (!request->has_db_id()) {
-            index_key = txn_index_key({instance_id, txn_id});
-            err = txn->get(index_key, &index_val);
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    DCHECK(index_pb.has_tablet_index() == true);
+    DCHECK(index_pb.tablet_index().has_db_id() == true);
+    int64_t db_id = index_pb.tablet_index().db_id();
+
+    // Get temporary rowsets involved in the txn
+    std::map<int64_t, std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>>
+            sub_txn_to_tmp_rowsets_meta;
+    for (const auto& sub_txn_info : sub_txn_infos) {
+        auto sub_txn_id = sub_txn_info.sub_txn_id();
+        // This is a range scan
+        MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
+        MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
+        std::string rs_tmp_key0;
+        std::string rs_tmp_key1;
+        meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
+        meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
+        // Get rowset meta that should be commited
+        //                   tmp_rowset_key -> rowset_meta
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> 
tmp_rowsets_meta;
+
+        int num_rowsets = 0;
+        std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+                (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, 
&sub_txn_id](int*) {
+                    LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
+                              << ", sub_txn_id=" << sub_txn_id << " 
num_rowsets=" << num_rowsets
+                              << " range=[" << hex(rs_tmp_key0) << "," << 
hex(rs_tmp_key1) << ")";
+                });
+
+        std::unique_ptr<RangeGetIterator> it;
+        do {
+            err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+            if (err == TxnErrorCode::TXN_TOO_OLD) {
+                err = txn_kv_->create_txn(&txn);
+                if (err == TxnErrorCode::TXN_OK) {
+                    err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+                }
+            }
             if (err != TxnErrorCode::TXN_OK) {
-                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                              : 
cast_as<ErrCategory::READ>(err);
-                ss << "failed to get db id, txn_id=" << txn_id << " err=" << 
err;
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "internal error, failed to get tmp rowset while 
committing, txn_id=" << txn_id
+                   << " err=" << err;
                 msg = ss.str();
+                LOG(WARNING) << msg;
                 return;
             }
 
-            TxnIndexPB index_pb;
-            if (!index_pb.ParseFromString(index_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse txn_index_val"
-                   << " txn_id=" << txn_id;
-                msg = ss.str();
-                return;
+            while (it->has_next()) {
+                auto [k, v] = it->next();
+                LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " 
txn_id=" << txn_id;
+                tmp_rowsets_meta.emplace_back();
+                if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), 
v.size())) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "malformed rowset meta, unable to initialize, 
txn_id=" << txn_id
+                       << " key=" << hex(k);
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                // Save keys that will be removed later
+                tmp_rowsets_meta.back().first = std::string(k.data(), 
k.size());
+                ++num_rowsets;
+                if (!it->has_next()) rs_tmp_key0 = k;
             }
-            DCHECK(index_pb.has_tablet_index() == true);
-            DCHECK(index_pb.tablet_index().has_db_id() == true);
-            db_id = index_pb.tablet_index().db_id();
-        } else {
-            db_id = request->db_id();
-        }
+            rs_tmp_key0.push_back('\x00'); // Update to next smallest key for 
iteration
+        } while (it->more());
 
-        // Get txn info with db_id and txn_id
-        info_key = txn_info_key({instance_id, db_id, txn_id});
-        err = txn->get(info_key, &info_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                          : 
cast_as<ErrCategory::READ>(err);
-            ss << "failed to get txn_info, db_id=" << db_id << "txn_id=" << 
txn_id << "err=" << err;
-            msg = ss.str();
-            return;
-        }
+        VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
+                   << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
+        sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, 
std::move(tmp_rowsets_meta));
+    }
 
-        if (!txn_info.ParseFromString(info_val)) {
-            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << 
txn_id;
-            msg = ss.str();
-            return;
-        }
+    // Create a read/write txn for guarantee consistency
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        DCHECK(txn_info.txn_id() == txn_id);
+    // Get txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
 
-        //check state is valid.
-        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-            code = MetaServiceCode::TXN_ALREADY_ABORTED;
-            ss << "transaction is already abort db_id=" << db_id << "txn_id=" 
<< txn_id;
-            msg = ss.str();
-            return;
-        }
-        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
-            code = MetaServiceCode::TXN_ALREADY_VISIBLE;
-            ss << "transaction is already visible db_id=" << db_id << 
"txn_id=" << txn_id;
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // TODO: do more check like txn state, 2PC etc.
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+        code = MetaServiceCode::TXN_ALREADY_ABORTED;
+        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+        code = MetaServiceCode::TXN_ALREADY_VISIBLE;
+        if (request->has_is_2pc() && request->is_2pc()) {
+            ss << "transaction [" << txn_id << "] is already visible, not 
pre-committed.";
             msg = ss.str();
+            response->mutable_txn_info()->CopyFrom(txn_info);
             return;
         }
-    } else {
-        VLOG_DEBUG << "abort_txn by db_id and txn label";
-        //abort txn by label.
-        std::string label_key = txn_label_key({instance_id, db_id, label});
-        std::string label_val;
-        err = txn->get(label_key, &label_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            code = cast_as<ErrCategory::READ>(err);
-            ss << "txn->get() failed, label=" << label << " err=" << err;
-            msg = ss.str();
-            return;
+        ss << "transaction is already visible: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        response->mutable_txn_info()->CopyFrom(txn_info);
+        return;
+    }
+
+    LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
+
+    // Prepare rowset meta and new_versions
+    // Read tablet indexes in batch.
+    std::map<int64_t, int64_t> tablet_id_to_idx;
+    std::vector<std::string> tablet_idx_keys;
+    std::vector<int64_t> partition_ids;
+    auto idx = 0;
+    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+        for (auto& [_, i] : tmp_rowsets_meta) {
+            auto tablet_id = i.tablet_id();
+            if (tablet_id_to_idx.count(tablet_id) == 0) {
+                tablet_id_to_idx.emplace(tablet_id, idx);
+                tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, 
i.tablet_id()}));
+                partition_ids.push_back(i.partition_id());
+                idx++;
+            }
         }
+    }
+    std::vector<std::optional<std::string>> tablet_idx_values;
+    err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, 
Transaction::BatchGetOptions(false));
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get tablet table index ids, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
 
-        //label index not exist
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            code = MetaServiceCode::TXN_LABEL_NOT_FOUND;
-            ss << "label not found, db_id=" << db_id << " label=" << label << 
" err=" << err;
+    // tablet_id -> {table/index/partition}_id
+    std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+    // table_id -> tablets_ids
+    std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+    for (auto [tablet_id, i] : tablet_id_to_idx) {
+        if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+            // The value must existed
+            code = MetaServiceCode::KV_TXN_GET_ERR;
+            ss << "failed to get tablet table index ids, err=not found"
+               << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
             msg = ss.str();
+            LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
             return;
         }
-
-        TxnLabelPB label_pb;
-        DCHECK(label_val.size() > VERSION_STAMP_LEN);
-        if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - 
VERSION_STAMP_LEN)) {
+        if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-            ss << "txn_label_pb->ParseFromString() failed, label=" << label;
+            ss << "malformed tablet index value tablet_id=" << tablet_id << " 
txn_id=" << txn_id;
             msg = ss.str();
+            LOG(WARNING) << msg;
             return;
         }
+        
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+        VLOG_DEBUG << "tablet_id:" << tablet_id
+                   << " value:" << tablet_ids[tablet_id].ShortDebugString();
+    }
 
-        int64_t prepare_txn_id = 0;
-        //found prepare state txn for abort
-        for (auto& cur_txn_id : label_pb.txn_ids()) {
-            std::string cur_info_key = txn_info_key({instance_id, db_id, 
cur_txn_id});
-            std::string cur_info_val;
-            err = txn->get(cur_info_key, &cur_info_val);
-            if (err != TxnErrorCode::TXN_OK) {
-                code = cast_as<ErrCategory::READ>(err);
-                std::stringstream ss;
-                ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " 
err=" << err;
-                msg = ss.str();
-                return;
-            }
+    tablet_idx_keys.clear();
+    tablet_idx_values.clear();
+
+    // {table/partition} -> version
+    std::unordered_map<std::string, uint64_t> new_versions;
+    std::vector<std::string> version_keys;
+    for (auto& [tablet_id, i] : tablet_id_to_idx) {
+        int64_t table_id = tablet_ids[tablet_id].table_id();
+        int64_t partition_id = partition_ids[i];
+        std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
+        if (new_versions.count(ver_key) == 0) {
+            new_versions.insert({ver_key, 0});
+            LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << 
" txn_id=" << txn_id
+                      << ", db_id=" << db_id << ", table_id=" << table_id
+                      << ", partition_id=" << partition_id;
+            version_keys.push_back(std::move(ver_key));
+        }
+    }
+    std::vector<std::optional<std::string>> version_values;
+    err = txn->batch_get(&version_values, version_keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get partition versions, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+    size_t total_versions = version_keys.size();
+    for (size_t i = 0; i < total_versions; i++) {
+        int64_t version;
+        if (version_values[i].has_value()) {
+            VersionPB version_pb;
+            if (!version_pb.ParseFromString(version_values[i].value())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse version pb txn_id=" << txn_id
+                   << " key=" << hex(version_keys[i]);
+                msg = ss.str();
+                return;
+            }
+            version = version_pb.version();
+        } else {
+            version = 1;
+        }
+        new_versions[version_keys[i]] = version;
+        LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
+                  << " version:" << version << " txn_id=" << txn_id;
+    }
+    version_keys.clear();
+    version_values.clear();
+
+    std::vector<std::pair<std::string, std::string>> rowsets;
+    std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
+    for (const auto& sub_txn_info : sub_txn_infos) {
+        auto sub_txn_id = sub_txn_info.sub_txn_id();
+        auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
+        std::unordered_map<int64_t, int64_t> partition_id_to_version;
+        for (auto& [_, i] : tmp_rowsets_meta) {
+            int64_t tablet_id = i.tablet_id();
+            int64_t table_id = tablet_ids[tablet_id].table_id();
+            int64_t partition_id = i.partition_id();
+            std::string ver_key =
+                    partition_version_key({instance_id, db_id, table_id, 
partition_id});
+            if (new_versions.count(ver_key) == 0) [[unlikely]] {
+                // it is impossible.
+                code = MetaServiceCode::UNDEFINED_ERR;
+                ss << "failed to get partition version key, the target version 
not exists in "
+                      "new_versions."
+                   << " txn_id=" << txn_id << ", db_id=" << db_id << ", 
table_id=" << table_id
+                   << ", partition_id=" << partition_id;
+                msg = ss.str();
+                LOG(ERROR) << msg;
+                return;
+            }
+
+            // Update rowset version
+            int64_t new_version = new_versions[ver_key];
+            if (partition_id_to_version.count(partition_id) == 0) {
+                new_versions[ver_key] = new_version + 1;
+                new_version = new_versions[ver_key];
+                partition_id_to_version[partition_id] = new_version;
+            }
+            i.set_start_version(new_version);
+            i.set_end_version(new_version);
+            LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
+                      << ", sub_txn_id=" << sub_txn_id << ", table_id=" << 
table_id
+                      << ", partition_id=" << partition_id << ", tablet_id=" 
<< tablet_id
+                      << ", new_version=" << new_version;
+
+            std::string key = meta_rowset_key({instance_id, tablet_id, 
i.end_version()});
+            std::string val;
+            if (!i.SerializeToString(&val)) {
+                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+                ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
+                msg = ss.str();
+                return;
+            }
+            rowsets.emplace_back(std::move(key), std::move(val));
+
+            // Accumulate affected rows
+            auto& stats = tablet_stats[tablet_id];
+            stats.data_size += i.data_disk_size();
+            stats.num_rows += i.num_rows();
+            ++stats.num_rowsets;
+            stats.num_segs += i.num_segments();
+        } // for tmp_rowsets_meta
+    }
+
+    // Save rowset meta
+    for (auto& i : rowsets) {
+        size_t rowset_size = i.first.size() + i.second.size();
+        txn->put(i.first, i.second);
+        LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << 
txn_id
+                  << " rowset_size=" << rowset_size;
+    }
+
+    // Save versions
+    for (auto& i : new_versions) {
+        std::string ver_val;
+        VersionPB version_pb;
+        version_pb.set_version(i.second);
+        if (!version_pb.SerializeToString(&ver_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize version_pb when saving, txn_id=" << 
txn_id;
+            msg = ss.str();
+            return;
+        }
+
+        txn->put(i.first, ver_val);
+        LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " 
version:" << i.second
+                  << " txn_id=" << txn_id;
+
+        std::string_view ver_key = i.first;
+        ver_key.remove_prefix(1); // Remove key space
+        // PartitionVersionKeyInfo  {instance_id, db_id, table_id, 
partition_id}
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        int ret = decode_key(&ver_key, &out);
+        if (ret != 0) [[unlikely]] {
+            // decode version key error means this is something wrong,
+            // we can not continue this txn
+            LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << 
hex(ver_key);
+            code = MetaServiceCode::UNDEFINED_ERR;
+            msg = "decode version key error";
+            return;
+        }
+
+        int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
+        int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
+        VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
+                   << " partition_id=" << partition_id << " version=" << 
i.second;
+
+        response->add_table_ids(table_id);
+        response->add_partition_ids(partition_id);
+        response->add_versions(i.second);
+    }
+
+    // Save table versions
+    for (auto& i : table_id_tablet_ids) {
+        std::string ver_key = table_version_key({instance_id, db_id, i.first});
+        txn->atomic_add(ver_key, 1);
+        LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " 
txn_id=" << txn_id;
+    }
+
+    LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+
+    // Update txn_info
+    txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+
+    auto now_time = system_clock::now();
+    uint64_t commit_time = 
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
+    if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
+        code = MetaServiceCode::UNDEFINED_ERR;
+        msg = fmt::format("txn is expired, not allow to commit txn_id={}", 
txn_id);
+        LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
+                  << " timeout_ms=" << txn_info.timeout_ms() << " 
commit_time=" << commit_time;
+        return;
+    }
+    txn_info.set_commit_time(commit_time);
+    txn_info.set_finish_time(commit_time);
+    if (request->has_commit_attachment()) {
+        
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
+    }
+    LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
+    info_val.clear();
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+    txn->put(info_key, info_val);
+    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
+
+    // Update stats of affected tablet
+    std::deque<std::string> kv_pool;
+    std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> 
update_tablet_stats;
+    if (config::split_tablet_stats) {
+        update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
+            if (stats.num_segs > 0) {
+                auto& data_size_key = kv_pool.emplace_back();
+                stats_tablet_data_size_key(info, &data_size_key);
+                txn->atomic_add(data_size_key, stats.data_size);
+                auto& num_rows_key = kv_pool.emplace_back();
+                stats_tablet_num_rows_key(info, &num_rows_key);
+                txn->atomic_add(num_rows_key, stats.num_rows);
+                auto& num_segs_key = kv_pool.emplace_back();
+                stats_tablet_num_segs_key(info, &num_segs_key);
+                txn->atomic_add(num_segs_key, stats.num_segs);
+            }
+            auto& num_rowsets_key = kv_pool.emplace_back();
+            stats_tablet_num_rowsets_key(info, &num_rowsets_key);
+            txn->atomic_add(num_rowsets_key, stats.num_rowsets);
+        };
+    } else {
+        update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
+            auto& key = kv_pool.emplace_back();
+            stats_tablet_key(info, &key);
+            auto& val = kv_pool.emplace_back();
+            TxnErrorCode err = txn->get(key, &val);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
+                                                              : 
cast_as<ErrCategory::READ>(err);
+                msg = fmt::format("failed to get tablet stats, err={} 
tablet_id={}", err,
+                                  std::get<4>(info));
+                return;
+            }
+            TabletStatsPB stats_pb;
+            if (!stats_pb.ParseFromString(val)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                msg = fmt::format("malformed tablet stats value, key={}", 
hex(key));
+                return;
+            }
+            stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
+            stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
+            stats_pb.set_num_rowsets(stats_pb.num_rowsets() + 
stats.num_rowsets);
+            stats_pb.set_num_segments(stats_pb.num_segments() + 
stats.num_segs);
+            stats_pb.SerializeToString(&val);
+            txn->put(key, val);
+        };
+    }
+    for (auto& [tablet_id, stats] : tablet_stats) {
+        DCHECK(tablet_ids.count(tablet_id));
+        auto& tablet_idx = tablet_ids[tablet_id];
+        StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
+                                 tablet_idx.partition_id(), tablet_id};
+        update_tablet_stats(info, stats);
+        if (code != MetaServiceCode::OK) return;
+    }
+    // Remove tmp rowset meta
+    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+        for (auto& [k, _] : tmp_rowsets_meta) {
+            txn->remove(k);
+            LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" 
<< txn_id;
+        }
+    }
+
+    const std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
+    LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" 
<< txn_id;
+    txn->remove(running_key);
+
+    std::string recycle_val;
+    std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
+    RecycleTxnPB recycle_pb;
+    recycle_pb.set_creation_time(commit_time);
+    recycle_pb.set_label(txn_info.label());
+
+    if (!recycle_pb.SerializeToString(&recycle_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+    txn->put(recycle_key, recycle_val);
+
+    LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << 
" txn_id=" << txn_id;
+    LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()

Review Comment:
   Merge these consecutive LOG(INFO) statements into a single LOG call.



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -1612,6 +2190,307 @@ void 
MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
     response->set_current_max_txn_id(current_max_txn_id);
 }
 
+/**
+ * 1. Generate a sub_txn_id
+ *
+ * The following steps are done in a txn:
+ * 2. Put txn_index_key in sub_txn_id
+ * 3. Delete txn_label_key in sub_txn_id
+ * 4. Modify the txn state of the txn_id:
+ *    - Add the sub txn id to sub_txn_ids: recycler use it to recycle the 
txn_index_key
+ *    - Add the table id to table_ids
+ */
+void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* 
controller,
+                                    const BeginSubTxnRequest* request,
+                                    BeginSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(begin_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    if (txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid txn_id, it may be not given or set properly, txn_id=" 
<< txn_id;
+        msg = ss.str();
+        return;
+    }
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    int64_t table_id = request->has_table_id() ? request->table_id() : -1;
+    std::string label = request->has_label() ? request->label() : "";
+    if (label.empty() || db_id < 0 || table_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, label=" << label << " db_id=" << db_id
+           << ", table_id=" << table_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(begin_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    const std::string label_key = txn_label_key({instance_id, db_id, label});
+    std::string label_val;
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get failed(), err=" << err << " label=" << label;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // err == OK means this is a retry rpc?
+    if (err == TxnErrorCode::TXN_OK) {
+        label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
+    }
+
+    // ret > 0, means label not exist previously.
+    txn->atomic_set_ver_value(label_key, label_val);
+    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "txn->commit failed(), label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    // 2. Get sub txn id from version stamp
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn when get txn id, label=" << label << " 
err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    label_val.clear();
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get() failed, label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // Generated by TxnKv system
+    int64_t sub_txn_id = 0;
+    int ret =
+            get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
+                                           label_val.size() - 
VERSION_STAMP_LEN, label_val.size()),
+                                   &sub_txn_id);
+    if (ret != 0) {
+        code = MetaServiceCode::TXN_GEN_ID_ERR;
+        ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << 
ret;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" 
<< sub_txn_id
+              << " txn_id=" << txn_id << " label_val.size()=" << 
label_val.size();
+
+    // write txn_index_key
+    const std::string index_key = txn_index_key({instance_id, sub_txn_id});
+    std::string index_val;
+    TxnIndexPB index_pb;
+    if (!index_pb.SerializeToString(&index_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_index_pb "
+           << "label=" << label << " txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    txn_info.mutable_table_ids()->Add(table_id);
+    txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->remove(label_key);
+    txn->put(info_key, info_val);
+    txn->put(index_key, index_val);
+    LOG(INFO) << "xxx remove label_key=" << hex(label_key) << " txn_id=" << 
txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    LOG(INFO) << "xxx put index_key=" << hex(index_key) << " txn_id=" << txn_id
+              << " sub_txn_id=" << sub_txn_id;
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+    response->set_sub_txn_id(sub_txn_id);
+    response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
+/**
+ * 1. Modify the txn state of the txn_id:
+ *    - Remove the table id from table_ids
+ */
+void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* 
controller,
+                                    const AbortSubTxnRequest* request,
+                                    AbortSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(abort_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    if (txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid txn_id, it may be not given or set properly, txn_id=" 
<< txn_id;
+        msg = ss.str();
+        return;
+    }
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    int64_t table_id = request->has_table_id() ? request->table_id() : -1;
+    int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : 
-1;
+    if (db_id < 0 || table_id < 0 || sub_txn_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, db_id=" << db_id << ", table_id=" << table_id
+           << ", sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(abort_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
+           << " sub_txn_id=" << sub_txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // remove table_id and does not need to remove sub_txn_id
+    auto it = txn_info.mutable_table_ids()->end() - 1;

Review Comment:
   Removing the table_id here may let SC (schema change) proceed prematurely, which may result in incorrect data conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to