This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d45bf9c3c51 branch-3.1: [feat](checker) Add txn key consistency 
checking for checker #54620 (#56574)
d45bf9c3c51 is described below

commit d45bf9c3c5119215cd8d435be65a56f8a0efa3b7
Author: Yixuan Wang <[email protected]>
AuthorDate: Thu Oct 16 10:34:20 2025 +0800

    branch-3.1: [feat](checker) Add txn key consistency checking for checker 
#54620 (#56574)
    
    pick: #54620
---
 cloud/src/common/config.h      |   1 +
 cloud/src/meta-store/keys.cpp  |   2 +-
 cloud/src/recycler/checker.cpp | 261 ++++++++++++++++++++++++++-
 cloud/src/recycler/checker.h   |   9 +
 cloud/test/recycler_test.cpp   | 388 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 659 insertions(+), 2 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 7a576ab54e9..ff59ef649fc 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -121,6 +121,7 @@ CONF_mBool(enable_mow_job_key_check, "false");
 CONF_mBool(enable_restore_job_check, "false");
 
 CONF_mBool(enable_tablet_stats_key_check, "false");
+CONF_mBool(enable_txn_key_check, "false");
 
 CONF_mBool(enable_checker_for_meta_key_check, "false");
 CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index e2c0d3446ee..56a79249883 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -196,7 +196,7 @@ std::string txn_key_prefix(std::string_view instance_id) {
 
 void txn_label_key(const TxnLabelKeyInfo& in, std::string* out) {
     encode_prefix(in, out);                 // 0x01 "txn" ${instance_id}
-    encode_bytes(TXN_KEY_INFIX_LABEL, out); // "txn_index"
+    encode_bytes(TXN_KEY_INFIX_LABEL, out); // "txn_label"
     encode_int64(std::get<1>(in), out);     // db_id
     encode_bytes(std::get<2>(in), out);     // label
 }
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 91e3dddb26e..1d8716169d9 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -29,6 +29,7 @@
 #include <algorithm>
 #include <chrono>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <numeric>
@@ -72,6 +73,8 @@ extern std::vector<std::string> recycle_blacklist;
 extern bool enable_inverted_check;
 } // namespace config
 
+using namespace std::chrono;
+
 Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
     ip_port_ = std::string(butil::my_ip_cstr()) + ":" + 
std::to_string(config::brpc_listen_port);
 }
@@ -214,6 +217,12 @@ int Checker::start() {
                 }
             }
 
+            if (config::enable_txn_key_check) {
+                if (int ret = checker->do_txn_key_check(); ret != 0) {
+                    success = false;
+                }
+            }
+
             if (config::enable_delete_bitmap_storage_optimize_v2_check) {
                 if (int ret = 
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
                     ret != 0) {
@@ -283,7 +292,6 @@ void Checker::lease_check_jobs() {
         }
     }
 }
-
 #define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
 void Checker::do_inspect(const InstanceInfoPB& instance) {
     std::string check_job_key = job_check_key({instance.instance_id()});
@@ -2168,4 +2176,255 @@ int InstanceChecker::do_restore_job_check() {
     return 0;
 }
 
+/**
+ * Cross-checks one txn_label KV against the txn_info KVs of the same db.
+ *
+ * Every txn_id listed in the TxnLabelPB is expected to carry the label that is
+ * encoded in the txn_label key. A txn_info KV for such a txn_id whose label
+ * differs is reported as abnormal; txn_info KVs of unlisted txn_ids are ignored.
+ *
+ * @param key   txn_label key: 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
+ * @param value serialized TxnLabelPB followed by VERSION_STAMP_LEN trailing bytes
+ * @return -1 if the txn_label key/value cannot be decoded; otherwise the result
+ *         of scan_and_handle_kv over the db's txn_info range, whose handler
+ *         returns 0 (consistent), 1 (label mismatch) or -1 (undecodable txn_info)
+ */
+int InstanceChecker::check_txn_info_key(std::string_view key, std::string_view value) {
+    using DecodedKey = std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>;
+
+    // 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
+    if (key.empty()) {
+        LOG(WARNING) << "failed to decode txn label key, key is empty";
+        return -1;
+    }
+    std::string_view label_key = key;
+    label_key.remove_prefix(1); // skip the leading 0x01 byte
+    DecodedKey label_fields;
+    decode_key(&label_key, &label_fields);
+    if (label_fields.size() < 5) {
+        LOG(WARNING) << "failed to decode txn label key, key=" << hex(std::string(key));
+        return -1;
+    }
+
+    // The last VERSION_STAMP_LEN bytes of the value are not part of the protobuf payload.
+    // Guard the subtraction so that a short value cannot underflow size_t.
+    TxnLabelPB txn_label_pb;
+    if (value.size() < static_cast<size_t>(VERSION_STAMP_LEN) ||
+        !txn_label_pb.ParseFromArray(value.data(), value.size() - VERSION_STAMP_LEN)) {
+        LOG(WARNING) << "failed to parse TxnLabelPB";
+        return -1;
+    }
+    const auto db_id = std::get<int64_t>(std::get<0>(label_fields[3]));
+    const auto& label = std::get<std::string>(std::get<0>(label_fields[4]));
+
+    // txn_id -> label that the txn_label key expects for it
+    std::unordered_map<int64_t, std::string> expected_labels;
+    for (const auto& txn_id : txn_label_pb.txn_ids()) {
+        expected_labels.insert({txn_id, label});
+    }
+
+    std::string txn_info_key_begin = txn_info_key({instance_id_, db_id, 0});
+    std::string txn_info_key_end = txn_info_key({instance_id_, db_id, INT64_MAX});
+    return scan_and_handle_kv(
+            txn_info_key_begin, txn_info_key_end,
+            [&](std::string_view info_key, std::string_view info_val) -> int {
+                // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+                TxnInfoPB txn_info_pb;
+                if (!txn_info_pb.ParseFromArray(info_val.data(), info_val.size())) {
+                    LOG(WARNING) << "failed to parse TxnInfoPB";
+                    return -1;
+                }
+                DecodedKey info_fields;
+                if (!info_key.empty()) {
+                    info_key.remove_prefix(1); // skip the leading 0x01 byte
+                    decode_key(&info_key, &info_fields);
+                }
+                if (info_fields.size() < 5) {
+                    LOG(WARNING) << "failed to decode txn info key";
+                    return -1;
+                }
+                const auto txn_id = std::get<int64_t>(std::get<0>(info_fields[4]));
+                const auto it = expected_labels.find(txn_id);
+                if (it == expected_labels.end() || it->second == txn_info_pb.label()) {
+                    return 0;
+                }
+                LOG(WARNING) << "txn_info_pb's txn_label not same with txn_label_pb's txn_label,"
+                             << " txn_info_pb's txn_label: " << txn_info_pb.label()
+                             << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
+                return 1;
+            });
+}
+
+/**
+ * Checks that the txn_id of one txn_info KV is listed in the txn_label KV that
+ * its TxnInfoPB label points to.
+ *
+ * @param key   txn_info key: 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+ * @param value serialized TxnInfoPB
+ * @return 0 if the txn_id is listed under the label, 1 if it is not, -1 if the
+ *         key/value cannot be decoded or the txn_label KV cannot be read
+ */
+int InstanceChecker::check_txn_label_key(std::string_view key, std::string_view value) {
+    TxnInfoPB txn_info_pb;
+    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
+        LOG(WARNING) << "failed to parse TxnInfoPB";
+        return -1;
+    }
+
+    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (!key.empty()) {
+        std::string_view info_key = key;
+        info_key.remove_prefix(1); // skip the leading 0x01 byte
+        decode_key(&info_key, &fields);
+    }
+    if (fields.size() < 5) {
+        LOG(WARNING) << "failed to decode txn info key";
+        return -1;
+    }
+    const auto db_id = std::get<int64_t>(std::get<0>(fields[3]));
+    const auto txn_id = std::get<int64_t>(std::get<0>(fields[4]));
+
+    std::unique_ptr<Transaction> txn;
+    if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to init txn";
+        return -1;
+    }
+    const std::string txn_label = txn_label_key({instance_id_, db_id, txn_info_pb.label()});
+    std::string txn_label_val;
+    if (txn->get(txn_label, &txn_label_val) != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_label);
+        return -1;
+    }
+
+    // NOTE(review): check_txn_info_key strips VERSION_STAMP_LEN trailing bytes from a
+    // txn_label value before parsing, while this path parses the whole value and ignores
+    // the parse result. Confirm which value layout is expected here before tightening it.
+    TxnLabelPB txn_label_pb;
+    txn_label_pb.ParseFromString(txn_label_val);
+
+    // Bind by reference: copying the repeated field for a membership test is unnecessary.
+    const auto& txn_ids = txn_label_pb.txn_ids();
+    if (std::find(txn_ids.begin(), txn_ids.end(), txn_id) != txn_ids.end()) {
+        return 0;
+    }
+    // clang-format off
+    LOG(WARNING) << "txn_info_pb's txn_id not found in txn_label_pb info,"
+                 << " txn_id: " << txn_id
+                 << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
+    // clang-format on
+    return 1;
+}
+
+/**
+ * Checks that the txn_index KV of one txn_info KV records the same db_id as the
+ * txn_info key itself.
+ *
+ * @param key   txn_info key: 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+ * @param value serialized TxnInfoPB
+ * @return 0 if the db_ids agree, 1 if they differ, -1 if the key/value cannot
+ *         be decoded or the txn_index KV cannot be read or parsed
+ */
+int InstanceChecker::check_txn_index_key(std::string_view key, std::string_view value) {
+    TxnInfoPB txn_info_pb;
+    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
+        LOG(WARNING) << "failed to parse TxnInfoPB";
+        return -1;
+    }
+
+    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (!key.empty()) {
+        std::string_view info_key = key;
+        info_key.remove_prefix(1); // skip the leading 0x01 byte
+        decode_key(&info_key, &fields);
+    }
+    if (fields.size() < 5) {
+        LOG(WARNING) << "failed to decode txn info key";
+        return -1;
+    }
+    const auto db_id = std::get<int64_t>(std::get<0>(fields[3]));
+    const auto txn_id = std::get<int64_t>(std::get<0>(fields[4]));
+
+    std::unique_ptr<Transaction> txn;
+    if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to init txn";
+        return -1;
+    }
+    const std::string txn_index = txn_index_key({instance_id_, txn_id});
+    std::string txn_index_val;
+    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get txn index key, key=" << hex(txn_index);
+        return -1;
+    }
+    // A value that does not parse must not be compared as if its db_id were 0.
+    TxnIndexPB txn_index_pb;
+    if (!txn_index_pb.ParseFromString(txn_index_val)) {
+        LOG(WARNING) << "failed to parse TxnIndexPB, key=" << hex(txn_index);
+        return -1;
+    }
+
+    if (txn_index_pb.tablet_index().db_id() == db_id) {
+        return 0;
+    }
+    // clang-format off
+    LOG(WARNING) << "txn_index_pb's db_id not same with txn_info_pb's db_id,"
+                 << " txn_index_pb meta: " << txn_index_pb.ShortDebugString()
+                 << " txn_info_pb meta: " << txn_info_pb.ShortDebugString();
+    // clang-format on
+    return 1;
+}
+
+/**
+ * Flags a txn_running KV whose timeout_time is not later than the current
+ * wall-clock time in milliseconds since the epoch.
+ *
+ * @param key   txn_running key (not inspected)
+ * @param value serialized TxnRunningPB
+ * @return 0 if the timeout is still in the future, 1 if it has been reached,
+ *         -1 if the value cannot be parsed
+ */
+int InstanceChecker::check_txn_running_key(std::string_view key, std::string_view value) {
+    const auto since_epoch = system_clock::now().time_since_epoch();
+    const int64_t now_ms = duration_cast<milliseconds>(since_epoch).count();
+
+    TxnRunningPB running_pb;
+    const bool parsed = running_pb.ParseFromArray(value.data(), value.size());
+    if (!parsed) {
+        LOG(WARNING) << "failed to parse TxnRunningPB";
+        return -1;
+    }
+
+    // Not timed out yet: a running key is legitimate.
+    if (running_pb.timeout_time() > now_ms) {
+        return 0;
+    }
+    LOG(WARNING) << "txn_running_pb.timeout_time() is less than current_time,"
+                 << " but txn_running_key exists, "
+                 << " txn_running_pb meta: " << running_pb.ShortDebugString();
+    return 1;
+}
+
+/**
+ * Runs the txn key consistency checks of this instance, one scan phase after
+ * another:
+ *   1. txn_label range, check_txn_info_key
+ *   2. txn_info range,  check_txn_label_key
+ *   3. txn_info range,  check_txn_index_key
+ *   4. txn_running range, check_txn_running_key
+ * The first phase that does not finish cleanly stops the whole check.
+ *
+ * @return 0 if every phase passes, 1 if a phase reports an abnormal key,
+ *         -1 if a phase ends with any other non-zero result
+ */
+int InstanceChecker::do_txn_key_check() {
+    using CheckFn = int (InstanceChecker::*)(std::string_view, std::string_view);
+
+    // One scan phase: a key range, the per-KV check applied to it and its log messages.
+    struct Phase {
+        std::string begin;
+        std::string end;
+        CheckFn check;
+        const char* begin_msg;
+        const char* abnormal_msg;
+        const char* error_msg;
+        const char* finish_msg;
+    };
+
+    // clang-format off
+    Phase phases[] = {
+        // check txn info key depend on txn label key
+        {txn_label_key({instance_id_, 0, ""}), txn_label_key({instance_id_, INT64_MAX, ""}),
+         &InstanceChecker::check_txn_info_key,
+         "begin check txn_label_key and txn_info_key",
+         "failed to check txn_info_key depending on txn_label_key",
+         "failed to check txn label key and txn info key",
+         "finish check txn_label_key and txn_info_key"},
+        // check txn label key depend on txn info key
+        {txn_info_key({instance_id_, 0, 0}), txn_info_key({instance_id_, INT64_MAX, 0}),
+         &InstanceChecker::check_txn_label_key,
+         "begin check txn_label_key and txn_info_key",
+         "failed to check txn_label_key depending on txn_info_key",
+         "failed to inverted check txn label key and txn info key",
+         "finish check txn_label_key and txn_info_key"},
+        // check txn index key depend on txn info key
+        {txn_info_key({instance_id_, 0, 0}), txn_info_key({instance_id_, INT64_MAX, 0}),
+         &InstanceChecker::check_txn_index_key,
+         "begin check txn_index_key and txn_info_key",
+         "failed to check txn_idx_key depending on txn_info_key",
+         "failed to check txn index key",
+         "finish check txn_index_key and txn_info_key"},
+        // check txn running key
+        {txn_running_key({instance_id_, 0, 0}), txn_running_key({instance_id_, INT64_MAX, 0}),
+         &InstanceChecker::check_txn_running_key,
+         "begin check txn_running_key",
+         "failed to check txn_running_key",
+         "failed to check txn running key",
+         "finish check txn_running_key"},
+    };
+    // clang-format on
+
+    for (Phase& phase : phases) {
+        int64_t num_scanned = 0;
+        int64_t num_abnormal = 0;
+        LOG(INFO) << phase.begin_msg;
+        const int ret = scan_and_handle_kv(
+                phase.begin, phase.end,
+                [&, this](std::string_view k, std::string_view v) -> int {
+                    ++num_scanned;
+                    const int r = (this->*(phase.check))(k, v);
+                    if (r == 1) {
+                        ++num_abnormal;
+                    }
+                    return r;
+                });
+        if (ret == 1) {
+            LOG(WARNING) << phase.abnormal_msg << ", num_scanned=" << num_scanned
+                         << ", num_abnormal=" << num_abnormal;
+            return 1;
+        }
+        if (ret != 0) {
+            // Any other non-zero result means the phase did not complete; do not treat it
+            // as a pass and move on to the next phase.
+            LOG(WARNING) << phase.error_msg;
+            return -1;
+        }
+        LOG(INFO) << phase.finish_msg << ", num_scanned=" << num_scanned
+                  << ", num_abnormal=" << num_abnormal;
+    }
+    return 0;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 39c990d8831..5993aa0b5de 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -111,6 +111,8 @@ public:
 
     int do_restore_job_check();
 
+    int do_txn_key_check();
+
     // If there are multiple buckets, return the minimum lifecycle; if there 
are no buckets (i.e.
     // all accessors are HdfsAccessor), return INT64_MAX.
     // Return 0 if success, otherwise error
@@ -171,6 +173,13 @@ private:
     // Return 1 if key leak is identified.
     // Return negative if a temporary error occurred during the check process.
     int check_stats_tablet_key_leaked(std::string_view key, std::string_view 
value);
+    int check_txn_info_key(std::string_view key, std::string_view value);
+
+    int check_txn_label_key(std::string_view key, std::string_view value);
+
+    int check_txn_index_key(std::string_view key, std::string_view value);
+
+    int check_txn_running_key(std::string_view key, std::string_view value);
 
     /**
      * It is used to scan the key in the range from start_key to end_key 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index b9ff08f7a17..c02925b9496 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -6300,4 +6300,392 @@ TEST(CheckerTest, CheckCostTooMuchTime) {
     ASSERT_EQ(rowset_metas.size(), NUM_BATCH_SIZE * 2);
 }
 
+TEST(CheckerTest, check_txn_info_key) { // Exercises InstanceChecker::check_txn_info_key on a MemTxnKv: expects 0, 1 and -1 in turn.
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    int64_t db_id = 1001;
+    std::string label = "test_label";
+    std::vector<int64_t> txn_ids = {2001, 2002, 2003};
+
+    { // Fixture: one txn_label KV listing txn_ids, plus one txn_info KV per txn_id carrying the same label.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string label_key, label_val;
+        txn_label_key({instance_id, db_id, label}, &label_key);
+
+        TxnLabelPB txn_label_pb;
+        for (auto txn_id : txn_ids) {
+            txn_label_pb.add_txn_ids(txn_id);
+        }
+
+        std::string serialized_label;
+        ASSERT_TRUE(txn_label_pb.SerializeToString(&serialized_label));
+
+        MemTxnKv::gen_version_timestamp(123456790, 0, &serialized_label); // presumably appends the version stamp that check_txn_info_key strips before parsing -- confirm
+
+        txn->put(label_key, serialized_label);
+
+        for (auto txn_id : txn_ids) {
+            std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_label(label);
+            std::string info_val;
+            ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+            txn->put(info_key, info_val);
+        }
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    { // Case 1: every txn_info label equals the label in the txn_label key; expect 0.
+        std::string label_key = txn_label_key({instance_id, db_id, label});
+        std::string label_val;
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_info_key(label_key, label_val);
+        ASSERT_EQ(ret, 0);
+    }
+
+    { // Case 2: overwrite the txn_info of txn_ids[0] with a different label; expect 1.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        int64_t mismatch_txn_id = txn_ids[0];
+        std::string info_key = txn_info_key({instance_id, db_id, 
mismatch_txn_id});
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label("mismatched_label");
+        std::string info_val;
+        ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+        txn->put(info_key, info_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        std::string label_key = txn_label_key({instance_id, db_id, label});
+        std::string label_val;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_info_key(label_key, label_val);
+        ASSERT_EQ(ret, 1);
+    }
+
+    { // Case 3: a txn_label value whose payload is not a serialized TxnLabelPB; expect -1.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string label_key = txn_label_key({instance_id, db_id, 
"invalid_label"});
+        std::string invalid_val = "invalid_protobuf_data";
+
+        uint32_t offset = invalid_val.size();
+        invalid_val.append(10, '\x00'); // 10 zero bytes after the payload
+        invalid_val.append((const char*)&offset, 4); // then the 4 raw bytes of the payload length
+
+        txn->put(label_key, invalid_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_info_key(label_key, invalid_val);
+        ASSERT_EQ(ret, -1);
+    }
+}
+
+TEST(CheckerTest, check_txn_label_key) { // Exercises InstanceChecker::check_txn_label_key on a MemTxnKv: expects 0, 1 and -1 in turn.
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    int64_t db_id = 1001;
+    std::string label = "test_label";
+    std::vector<int64_t> txn_ids = {2001, 2002, 2003};
+
+    { // Fixture: one txn_label KV listing txn_ids (stored as the bare serialized TxnLabelPB) plus one txn_info KV per txn_id.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string label_key = txn_label_key({instance_id, db_id, label});
+
+        TxnLabelPB txn_label_pb;
+        for (auto txn_id : txn_ids) {
+            txn_label_pb.add_txn_ids(txn_id);
+        }
+
+        std::string label_val;
+        ASSERT_TRUE(txn_label_pb.SerializeToString(&label_val));
+        txn->put(label_key, label_val);
+
+        for (auto txn_id : txn_ids) {
+            std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_label(label);
+            std::string info_val;
+            ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+            txn->put(info_key, info_val);
+        }
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    { // Case 1: each seeded txn_info's txn_id is listed in the txn_label KV; expect 0.
+        for (auto txn_id : txn_ids) {
+            std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+            std::string info_val;
+            std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);
+
+            int ret = checker.check_txn_label_key(info_key, info_val);
+            ASSERT_EQ(ret, 0);
+        }
+    }
+
+    { // Case 2: txn_info for txn_id 2004, which the txn_label KV does not list; expect 1.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        int64_t missing_txn_id = 2004;
+        std::string info_key = txn_info_key({instance_id, db_id, 
missing_txn_id});
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label(label);
+        std::string info_val;
+        ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+        txn->put(info_key, info_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_label_key(info_key, info_val);
+        ASSERT_EQ(ret, 1);
+    }
+
+    { // Case 3: value that is not a serialized TxnInfoPB; expect -1.
+        std::string invalid_key = txn_info_key({instance_id, db_id, 9999});
+        std::string invalid_val = "invalid_protobuf_data";
+
+        int ret = checker.check_txn_label_key(invalid_key, invalid_val);
+        ASSERT_EQ(ret, -1);
+    }
+
+    { // Case 4: txn_info whose label has no txn_label KV in the store; expect -1.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string non_existent_label = "non_existent_label";
+        int64_t txn_id = 3001;
+        std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label(non_existent_label);
+        std::string info_val;
+        ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+        txn->put(info_key, info_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_label_key(info_key, info_val);
+        ASSERT_EQ(ret, -1);
+    }
+}
+
+TEST(CheckerTest, check_txn_index_key) { // Exercises InstanceChecker::check_txn_index_key on a MemTxnKv: expects 0, 1 and -1 in turn.
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    int64_t db_id = 1001;
+    int64_t txn_id = 2001;
+
+    { // Fixture: a txn_info KV and a txn_index KV for txn_id whose tablet_index.db_id equals db_id.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label("test_label");
+        std::string info_val;
+        ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+        txn->put(info_key, info_val);
+
+        std::string index_key = txn_index_key({instance_id, txn_id});
+        TxnIndexPB txn_index_pb;
+        TabletIndexPB* tablet_index = txn_index_pb.mutable_tablet_index();
+        tablet_index->set_db_id(db_id);
+        std::string index_val;
+        ASSERT_TRUE(txn_index_pb.SerializeToString(&index_val));
+        txn->put(index_key, index_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    { // Case 1: db_id in the txn_info key matches the txn_index KV; expect 0.
+        std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+        std::string info_val;
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_index_key(info_key, info_val);
+        ASSERT_EQ(ret, 0);
+    }
+
+    { // Case 2: txn_index KV records db_id 1002 while the txn_info key carries 1001; expect 1.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        int64_t different_db_id = 1002;
+        int64_t new_txn_id = 2002;
+
+        std::string info_key = txn_info_key({instance_id, db_id, new_txn_id});
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label("test_label_2");
+        std::string info_val;
+        ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+        txn->put(info_key, info_val);
+
+        std::string index_key = txn_index_key({instance_id, new_txn_id});
+        TxnIndexPB txn_index_pb;
+        TabletIndexPB* tablet_index = txn_index_pb.mutable_tablet_index();
+        tablet_index->set_db_id(different_db_id);
+        std::string index_val;
+        ASSERT_TRUE(txn_index_pb.SerializeToString(&index_val));
+        txn->put(index_key, index_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_index_key(info_key, info_val);
+        ASSERT_EQ(ret, 1);
+    }
+
+    { // Case 3: value that is not a serialized TxnInfoPB; expect -1.
+        std::string invalid_key = txn_info_key({instance_id, db_id, 9999});
+        std::string invalid_val = "invalid_protobuf_data";
+
+        int ret = checker.check_txn_index_key(invalid_key, invalid_val);
+        ASSERT_EQ(ret, -1);
+    }
+
+    { // Case 4: txn_info for txn_id 3001, for which no txn_index KV was written; expect -1.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        int64_t non_existent_txn_id = 3001;
+        std::string info_key = txn_info_key({instance_id, db_id, 
non_existent_txn_id});
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_label("test_label_3");
+        std::string info_val;
+        ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val));
+        txn->put(info_key, info_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_index_key(info_key, info_val);
+        ASSERT_EQ(ret, -1);
+    }
+}
+
+TEST(CheckerTest, check_txn_running_key) { // Exercises InstanceChecker::check_txn_running_key on a MemTxnKv: expects 0, 1 and -1 in turn.
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    int64_t db_id = 1001;
+    int64_t txn_id = 2001;
+
+    { // Fixture: two txn_running KVs, one timing out 3600000 ms after now and one 3600000 ms before now.
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
+        TxnRunningPB txn_running_pb;
+
+        int64_t current_time = 
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                       
std::chrono::system_clock::now().time_since_epoch())
+                                       .count();
+        txn_running_pb.set_timeout_time(current_time + 3600000);
+
+        std::string running_val;
+        ASSERT_TRUE(txn_running_pb.SerializeToString(&running_val));
+        txn->put(running_key, running_val);
+
+        std::string expired_key = txn_running_key({instance_id, db_id, txn_id 
+ 1});
+        TxnRunningPB expired_running_pb;
+        expired_running_pb.set_timeout_time(current_time - 3600000);
+
+        std::string expired_val;
+        ASSERT_TRUE(expired_running_pb.SerializeToString(&expired_val));
+        txn->put(expired_key, expired_val);
+
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    { // Case 1: timeout_time is in the future; expect 0.
+        std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
+        std::string running_val;
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(running_key, &running_val), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_running_key(running_key, running_val);
+        ASSERT_EQ(ret, 0);
+    }
+
+    { // Case 2: timeout_time is in the past but the running key still exists; expect 1.
+        std::string expired_key = txn_running_key({instance_id, db_id, txn_id 
+ 1});
+        std::string expired_val;
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(expired_key, &expired_val), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_running_key(expired_key, expired_val);
+        ASSERT_EQ(ret, 1);
+    }
+
+    { // Case 3: stored value is not a serialized TxnRunningPB; expect -1 after reading it back.
+        std::string invalid_key = txn_running_key({instance_id, db_id, txn_id 
+ 2});
+        std::string invalid_val = "invalid_protobuf_data";
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(invalid_key, invalid_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        std::unique_ptr<Transaction> read_txn;
+        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
+        std::string val;
+        ASSERT_EQ(read_txn->get(invalid_key, &val), TxnErrorCode::TXN_OK);
+
+        int ret = checker.check_txn_running_key(invalid_key, val);
+        ASSERT_EQ(ret, -1);
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to