This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 594d4fbf8f6 [feat](cloud) Support Transaction::watch_key (#57408)
594d4fbf8f6 is described below
commit 594d4fbf8f639059e4adbfa04545dfbf85ca50a6
Author: walter <[email protected]>
AuthorDate: Wed Oct 29 10:37:46 2025 +0800
[feat](cloud) Support Transaction::watch_key (#57408)
---
cloud/src/common/bvars.cpp | 1 +
cloud/src/common/bvars.h | 1 +
cloud/src/meta-store/mem_txn_kv.cpp | 95 ++++++++++++
cloud/src/meta-store/mem_txn_kv.h | 15 ++
cloud/src/meta-store/txn_kv.cpp | 22 +++
cloud/src/meta-store/txn_kv.h | 15 ++
cloud/src/meta-store/txn_kv_error.h | 2 +
cloud/test/mem_txn_kv_test.cpp | 288 ++++++++++++++++++++++++++++++++++++
cloud/test/txn_kv_test.cpp | 166 +++++++++++++++++++++
9 files changed, 605 insertions(+)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index a1dd5961c60..2ca080adbf4 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -163,6 +163,7 @@ bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
bvar::LatencyRecorder g_bvar_txn_kv_put("txn_kv", "put");
bvar::LatencyRecorder g_bvar_txn_kv_commit("txn_kv", "commit");
+// Latency of Transaction::watch_key, measured over the whole call: commit plus waiting for the change.
+bvar::LatencyRecorder g_bvar_txn_kv_watch_key("txn_kv", "watch_key");
bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_key("txn_kv",
"atomic_set_ver_key");
bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_value("txn_kv",
"atomic_set_ver_value");
bvar::LatencyRecorder g_bvar_txn_kv_atomic_add("txn_kv", "atomic_add");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index ecb37bf18fd..d7ab5fc3099 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -659,6 +659,7 @@ extern bvar::LatencyRecorder g_bvar_txn_kv_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_put;
extern bvar::LatencyRecorder g_bvar_txn_kv_commit;
+// Latency of Transaction::watch_key (commit + wait), recorded by the FDB implementation.
+extern bvar::LatencyRecorder g_bvar_txn_kv_watch_key;
extern bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_key;
extern bvar::LatencyRecorder g_bvar_txn_kv_atomic_set_ver_value;
extern bvar::LatencyRecorder g_bvar_txn_kv_atomic_add;
diff --git a/cloud/src/meta-store/mem_txn_kv.cpp
b/cloud/src/meta-store/mem_txn_kv.cpp
index dde93b21770..9be08d0a763 100644
--- a/cloud/src/meta-store/mem_txn_kv.cpp
+++ b/cloud/src/meta-store/mem_txn_kv.cpp
@@ -207,6 +207,7 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>&
read_set,
++committed_version_;
int16_t seq = 0;
+ std::set<std::string> modified_keys; // Track which keys were modified
for (const auto& vec : op_list) {
const auto& [op_type, k, v] = vec;
LogItem log_item {op_type, committed_version_, k, v};
@@ -214,18 +215,21 @@ TxnErrorCode MemTxnKv::update(const
std::set<std::string>& read_set,
switch (op_type) {
case memkv::ModifyOpType::PUT: {
mem_kv_[k].push_front(Version {committed_version_, v});
+ modified_keys.insert(k);
break;
}
case memkv::ModifyOpType::ATOMIC_SET_VER_KEY: {
std::string ver_key(k);
gen_version_timestamp(committed_version_, seq, &ver_key);
mem_kv_[ver_key].push_front(Version {committed_version_, v});
+ modified_keys.insert(ver_key);
break;
}
case memkv::ModifyOpType::ATOMIC_SET_VER_VAL: {
std::string ver_val(v);
gen_version_timestamp(committed_version_, seq, &ver_val);
mem_kv_[k].push_front(Version {committed_version_, ver_val});
+ modified_keys.insert(k);
break;
}
case memkv::ModifyOpType::ATOMIC_ADD: {
@@ -239,10 +243,12 @@ TxnErrorCode MemTxnKv::update(const
std::set<std::string>& read_set,
int64_t res = *(int64_t*)org_val.data() + *(int64_t*)v.data();
std::memcpy(org_val.data(), &res, 8);
mem_kv_[k].push_front(Version {committed_version_, org_val});
+ modified_keys.insert(k);
break;
}
case memkv::ModifyOpType::REMOVE: {
mem_kv_[k].push_front(Version {committed_version_, std::nullopt});
+ modified_keys.insert(k);
break;
}
case memkv::ModifyOpType::REMOVE_RANGE: {
@@ -250,6 +256,7 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>&
read_set,
auto end_iter = mem_kv_.lower_bound(v);
while (begin_iter != end_iter) {
mem_kv_[begin_iter->first].push_front(Version
{committed_version_, std::nullopt});
+ modified_keys.insert(begin_iter->first);
begin_iter++;
}
break;
@@ -259,6 +266,11 @@ TxnErrorCode MemTxnKv::update(const std::set<std::string>&
read_set,
}
}
+ // Trigger watches for all modified keys
+ for (const auto& key : modified_keys) {
+ trigger_watches(key);
+ }
+
*committed_version = committed_version_;
return TxnErrorCode::TXN_OK;
}
@@ -316,6 +328,51 @@ std::unique_ptr<FullRangeGetIterator>
MemTxnKv::full_range_get(std::string begin
std::move(opts));
}
+// Registers `watch_info` as a pending watch on `key`.
+//
+// The check against the key's latest commit version and the registration happen under the
+// same lock_ hold. If the key was already modified after `watch_info->watch_version`, the
+// watch is fired immediately instead of being queued. Otherwise a change that committed
+// between the caller's read and this registration would never wake the waiter.
+void MemTxnKv::register_watch(const std::string& key, std::shared_ptr<WatchInfo> watch_info) {
+    std::lock_guard<std::mutex> l(lock_);
+
+    auto kv_it = mem_kv_.find(key);
+    if (kv_it != mem_kv_.end() && !kv_it->second.empty() &&
+        kv_it->second.front().commit_version > watch_info->watch_version) {
+        // Already changed: wake the waiter now, exactly as trigger_watches() would.
+        std::lock_guard<std::mutex> watch_lock(watch_info->mutex);
+        watch_info->triggered = true;
+        watch_info->cv.notify_all();
+        return;
+    }
+
+    watches_[key].push_back(std::move(watch_info));
+}
+
+// Wakes every pending watch on `key` whose watch_version is older than the key's latest
+// commit version, and drops those watches from watches_. Watches that are not yet stale
+// stay registered, in their original order.
+// Precondition: the caller already holds lock_.
+void MemTxnKv::trigger_watches(const std::string& key) {
+    auto watch_entry = watches_.find(key);
+    if (watch_entry == watches_.end()) {
+        return;
+    }
+
+    // Latest commit version recorded for the key; -1 when the key has no history.
+    int64_t latest = -1;
+    if (auto found = mem_kv_.find(key); found != mem_kv_.end() && !found->second.empty()) {
+        latest = found->second.front().commit_version;
+    }
+
+    // Split the watchers into the ones that must fire and the ones that keep waiting.
+    std::vector<std::shared_ptr<WatchInfo>> fired;
+    std::vector<std::shared_ptr<WatchInfo>> pending;
+    for (auto& watcher : watch_entry->second) {
+        if (latest > watcher->watch_version) {
+            fired.push_back(std::move(watcher));
+        } else {
+            pending.push_back(std::move(watcher));
+        }
+    }
+
+    if (pending.empty()) {
+        watches_.erase(watch_entry);
+    } else {
+        watch_entry->second.swap(pending);
+    }
+
+    // Publish `triggered` under each watch's own mutex so a waiter cannot miss it.
+    for (const auto& watcher : fired) {
+        std::lock_guard<std::mutex> guard(watcher->mutex);
+        watcher->triggered = true;
+        watcher->cv.notify_all();
+    }
+}
+
} // namespace doris::cloud
namespace doris::cloud::memkv {
@@ -676,6 +733,44 @@ TxnErrorCode Transaction::get_committed_version(int64_t*
version) {
return TxnErrorCode::TXN_OK;
}
+// Commits this transaction, then blocks until some later commit modifies `key`.
+// Returns immediately with TXN_OK if `key` was already modified after this transaction's
+// read version. Returns the commit error, without waiting, if the commit fails.
+TxnErrorCode Transaction::watch_key(std::string_view key) {
+    // Owned copy of the key; it is used as a map key under kv_->lock_.
+    const std::string watched_key(key);
+
+    // The watch is armed only after this transaction has committed successfully.
+    if (TxnErrorCode code = commit(); code != TxnErrorCode::TXN_OK) {
+        return code;
+    }
+
+    // NOTE(review): the baseline is this txn's read version. A key written by this same txn
+    // therefore presumably fires immediately (its new commit version is newer), unlike the
+    // "value as modified by the transaction itself" semantics described for FDB.
+    auto watch_info = std::make_shared<MemTxnKv::WatchInfo>();
+    watch_info->watch_version = read_version_;
+
+    {
+        // Check and register under the store lock, so no commit can land between the
+        // version check and the registration (which would lose the wake-up).
+        std::lock_guard<std::mutex> store_guard(kv_->lock_);
+        auto history = kv_->mem_kv_.find(watched_key);
+        bool changed_since_read =
+                history != kv_->mem_kv_.end() && !history->second.empty() &&
+                history->second.front().commit_version > watch_info->watch_version;
+        if (changed_since_read) {
+            // Already modified after our read version: nothing to wait for.
+            return TxnErrorCode::TXN_OK;
+        }
+        kv_->watches_[watched_key].push_back(watch_info);
+    }
+
+    // trigger_watches() sets `triggered` under watch_info->mutex, so the predicate also
+    // covers a notification that arrives before we start waiting.
+    std::unique_lock<std::mutex> waiter(watch_info->mutex);
+    watch_info->cv.wait(waiter, [&watch_info] { return watch_info->triggered; });
+    return TxnErrorCode::TXN_OK;
+}
TxnErrorCode Transaction::abort() {
return TxnErrorCode::TXN_OK;
}
diff --git a/cloud/src/meta-store/mem_txn_kv.h
b/cloud/src/meta-store/mem_txn_kv.h
index f6fa9735634..d7edd1bd5af 100644
--- a/cloud/src/meta-store/mem_txn_kv.h
+++ b/cloud/src/meta-store/mem_txn_kv.h
@@ -17,6 +17,7 @@
#pragma once
+#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <list>
@@ -77,6 +78,15 @@ public:
int64_t put_count_ {};
int64_t del_count_ {};
+    // State shared between a thread blocked in watch_key() and the commit that wakes it.
+    struct WatchInfo {
+        std::mutex mutex;           // guards `triggered`
+        std::condition_variable cv; // notified after `triggered` is set
+        bool triggered = false;
+        // The watch fires once the key's latest commit version exceeds this value.
+        int64_t watch_version = -1;
+    };
+
+    // Adds `watch_info` to the pending watches of `key`; takes lock_ internally.
+    void register_watch(const std::string& key, std::shared_ptr<WatchInfo> watch_info);
+
private:
using OpTuple = std::tuple<memkv::ModifyOpType, std::string, std::string>;
TxnErrorCode update(const std::set<std::string>& read_set, const
std::vector<OpTuple>& op_list,
@@ -110,9 +120,12 @@ private:
std::map<std::string, std::list<Version>> mem_kv_;
std::unordered_map<std::string, std::list<LogItem>> log_kv_;
+ // Pending watches per key, each waiting for a commit that modifies that key. Guarded by lock_.
+ std::unordered_map<std::string, std::vector<std::shared_ptr<WatchInfo>>>
 watches_;
mutable std::mutex lock_;
int64_t committed_version_ = 0;
int64_t read_version_ = 0;
+
+ // Wakes and removes the watches on `key` whose watch_version is older than the key's
+ // latest commit version. The caller must hold lock_.
+ void trigger_watches(const std::string& key);
};
namespace memkv {
@@ -217,6 +230,8 @@ public:
*/
TxnErrorCode commit() override;
+ // Commits this txn, then blocks until another commit modifies `key`. Returns at once if
+ // `key` already changed after this txn's read version.
+ TxnErrorCode watch_key(std::string_view key) override;
+
TxnErrorCode get_read_version(int64_t* version) override;
TxnErrorCode get_committed_version(int64_t* version) override;
diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp
index f83b0cbd98b..e1bf87ee6ee 100644
--- a/cloud/src/meta-store/txn_kv.cpp
+++ b/cloud/src/meta-store/txn_kv.cpp
@@ -248,6 +248,7 @@ constexpr fdb_error_t FDB_ERROR_CODE_TIMED_OUT = 1004;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_TOO_OLD = 1007;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_CONFLICT = 1020;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_TIMED_OUT = 1031;
+// FDB `too_many_watches`: a new watch could not be created because too many are outstanding.
+constexpr fdb_error_t FDB_ERROR_CODE_TOO_MANY_WATCHES = 1032;
constexpr fdb_error_t FDB_ERROR_CODE_INVALID_OPTION_VALUE = 2006;
constexpr fdb_error_t FDB_ERROR_CODE_INVALID_OPTION = 2007;
constexpr fdb_error_t FDB_ERROR_CODE_VERSION_INVALID = 2011;
@@ -284,6 +285,8 @@ static TxnErrorCode cast_as_txn_code(fdb_error_t err) {
return TxnErrorCode::TXN_TOO_OLD;
case FDB_ERROR_CODE_TXN_CONFLICT:
return TxnErrorCode::TXN_CONFLICT;
+ case FDB_ERROR_CODE_TOO_MANY_WATCHES:
+ return TxnErrorCode::TXN_TOO_MANY_WATCHES;
}
if (fdb_error_predicate(FDB_ERROR_PREDICATE_MAYBE_COMMITTED, err)) {
@@ -774,6 +777,25 @@ TxnErrorCode Transaction::commit() {
return TxnErrorCode::TXN_OK;
}
+// Creates an FDB watch on `key`, commits this transaction and waits for the watch future.
+// The watch is issued before commit() because FDB activates a watch only when the
+// transaction that created it commits. The recorded latency covers the whole call.
+TxnErrorCode Transaction::watch_key(std::string_view key) {
+    StopWatch timer;
+    auto* watch_future =
+            fdb_transaction_watch(txn_, reinterpret_cast<const uint8_t*>(key.data()), key.size());
+    DORIS_CLOUD_DEFER {
+        fdb_future_destroy(watch_future);
+        g_bvar_txn_kv_watch_key << timer.elapsed_us();
+    };
+
+    RETURN_IF_ERROR(commit());
+    // Blocks until the watched value changes or the watch fails.
+    RETURN_IF_ERROR(await_future(watch_future));
+
+    fdb_error_t watch_err = fdb_future_get_error(watch_future);
+    TEST_SYNC_POINT_CALLBACK("transaction:watch_key:get_err", &watch_err);
+    if (watch_err == 0) {
+        return TxnErrorCode::TXN_OK;
+    }
+    LOG(WARNING) << "fdb watch key " << hex(key) << ": " << fdb_get_error(watch_err);
+    return cast_as_txn_code(watch_err);
+}
+
TxnErrorCode Transaction::get_read_version(int64_t* version) {
StopWatch sw;
auto* fut = fdb_transaction_get_read_version(txn_);
diff --git a/cloud/src/meta-store/txn_kv.h b/cloud/src/meta-store/txn_kv.h
index d91542a48dd..d93ad46823c 100644
--- a/cloud/src/meta-store/txn_kv.h
+++ b/cloud/src/meta-store/txn_kv.h
@@ -251,6 +251,19 @@ public:
*/
virtual TxnErrorCode commit() = 0;
+ /**
+ * Issue a watch on `key`, commit the txn, and block until the watch is triggered.
+ *
+ * This call commits the txn, so the txn should be treated as finished afterwards.
+ * If the commit fails, its error code is returned and no waiting happens.
+ *
+ * A watch's behavior is relative to the transaction that created it: it reports a change
+ * relative to the key's value as readable by that transaction. The initial value used for
+ * comparison is either the value at the transaction's read version or the value as modified
+ * by the transaction itself prior to the creation of the watch. If the value changes and then
+ * changes back to its initial value, the watch might not report the change.
+ *
+ * The call may block indefinitely if the key is never modified.
+ *
+ * @param key the key to watch
+ * @return TXN_OK once the watch fires (or the key is found to have already changed),
+ *         the commit error if committing fails, TXN_TOO_MANY_WATCHES if the backend
+ *         refuses to create another watch, or another error code otherwise
+ */
+ virtual TxnErrorCode watch_key(std::string_view key) = 0;
+
/**
* Gets the read version used by the txn.
* Note that it does not make any sense we call this function before
@@ -790,6 +803,8 @@ public:
*/
TxnErrorCode commit() override;
+ // Creates an FDB watch on `key`, commits this txn and waits for the watch future.
+ TxnErrorCode watch_key(std::string_view key) override;
+
TxnErrorCode get_read_version(int64_t* version) override;
TxnErrorCode get_committed_version(int64_t* version) override;
diff --git a/cloud/src/meta-store/txn_kv_error.h
b/cloud/src/meta-store/txn_kv_error.h
index df4d24a16b3..60162c26ac9 100644
--- a/cloud/src/meta-store/txn_kv_error.h
+++ b/cloud/src/meta-store/txn_kv_error.h
@@ -39,6 +39,7 @@ enum class [[nodiscard]] TxnErrorCode : int {
TXN_UNIDENTIFIED_ERROR = -10,
// the data is invalid.
TXN_INVALID_DATA = -11,
+ // too many watches are outstanding, so a new watch could not be created.
+ TXN_TOO_MANY_WATCHES = -12,
};
inline const char* format_as(TxnErrorCode code) {
@@ -57,6 +58,7 @@ inline const char* format_as(TxnErrorCode code) {
case TxnErrorCode::TXN_BYTES_TOO_LARGE: return "Transaction exceeds byte
limit";
case TxnErrorCode::TXN_UNIDENTIFIED_ERROR: return "Unknown";
case TxnErrorCode::TXN_INVALID_DATA: return "The data is invalid";
+ case TxnErrorCode::TXN_TOO_MANY_WATCHES: return "Too many watches";
}
return "NotImplemented";
// clang-format on
diff --git a/cloud/test/mem_txn_kv_test.cpp b/cloud/test/mem_txn_kv_test.cpp
index b017da3d464..f9012fb144d 100644
--- a/cloud/test/mem_txn_kv_test.cpp
+++ b/cloud/test/mem_txn_kv_test.cpp
@@ -1494,3 +1494,291 @@ TEST(TxnMemKvTest, GetVersionstampTest) {
versionstamp_test(mem_txn_kv);
versionstamp_test(fdb_txn_kv);
}
+
+// Runs one watch scenario:
+// - `num_watchers` threads each read `key`, expecting `expected_get_code` (and `*expected_val`
+//   when it is non-null), and then block in watch_key().
+// - Once every watcher has read, `mutate` is applied and committed in a fresh transaction.
+// - Every watch is then expected to fire.
+template <typename Mutate>
+static void run_watch_case(const std::shared_ptr<cloud::TxnKv>& txn_kv, const std::string& key,
+                           cloud::TxnErrorCode expected_get_code, const std::string* expected_val,
+                           int num_watchers, Mutate&& mutate, const std::string& txn_kv_class) {
+    using namespace doris::cloud;
+
+    std::atomic<int> read_count {0};
+    std::atomic<int> watch_count {0};
+    std::vector<std::thread> watchers;
+    watchers.reserve(num_watchers);
+    for (int i = 0; i < num_watchers; ++i) {
+        watchers.emplace_back([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            TxnErrorCode code = txn_kv->create_txn(&watch_txn);
+            std::string val;
+            if (code == TxnErrorCode::TXN_OK) {
+                code = watch_txn->get(key, &val);
+            }
+            // Count the read BEFORE asserting on it. A failed ASSERT returns from this lambda,
+            // and without the count the main thread would wait for it forever.
+            ++read_count;
+            ASSERT_EQ(code, expected_get_code) << txn_kv_class;
+            if (expected_val != nullptr) {
+                ASSERT_EQ(val, *expected_val) << txn_kv_class;
+            }
+
+            // This will block until the key is modified
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            ++watch_count;
+        });
+    }
+
+    while (read_count.load() < num_watchers) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    mutate(txn.get());
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    for (auto& watcher : watchers) {
+        watcher.join();
+    }
+    ASSERT_EQ(watch_count.load(), num_watchers) << txn_kv_class;
+}
+
+static void watch_key_test(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb";
+
+    // Commits `val` under `key` in its own txn, or removes `key` when `val` is null.
+    auto seed = [&](const std::string& key, const std::string* val) {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        if (val != nullptr) {
+            txn->put(key, *val);
+        } else {
+            txn->remove(key);
+        }
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    };
+
+    // Test 1: Watch a key that gets modified
+    {
+        const std::string key = "watch_test_key1";
+        const std::string initial_val = "initial_value";
+        const std::string new_val = "new_value";
+        auto modify = [&](Transaction* txn) { txn->put(key, new_val); };
+        ASSERT_NO_FATAL_FAILURE(seed(key, &initial_val));
+        ASSERT_NO_FATAL_FAILURE(run_watch_case(txn_kv, key, TxnErrorCode::TXN_OK, &initial_val, 1,
+                                               modify, txn_kv_class));
+    }
+
+    // Test 2: Watch a key that gets deleted
+    {
+        const std::string key = "watch_test_key2";
+        const std::string initial_val = "value_to_delete";
+        auto remove_key = [&](Transaction* txn) { txn->remove(key); };
+        ASSERT_NO_FATAL_FAILURE(seed(key, &initial_val));
+        ASSERT_NO_FATAL_FAILURE(run_watch_case(txn_kv, key, TxnErrorCode::TXN_OK, nullptr, 1,
+                                               remove_key, txn_kv_class));
+    }
+
+    // Test 3: Watch a non-existent key that gets created
+    {
+        const std::string key = "watch_test_key3_nonexistent";
+        const std::string new_val = "newly_created";
+        auto create_key = [&](Transaction* txn) { txn->put(key, new_val); };
+        ASSERT_NO_FATAL_FAILURE(seed(key, nullptr));
+        ASSERT_NO_FATAL_FAILURE(run_watch_case(txn_kv, key, TxnErrorCode::TXN_KEY_NOT_FOUND,
+                                               nullptr, 1, create_key, txn_kv_class));
+    }
+
+    // Test 4: Multiple watches on the same key - all of them must fire
+    {
+        const std::string key = "watch_test_key4_multiple";
+        const std::string initial_val = "multi_watch_initial";
+        const std::string new_val = "multi_watch_new";
+        auto modify = [&](Transaction* txn) { txn->put(key, new_val); };
+        ASSERT_NO_FATAL_FAILURE(seed(key, &initial_val));
+        ASSERT_NO_FATAL_FAILURE(run_watch_case(txn_kv, key, TxnErrorCode::TXN_OK, nullptr, 3,
+                                               modify, txn_kv_class));
+    }
+}
+
+TEST(TxnMemKvTest, WatchKeyTest) {
+    using namespace doris::cloud;
+
+    // Same scenarios against the in-memory store first, then the FDB-backed one.
+    std::shared_ptr<TxnKv> mem_txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_NE(mem_txn_kv.get(), nullptr);
+
+    watch_key_test(mem_txn_kv);
+    watch_key_test(fdb_txn_kv);
+}
+
+// Race: another transaction modifies the key while the watcher is inside watch_key(),
+// i.e. possibly after the watcher's commit but before its watch is registered. The watch
+// must still fire, or return immediately because the change is already visible.
+static void watch_key_race_condition_test(std::shared_ptr<cloud::TxnKv> txn_kv) {
+    using namespace doris::cloud;
+    std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb";
+
+    {
+        std::unique_ptr<Transaction> txn;
+        std::string key = "watch_race_key_" + std::to_string(time(nullptr));
+        std::string initial_val = "initial";
+        std::string new_val = "modified";
+
+        // Set initial value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, initial_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        std::atomic<bool> watch_triggered {false};
+        // Raised right before the watcher calls watch_key(); releases the modifier.
+        std::atomic<bool> entering_watch {false};
+
+        // Thread 1: read the key, then watch it
+        std::thread watcher([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            TxnErrorCode read_code = txn_kv->create_txn(&watch_txn);
+            std::string val;
+            if (read_code == TxnErrorCode::TXN_OK) {
+                read_code = watch_txn->get(key, &val);
+            }
+
+            // Small delay to increase the chance of the race
+            std::this_thread::sleep_for(std::chrono::microseconds(100));
+
+            // Release the modifier BEFORE asserting. A failed ASSERT returns from this lambda,
+            // and without the signal the modifier would spin forever.
+            entering_watch = true;
+            ASSERT_EQ(read_code, TxnErrorCode::TXN_OK) << txn_kv_class;
+            ASSERT_EQ(val, initial_val) << txn_kv_class;
+
+            // Commits and then registers the watch; the modifier may commit in between.
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK) << txn_kv_class;
+            watch_triggered = true;
+        });
+
+        // Thread 2: modify the key while the watcher is inside watch_key()
+        std::thread modifier([&]() {
+            while (!entering_watch) {
+                std::this_thread::sleep_for(std::chrono::microseconds(10));
+            }
+
+            std::unique_ptr<Transaction> modify_txn;
+            ASSERT_EQ(txn_kv->create_txn(&modify_txn), TxnErrorCode::TXN_OK);
+            modify_txn->put(key, new_val);
+            ASSERT_EQ(modify_txn->commit(), TxnErrorCode::TXN_OK);
+        });
+
+        watcher.join();
+        modifier.join();
+
+        // The watch should have been triggered (or returned immediately if the change was detected)
+        ASSERT_TRUE(watch_triggered) << txn_kv_class;
+
+        // Verify the final value
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string final_val;
+        ASSERT_EQ(txn->get(key, &final_val), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(final_val, new_val) << txn_kv_class;
+    }
+}
+
+TEST(TxnMemKvTest, WatchKeyRaceConditionTest) {
+    using namespace doris::cloud;
+
+    std::shared_ptr<TxnKv> mem_txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_NE(mem_txn_kv.get(), nullptr);
+
+    // Repeat the in-memory run several times to increase the chance of hitting the race;
+    // the FDB-backed store is exercised once.
+    constexpr int kMemRounds = 10;
+    for (int round = 0; round < kMemRounds; ++round) {
+        watch_key_race_condition_test(mem_txn_kv);
+    }
+    watch_key_race_condition_test(fdb_txn_kv);
+}
diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp
index c2fe0eb8435..7be9845bc46 100644
--- a/cloud/test/txn_kv_test.cpp
+++ b/cloud/test/txn_kv_test.cpp
@@ -1582,3 +1582,169 @@ TEST(TxnKvTest, ReportConflictingRange) {
ASSERT_EQ(values[1].second, "0");
ASSERT_TRUE(values[1].first.starts_with(key));
}
+
+// Runs one watch scenario against the global `txn_kv`:
+// - `num_watchers` threads each read `key`, expecting `expected_get_code` (and `*expected_val`
+//   when it is non-null), and then block in watch_key().
+// - After all reads have finished, `mutate` is committed in a fresh transaction.
+// - Every watch is then expected to fire.
+template <typename Mutate>
+static void run_fdb_watch_case(const std::string& key, TxnErrorCode expected_get_code,
+                               const std::string* expected_val, int num_watchers,
+                               Mutate&& mutate) {
+    std::atomic<int> read_count {0};
+    std::atomic<int> watch_count {0};
+    std::vector<std::thread> watchers;
+    watchers.reserve(num_watchers);
+    for (int i = 0; i < num_watchers; ++i) {
+        watchers.emplace_back([&]() {
+            std::unique_ptr<Transaction> watch_txn;
+            TxnErrorCode code = txn_kv->create_txn(&watch_txn);
+            std::string val;
+            if (code == TxnErrorCode::TXN_OK) {
+                code = watch_txn->get(key, &val);
+            }
+            // Count the read before asserting, so a failed read cannot leave the main
+            // thread waiting forever.
+            ++read_count;
+            ASSERT_EQ(code, expected_get_code);
+            if (expected_val != nullptr) {
+                ASSERT_EQ(val, *expected_val);
+            }
+
+            // This will block until the key is modified
+            ASSERT_EQ(watch_txn->watch_key(key), TxnErrorCode::TXN_OK);
+            ++watch_count;
+        });
+    }
+
+    // Wait until every watcher has read the initial state. A fixed sleep alone does not
+    // guarantee this: a watcher that reads after the mutation sees the new state and fails
+    // its assertion.
+    while (read_count.load() < num_watchers) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    // Give the watchers time to enter watch_key() so the blocking path is exercised. A change
+    // that lands before the watch is registered still fires it, because the watch compares
+    // against the value at the watcher's read version.
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    mutate(txn.get());
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    for (auto& watcher : watchers) {
+        watcher.join();
+    }
+    ASSERT_EQ(watch_count.load(), num_watchers);
+}
+
+TEST(TxnKvTest, WatchKey) {
+    std::string key = "watch_key_test_" + std::to_string(time(nullptr));
+    std::string initial_val = "initial_value";
+    std::string new_val = "new_value";
+
+    // Commits `val` under `k` in its own txn, or removes `k` when `val` is null.
+    auto seed = [](const std::string& k, const std::string* val) {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        if (val != nullptr) {
+            txn->put(k, *val);
+        } else {
+            txn->remove(k);
+        }
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    };
+
+    // Test 1: Watch a key that gets modified
+    {
+        auto modify = [&](Transaction* txn) { txn->put(key, new_val); };
+        ASSERT_NO_FATAL_FAILURE(seed(key, &initial_val));
+        ASSERT_NO_FATAL_FAILURE(
+                run_fdb_watch_case(key, TxnErrorCode::TXN_OK, &initial_val, 1, modify));
+    }
+
+    // Test 2: Watch a key that gets deleted
+    {
+        std::string key2 = key + "_delete";
+        auto remove_key = [&](Transaction* txn) { txn->remove(key2); };
+        ASSERT_NO_FATAL_FAILURE(seed(key2, &initial_val));
+        ASSERT_NO_FATAL_FAILURE(
+                run_fdb_watch_case(key2, TxnErrorCode::TXN_OK, nullptr, 1, remove_key));
+    }
+
+    // Test 3: Watch a non-existent key that gets created
+    {
+        std::string key3 = key + "_create";
+        auto create_key = [&](Transaction* txn) { txn->put(key3, new_val); };
+        ASSERT_NO_FATAL_FAILURE(seed(key3, nullptr));
+        ASSERT_NO_FATAL_FAILURE(
+                run_fdb_watch_case(key3, TxnErrorCode::TXN_KEY_NOT_FOUND, nullptr, 1, create_key));
+    }
+
+    // Test 4: Multiple watches on the same key - all of them must fire
+    {
+        std::string key4 = key + "_multiple";
+        auto modify = [&](Transaction* txn) { txn->put(key4, new_val); };
+        ASSERT_NO_FATAL_FAILURE(seed(key4, &initial_val));
+        ASSERT_NO_FATAL_FAILURE(run_fdb_watch_case(key4, TxnErrorCode::TXN_OK, nullptr, 3, modify));
+    }
+
+    std::cout << "WatchKey test completed successfully" << '\n';
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]