This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 594d4fbf8f6 [feat](cloud) Support Transaction::watch_key (#57408)
594d4fbf8f6 is described below

commit 594d4fbf8f639059e4adbfa04545dfbf85ca50a6
Author: walter <[email protected]>
AuthorDate: Wed Oct 29 10:37:46 2025 +0800

    [feat](cloud) Support Transaction::watch_key (#57408)
---
 cloud/src/common/bvars.cpp          |   1 +
 cloud/src/common/bvars.h            |   1 +
 cloud/src/meta-store/mem_txn_kv.cpp |  95 ++++++++++++
 cloud/src/meta-store/mem_txn_kv.h   |  15 ++
 cloud/src/meta-store/txn_kv.cpp     |  22 +++
 cloud/src/meta-store/txn_kv.h       |  15 ++
 cloud/src/meta-store/txn_kv_error.h |   2 +
 cloud/test/mem_txn_kv_test.cpp      | 288 ++++++++++++++++++++++++++++++++++++
 cloud/test/txn_kv_test.cpp          | 166 +++++++++++++++++++++
 9 files changed, 605 insertions(+)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index a1dd5961c60..2ca080adbf4 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -163,6 +163,7 @@ bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
 bvar::LatencyRecorder g_bvar_txn_kv_put("txn_kv", "put");
 bvar::LatencyRecorder g_bvar_txn_kv_commit("txn_kv", "commit");
+bvar::LatencyRecorder g_bvar_txn_kv_watch_key("txn_kv", "watch_key");
 bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_key("txn_kv", "atomic_set_ver_key");
 bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_value("txn_kv", "atomic_set_ver_value");
 bvar::LatencyRecorder g_bvar_txn_kv_atomic_add("txn_kv", "atomic_add");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index ecb37bf18fd..d7ab5fc3099 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -659,6 +659,7 @@ extern bvar::LatencyRecorder g_bvar_txn_kv_get;
 extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
 extern bvar::LatencyRecorder g_bvar_txn_kv_put;
 extern bvar::LatencyRecorder g_bvar_txn_kv_commit;
+extern bvar::LatencyRecorder g_bvar_txn_kv_watch_key; // FDB impl: spans commit + watch wait
 extern bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_key;
 extern bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_value;
 extern bvar::LatencyRecorder g_bvar_txn_kv_atomic_add;
diff --git a/cloud/src/meta-store/mem_txn_kv.cpp b/cloud/src/meta-store/mem_txn_kv.cpp
index dde93b21770..9be08d0a763 100644
--- a/cloud/src/meta-store/mem_txn_kv.cpp
+++ b/cloud/src/meta-store/mem_txn_kv.cpp
@@ -207,6 +207,7 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
     ++committed_version_;
 
     int16_t seq = 0;
+    std::set<std::string> modified_keys; // Track which keys were modified
     for (const auto& vec : op_list) {
         const auto& [op_type, k, v] = vec;
         LogItem log_item {op_type, committed_version_, k, v};
@@ -214,18 +215,21 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
         switch (op_type) {
         case memkv::ModifyOpType::PUT: {
             mem_kv_[k].push_front(Version {committed_version_, v});
+            modified_keys.insert(k);
             break;
         }
         case memkv::ModifyOpType::ATOMIC_SET_VER_KEY: {
             std::string ver_key(k);
             gen_version_timestamp(committed_version_, seq, &ver_key);
             mem_kv_[ver_key].push_front(Version {committed_version_, v});
+            modified_keys.insert(ver_key);
             break;
         }
         case memkv::ModifyOpType::ATOMIC_SET_VER_VAL: {
             std::string ver_val(v);
             gen_version_timestamp(committed_version_, seq, &ver_val);
             mem_kv_[k].push_front(Version {committed_version_, ver_val});
+            modified_keys.insert(k);
             break;
         }
         case memkv::ModifyOpType::ATOMIC_ADD: {
@@ -239,10 +243,12 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
             int64_t res = *(int64_t*)org_val.data() + *(int64_t*)v.data();
             std::memcpy(org_val.data(), &res, 8);
             mem_kv_[k].push_front(Version {committed_version_, org_val});
+            modified_keys.insert(k);
             break;
         }
         case memkv::ModifyOpType::REMOVE: {
             mem_kv_[k].push_front(Version {committed_version_, std::nullopt});
+            modified_keys.insert(k);
             break;
         }
         case memkv::ModifyOpType::REMOVE_RANGE: {
@@ -250,6 +256,7 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
             auto end_iter = mem_kv_.lower_bound(v);
             while (begin_iter != end_iter) {
                 mem_kv_[begin_iter->first].push_front(Version {committed_version_, std::nullopt});
+                modified_keys.insert(begin_iter->first);
                 begin_iter++;
             }
             break;
@@ -259,6 +266,11 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
         }
     }
 
+    // Wake the watchers of every key written by this commit, even if its value is unchanged.
+    for (const auto& key : modified_keys) {
+        trigger_watches(key);
+    }
+
     *committed_version = committed_version_;
     return TxnErrorCode::TXN_OK;
 }
@@ -316,6 +328,51 @@ std::unique_ptr<FullRangeGetIterator> MemTxnKv::full_range_get(std::string begin
                                                           std::move(opts));
 }
 
+void MemTxnKv::register_watch(const std::string& key, std::shared_ptr<WatchInfo> watch_info) {
+    std::lock_guard<std::mutex> l(lock_);
+    watches_[key].push_back(std::move(watch_info));
+}
+
+void MemTxnKv::trigger_watches(const std::string& key) {
+    // Must be called with lock_ held
+    auto it = watches_.find(key);
+    if (it == watches_.end()) {
+        return;
+    }
+
+    // Get the current version for this key
+    int64_t current_version = -1;
+    auto kv_it = mem_kv_.find(key);
+    if (kv_it != mem_kv_.end() && !kv_it->second.empty()) {
+        current_version = kv_it->second.front().commit_version;
+    }
+
+    // Trigger and remove watches where the version has changed
+    std::vector<std::shared_ptr<WatchInfo>> to_trigger;
+    auto& watch_list = it->second;
+    for (auto iter = watch_list.begin(); iter != watch_list.end();) {
+        auto& watch = *iter;
+        // Trigger if the current version is greater than the watch version
+        // This means the key has been modified since the watch was set
+        if (current_version > watch->watch_version) {
+            to_trigger.push_back(watch);
+            iter = watch_list.erase(iter);
+        } else {
+            ++iter;
+        }
+    }
+
+    if (watch_list.empty()) {
+        watches_.erase(it);
+    }
+
+    for (auto& watch : to_trigger) {
+        std::lock_guard<std::mutex> watch_lock(watch->mutex);
+        watch->triggered = true;
+        watch->cv.notify_all();
+    }
+}
+
 } // namespace doris::cloud
 
 namespace doris::cloud::memkv {
@@ -676,6 +733,44 @@ TxnErrorCode Transaction::get_committed_version(int64_t* version) {
     return TxnErrorCode::TXN_OK;
 }
 
+TxnErrorCode Transaction::watch_key(std::string_view key) {
+    std::string k(key.data(), key.size());
+
+    // Commit the transaction
+    auto commit_code = commit();
+    if (commit_code != TxnErrorCode::TXN_OK) {
+        return commit_code;
+    }
+    // NOTE(review): if this txn wrote `key`, the check below likely returns immediately.
+    int64_t watch_version = read_version_;
+    auto watch_info = std::make_shared<MemTxnKv::WatchInfo>();
+    watch_info->watch_version = watch_version;
+
+    // Check for an already-committed change and register the watch under kv_->lock_, so a
+    // commit that lands between the check and the registration cannot be missed.
+    {
+        std::lock_guard<std::mutex> l(kv_->lock_);
+
+        // Check if the key has been modified after our watch version
+        auto kv_it = kv_->mem_kv_.find(k);
+        if (kv_it != kv_->mem_kv_.end() && !kv_it->second.empty()) {
+            const auto& latest_version = kv_it->second.front();
+            if (latest_version.commit_version > watch_version) {
+                // Key has already changed, return immediately without blocking
+                return TxnErrorCode::TXN_OK;
+            }
+        }
+
+        // Register the watch only if the key hasn't changed yet
+        kv_->watches_[k].push_back(watch_info);
+    }
+
+    // Wait for the watch to be triggered
+    std::unique_lock<std::mutex> watch_lock(watch_info->mutex);
+    watch_info->cv.wait(watch_lock, [&watch_info] { return watch_info->triggered; });
+
+    return TxnErrorCode::TXN_OK;
+}
 TxnErrorCode Transaction::abort() {
     return TxnErrorCode::TXN_OK;
 }
diff --git a/cloud/src/meta-store/mem_txn_kv.h b/cloud/src/meta-store/mem_txn_kv.h
index f6fa9735634..d7edd1bd5af 100644
--- a/cloud/src/meta-store/mem_txn_kv.h
+++ b/cloud/src/meta-store/mem_txn_kv.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <condition_variable>
 #include <cstddef>
 #include <cstdint>
 #include <list>
@@ -77,6 +78,15 @@ public:
     int64_t put_count_ {};
     int64_t del_count_ {};
 
+    struct WatchInfo {
+        std::mutex mutex;           // guards `triggered`
+        std::condition_variable cv; // notified once `triggered` is set
+        bool triggered {false};     // set under `mutex` when the watch fires
+        int64_t watch_version {-1}; // fires once the key's commit version exceeds this
+    };
+    // NOTE(review): unlike watch_key(), registers without checking for an earlier change.
+    void register_watch(const std::string& key, std::shared_ptr<WatchInfo> watch_info);
+
 private:
     using OpTuple = std::tuple<memkv::ModifyOpType, std::string, std::string>;
     TxnErrorCode update(const std::set<std::string>& read_set, const std::vector<OpTuple>& op_list,
@@ -110,9 +120,12 @@ private:
 
     std::map<std::string, std::list<Version>> mem_kv_;
     std::unordered_map<std::string, std::list<LogItem>> log_kv_;
+    std::unordered_map<std::string, std::vector<std::shared_ptr<WatchInfo>>> watches_;
     mutable std::mutex lock_;
     int64_t committed_version_ = 0;
     int64_t read_version_ = 0;
+
+    void trigger_watches(const std::string& key); // caller must hold lock_
 };
 
 namespace memkv {
@@ -217,6 +230,8 @@ public:
      */
     TxnErrorCode commit() override;
 
+    TxnErrorCode watch_key(std::string_view key) override;
+
     TxnErrorCode get_read_version(int64_t* version) override;
     TxnErrorCode get_committed_version(int64_t* version) override;
 
diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp
index f83b0cbd98b..e1bf87ee6ee 100644
--- a/cloud/src/meta-store/txn_kv.cpp
+++ b/cloud/src/meta-store/txn_kv.cpp
@@ -248,6 +248,7 @@ constexpr fdb_error_t FDB_ERROR_CODE_TIMED_OUT = 1004;
 constexpr fdb_error_t FDB_ERROR_CODE_TXN_TOO_OLD = 1007;
 constexpr fdb_error_t FDB_ERROR_CODE_TXN_CONFLICT = 1020;
 constexpr fdb_error_t FDB_ERROR_CODE_TXN_TIMED_OUT = 1031;
+constexpr fdb_error_t FDB_ERROR_CODE_TOO_MANY_WATCHES = 1032;
 constexpr fdb_error_t FDB_ERROR_CODE_INVALID_OPTION_VALUE = 2006;
 constexpr fdb_error_t FDB_ERROR_CODE_INVALID_OPTION = 2007;
 constexpr fdb_error_t FDB_ERROR_CODE_VERSION_INVALID = 2011;
@@ -284,6 +285,8 @@ static TxnErrorCode cast_as_txn_code(fdb_error_t err) {
         return TxnErrorCode::TXN_TOO_OLD;
     case FDB_ERROR_CODE_TXN_CONFLICT:
         return TxnErrorCode::TXN_CONFLICT;
+    case FDB_ERROR_CODE_TOO_MANY_WATCHES:
+        return TxnErrorCode::TXN_TOO_MANY_WATCHES;
     }
 
     if (fdb_error_predicate(FDB_ERROR_PREDICATE_MAYBE_COMMITTED, err)) {
@@ -774,6 +777,25 @@ TxnErrorCode Transaction::commit() {
     return TxnErrorCode::TXN_OK;
 }
 
+TxnErrorCode Transaction::watch_key(std::string_view key) {
+    StopWatch sw;
+    auto* fut = fdb_transaction_watch(txn_, (uint8_t*)key.data(), key.size());
+    DORIS_CLOUD_DEFER {
+        fdb_future_destroy(fut);
+        g_bvar_txn_kv_watch_key << sw.elapsed_us();
+    };
+    // The watch only starts reporting other transactions' changes once this txn commits.
+    RETURN_IF_ERROR(commit());
+    RETURN_IF_ERROR(await_future(fut));
+    auto err = fdb_future_get_error(fut);
+    TEST_SYNC_POINT_CALLBACK("transaction:watch_key:get_err", &err);
+    if (err) {
+        LOG(WARNING) << "fdb watch key " << hex(key) << ": " << fdb_get_error(err);
+        return cast_as_txn_code(err);
+    }
+    return TxnErrorCode::TXN_OK;
+}
+
 TxnErrorCode Transaction::get_read_version(int64_t* version) {
     StopWatch sw;
     auto* fut = fdb_transaction_get_read_version(txn_);
diff --git a/cloud/src/meta-store/txn_kv.h b/cloud/src/meta-store/txn_kv.h
index d91542a48dd..d93ad46823c 100644
--- a/cloud/src/meta-store/txn_kv.h
+++ b/cloud/src/meta-store/txn_kv.h
@@ -251,6 +251,19 @@ public:
      */
     virtual TxnErrorCode commit() = 0;
 
+    /**
+     * Issue a watch on `key`, commit the txn, and block until the watch is triggered.
+     *
+     * A watch's behavior is relative to the transaction that created it: it reports a change
+     * in relation to the key's value as readable by that transaction. The initial value used
+     * for comparison is either the value at the transaction's read version or the value as
+     * modified by the transaction itself prior to the creation of the watch. If the value
+     * changes and then changes back to its initial value, the watch might not report it.
+     *
+     * @return TXN_OK once a change is observed; otherwise the commit or watch error code.
+     */
+    virtual TxnErrorCode watch_key(std::string_view key) = 0;
+
     /**
      * Gets the read version used by the txn.
      * Note that it does not make any sense we call this function before
@@ -790,6 +803,8 @@ public:
      */
     TxnErrorCode commit() override;
 
+    TxnErrorCode watch_key(std::string_view key) override;
+
     TxnErrorCode get_read_version(int64_t* version) override;
     TxnErrorCode get_committed_version(int64_t* version) override;
 
diff --git a/cloud/src/meta-store/txn_kv_error.h b/cloud/src/meta-store/txn_kv_error.h
index df4d24a16b3..60162c26ac9 100644
--- a/cloud/src/meta-store/txn_kv_error.h
+++ b/cloud/src/meta-store/txn_kv_error.h
@@ -39,6 +39,7 @@ enum class [[nodiscard]] TxnErrorCode : int {
     TXN_UNIDENTIFIED_ERROR = -10,
     // the data is invalid.
     TXN_INVALID_DATA = -11,
+    TXN_TOO_MANY_WATCHES = -12, // too many watches are currently set (FDB error 1032).
 };
 
 inline const char* format_as(TxnErrorCode code) {
@@ -57,6 +58,7 @@ inline const char* format_as(TxnErrorCode code) {
     case TxnErrorCode::TXN_BYTES_TOO_LARGE: return "Transaction exceeds byte limit";
     case TxnErrorCode::TXN_UNIDENTIFIED_ERROR: return "Unknown";
     case TxnErrorCode::TXN_INVALID_DATA: return "The data is invalid";
+    case TxnErrorCode::TXN_TOO_MANY_WATCHES: return "Too many watches";
     }
     return "NotImplemented";
     // clang-format on
diff --git a/cloud/test/mem_txn_kv_test.cpp b/cloud/test/mem_txn_kv_test.cpp
index b017da3d464..f9012fb144d 100644
--- a/cloud/test/mem_txn_kv_test.cpp
+++ b/cloud/test/mem_txn_kv_test.cpp
@@ -1494,3 +1494,291 @@ TEST(TxnMemKvTest, GetVersionstampTest) {
     versionstamp_test(mem_txn_kv);
     versionstamp_test(fdb_txn_kv);
 }
+
+static void watch_key_test(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb";
+
+    // Test 1: Watch a key that gets modified
+    {
+        std::unique_ptr<Transaction> txn;
+        std::string key = "watch_test_key1";
+        std::string initial_val = "initial_value";
+        std::string new_val = "new_value";
+
+        // Set initial value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create a watch on the key
+        std::atomic<bool> watch_triggered {false};
+        std::atomic<bool> read_success {false};
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(val, initial_val);
+
+            read_success = true;
+
+            // This will block until the key is modified
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_triggered = true;
+        });
+
+        // Wait until the watcher has read the key; its watch may not be registered yet.
+        while (!read_success) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+
+        // Modify the key
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, new_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Wait for the watch to be triggered
+        watcher.join();
+        ASSERT_TRUE(watch_triggered) << txn_kv_class;
+    }
+
+    // Test 2: Watch a key that gets deleted
+    {
+        std::unique_ptr<Transaction> txn;
+        std::string key = "watch_test_key2";
+        std::string initial_val = "value_to_delete";
+
+        // Set initial value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create a watch on the key
+        std::atomic<bool> watch_triggered {false};
+        std::atomic<bool> read_success {false};
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+            read_success = true;
+
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_triggered = true;
+        });
+
+        while (!read_success) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+
+        // Delete the key
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->remove(key);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        watcher.join();
+        ASSERT_TRUE(watch_triggered) << txn_kv_class;
+    }
+
+    // Test 3: Watch a non-existent key that gets created
+    {
+        std::unique_ptr<Transaction> txn;
+        std::string key = "watch_test_key3_nonexistent";
+        std::string new_val = "newly_created";
+
+        // Ensure the key doesn't exist
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->remove(key);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create a watch on the non-existent key
+        std::atomic<bool> watch_triggered {false};
+        std::atomic<bool> read_success {false};
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            auto ret = watch_txn->get(key, &val);
+            ASSERT_EQ(ret, TxnErrorCode::TXN_KEY_NOT_FOUND);
+            read_success = true;
+
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_triggered = true;
+        });
+
+        while (!read_success) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+
+        // Create the key
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, new_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        watcher.join();
+        ASSERT_TRUE(watch_triggered) << txn_kv_class;
+    }
+
+    // Test 4: Multiple watches on the same key
+    {
+        std::unique_ptr<Transaction> txn;
+        std::string key = "watch_test_key4_multiple";
+        std::string initial_val = "multi_watch_initial";
+        std::string new_val = "multi_watch_new";
+
+        // Set initial value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create multiple watches on the same key
+        std::atomic<int> watch_count {0};
+        std::atomic<int> read_count {0};
+        std::thread watcher1([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+
+            read_count++;
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_count++;
+        });
+
+        std::thread watcher2([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+            read_count++;
+
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_count++;
+        });
+
+        std::thread watcher3([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+            read_count++;
+
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_count++;
+        });
+
+        while (read_count.load() < 3) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+
+        // Modify the key - all watches should be triggered
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, new_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        watcher1.join();
+        watcher2.join();
+        watcher3.join();
+
+        ASSERT_EQ(watch_count.load(), 3) << txn_kv_class;
+    }
+}
+
+TEST(TxnMemKvTest, WatchKeyTest) {
+    using namespace doris::cloud;
+
+    auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
+    ASSERT_NE(mem_txn_kv.get(), nullptr);
+
+    watch_key_test(mem_txn_kv);
+    watch_key_test(fdb_txn_kv);
+}
+
+static void watch_key_race_condition_test(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb";
+
+    // Test race condition: txn2 modifies the key after txn1 commits but before watch is registered
+    {
+        std::unique_ptr<Transaction> txn;
+        std::string key = "watch_race_key_" + std::to_string(time(nullptr));
+        std::string initial_val = "initial";
+        std::string new_val = "modified";
+
+        // Set initial value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        std::atomic<bool> watch_triggered {false};
+        std::atomic<bool> txn1_committed {false};
+        std::atomic<bool> txn2_can_proceed {false};
+
+        // Thread 1: Watch the key
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(val, initial_val);
+
+            // Signal that we've read the value and are about to commit
+            txn2_can_proceed = true;
+
+            // Small delay to increase the chance of race condition
+            std::this_thread::sleep_for(std::chrono::microseconds(100));
+
+            // watch_key() commits and then registers the watch;
+            // during this window, txn2 might modify the key
+            txn1_committed = true;
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_triggered = true;
+        });
+
+        // Thread 2: Modify the key around the time txn1 commits
+        std::thread modifier([&]() {
+            // Wait for watcher to read the value
+            while (!txn2_can_proceed) {
+                std::this_thread::sleep_for(std::chrono::microseconds(10));
+            }
+
+            // Wait until the watcher is about to call watch_key() (it has not committed yet)
+            while (!txn1_committed) {
+                std::this_thread::sleep_for(std::chrono::microseconds(10));
+            }
+
+            // Modify the key - this should trigger the watch even if it happens
+            // between commit and watch registration
+            std::unique_ptr<Transaction> modify_txn;
+            ASSERT_EQ(txn_kv->create_txn(&modify_txn), TxnErrorCode::TXN_OK);
+            modify_txn->put(key, new_val);
+            ASSERT_EQ(modify_txn->commit(), TxnErrorCode::TXN_OK);
+        });
+
+        watcher.join();
+        modifier.join();
+
+        // The watch should have been triggered (or returned immediately if change was detected)
+        ASSERT_TRUE(watch_triggered) << txn_kv_class;
+
+        // Verify the final value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string final_val;
+        ASSERT_EQ(txn->get(key, &final_val), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(final_val, new_val) << txn_kv_class;
+    }
+}
+
+TEST(TxnMemKvTest, WatchKeyRaceConditionTest) {
+    using namespace doris::cloud;
+
+    auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
+    ASSERT_NE(mem_txn_kv.get(), nullptr);
+
+    // Run the test multiple times to increase the chance of catching race conditions
+    for (int i = 0; i < 10; ++i) {
+        watch_key_race_condition_test(mem_txn_kv);
+    }
+    watch_key_race_condition_test(fdb_txn_kv);
+}
diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp
index c2fe0eb8435..7be9845bc46 100644
--- a/cloud/test/txn_kv_test.cpp
+++ b/cloud/test/txn_kv_test.cpp
@@ -1582,3 +1582,169 @@ TEST(TxnKvTest, ReportConflictingRange) {
     ASSERT_EQ(values[1].second, "0");
     ASSERT_TRUE(values[1].first.starts_with(key));
 }
+
+TEST(TxnKvTest, WatchKey) {
+    std::string key = "watch_key_test_" + std::to_string(time(nullptr));
+    std::string initial_val = "initial_value";
+    std::string new_val = "new_value";
+
+    // Test 1: Watch a key that gets modified
+    {
+        // Set initial value
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create a watch on the key
+        std::atomic<bool> watch_triggered {false};
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key, &val), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(val, initial_val);
+
+            // This will block until the key is modified
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK);
+            watch_triggered = true;
+        });
+
+        // Give the watcher time to read the key. NOTE(review): sleep-based; may flake under load.
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+        // Modify the key
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, new_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Wait for the watch to be triggered
+        watcher.join();
+        ASSERT_TRUE(watch_triggered);
+    }
+
+    // Test 2: Watch a key that gets deleted
+    {
+        std::string key2 = key + "_delete";
+
+        // Set initial value
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key2, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create a watch on the key
+        std::atomic<bool> watch_triggered {false};
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key2, &val), TxnErrorCode::TXN_OK);
+
+            ASSERT_EQ(watch_txn->watch_key(key2), TxnErrorCode::TXN_OK);
+            watch_triggered = true;
+        });
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // let the watcher read first
+
+        // Delete the key
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->remove(key2);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        watcher.join();
+        ASSERT_TRUE(watch_triggered);
+    }
+
+    // Test 3: Watch a non-existent key that gets created
+    {
+        std::string key3 = key + "_create";
+
+        // Ensure the key doesn't exist
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->remove(key3);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create a watch on the non-existent key
+        std::atomic<bool> watch_triggered {false};
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            auto ret = watch_txn->get(key3, &val);
+            ASSERT_EQ(ret, TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+            ASSERT_EQ(watch_txn->watch_key(key3), TxnErrorCode::TXN_OK);
+            watch_triggered = true;
+        });
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // let the watcher read first
+
+        // Create the key
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key3, new_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        watcher.join();
+        ASSERT_TRUE(watch_triggered);
+    }
+
+    // Test 4: Multiple watches on the same key
+    {
+        std::string key4 = key + "_multiple";
+
+        // Set initial value
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key4, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        // Create multiple watches on the same key
+        std::atomic<int> watch_count {0};
+        std::thread watcher1([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key4, &val), TxnErrorCode::TXN_OK);
+
+            ASSERT_EQ(watch_txn->watch_key(key4), TxnErrorCode::TXN_OK);
+            watch_count++;
+        });
+
+        std::thread watcher2([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key4, &val), TxnErrorCode::TXN_OK);
+
+            ASSERT_EQ(watch_txn->watch_key(key4), TxnErrorCode::TXN_OK);
+            watch_count++;
+        });
+
+        std::thread watcher3([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            ASSERT_EQ(txn_kv->create_txn(&watch_txn), TxnErrorCode::TXN_OK);
+            std::string val;
+            ASSERT_EQ(watch_txn->get(key4, &val), TxnErrorCode::TXN_OK);
+
+            ASSERT_EQ(watch_txn->watch_key(key4), TxnErrorCode::TXN_OK);
+            watch_count++;
+        });
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // let the watcher read first
+
+        // Modify the key - all watches should be triggered
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key4, new_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        watcher1.join();
+        watcher2.join();
+        watcher3.join();
+
+        ASSERT_EQ(watch_count.load(), 3);
+    }
+
+    std::cout << "WatchKey test completed successfully" << std::endl;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to