This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ebb8d52668c [fix](recycler) Concurrent recycle cause txn commit
conflict (#54849)
ebb8d52668c is described below
commit ebb8d52668c2da23c69f40bb57d0351021d6f8bb
Author: Uniqueyou <[email protected]>
AuthorDate: Sat Aug 16 16:08:59 2025 +0800
[fix](recycler) Concurrent recycle cause txn commit conflict (#54849)
### What problem does this PR solve?
Retry deleting the expired txn when the commit fails with a txn conflict, up to 10 retries.
before
```
W20250815 11:37:43.104856 448779 recycler.cpp:3925] failed to delete
expired txn, err=Conflict
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d4
W20250815 11:37:43.104873 448783 recycler.cpp:3925] failed to delete
expired txn, err=Conflict
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d8
W20250815 11:37:43.104945 448784 mem_txn_kv.cpp:200] commit conflict
I20250815 11:37:43.104902 448777 recycler.cpp:3932] recycle expired txn,
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d1
W20250815 11:37:43.104959 448782 recycler.cpp:3925] failed to delete
expired txn, err=Conflict
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d7
W20250815 11:37:43.104981 448779 recycler.cpp:3945] failed to delete
recycle txn kv instance id="concurrent_recycle_txn_label_test_Zq285DM7d6"
key="011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d4"
W20250815 11:37:43.104983 448783 recycler.cpp:3945] failed to delete
recycle txn kv instance id="concurrent_recycle_txn_label_test_Zq285DM7d6"
key="011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f5a71323835444d37643600011074786e00011200000000000003e81200000000000007d8"
```
now
```
W20250815 11:56:22.749135 595891 recycler.cpp:3925] txn conflict, retry
times=6
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d7
db_id=1000 txn_id=2007
I20250815 11:56:22.896646 595892 recycler.cpp:3942] recycle expired txn,
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d8
W20250815 11:56:22.942795 595891 mem_txn_kv.cpp:200] commit conflict
W20250815 11:56:22.942880 595891 recycler.cpp:3925] txn conflict, retry
times=7
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d7
db_id=1000 txn_id=2007
I20250815 11:56:23.079614 595891 recycler.cpp:3942] recycle expired txn,
key=011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e00011200000000000003e81200000000000007d7
I20250815 11:56:23.079846 595741 recycler.cpp:3578] finish scan_and_recycle
key_range=[011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e0001120000000000000000120000000000000000,011072656379636c65000110636f6e63757272656e745f72656379636c655f74786e5f6c6162656c5f746573745f3353706c496c4742534c00011074786e0001127fffffffffffffff127fffffffffffffff)
num_scanned=10 get_range_retried=0 ret=0 err=
I20250815 11:56:23.080000 595741 recycler.h:384] recycle instance:
concurrent_recycle_txn_label_test_3SplIlGBSL, operation type:
recycle_expired_txn_label, cost: 1385 ms, total recycled num: 10, total
recycled data size: 0 bytes
```
---
cloud/src/recycler/recycler.cpp | 28 ++++++-
cloud/test/recycler_test.cpp | 176 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 203 insertions(+), 1 deletion(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 89ce765b0d8..324165e139a 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -30,6 +30,7 @@
#include <chrono>
#include <cstddef>
#include <cstdint>
+#include <cstdlib>
#include <deque>
#include <initializer_list>
#include <numeric>
@@ -3831,6 +3832,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
return 0;
};
+ // int 0 for success, 1 for conflict, -1 for error
auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
std::string_view k1 = k;
//RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id
@@ -3896,17 +3898,29 @@ int InstanceRecycler::recycle_expired_txn_label() {
}
if (txn_label.txn_ids().empty()) {
txn->remove(label_key);
+ TEST_SYNC_POINT_CALLBACK(
+
"InstanceRecycler::recycle_expired_txn_label.remove_label_before");
} else {
if (!txn_label.SerializeToString(&label_val)) {
LOG(WARNING) << "failed to serialize txn label, key=" <<
hex(label_key);
return -1;
}
+ TEST_SYNC_POINT_CALLBACK(
+
"InstanceRecycler::recycle_expired_txn_label.update_label_before");
txn->atomic_set_ver_value(label_key, label_val);
+ TEST_SYNC_POINT_CALLBACK(
+
"InstanceRecycler::recycle_expired_txn_label.update_label_after");
}
// Remove recycle txn kv
txn->remove(k);
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.before_commit");
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
+ if (err == TxnErrorCode::TXN_CONFLICT) {
+ TEST_SYNC_POINT_CALLBACK(
+
"InstanceRecycler::recycle_expired_txn_label.txn_conflict");
+ return 1;
+ }
LOG(WARNING) << "failed to delete expired txn, err=" << err << "
key=" << hex(k);
return -1;
}
@@ -3926,7 +3940,19 @@ int InstanceRecycler::recycle_expired_txn_label() {
&recycle_txn_info_keys);
for (const auto& k : recycle_txn_info_keys) {
concurrent_delete_executor.add([&]() {
- if (delete_recycle_txn_kv(k) != 0) {
+ int ret = delete_recycle_txn_kv(k);
+ if (ret == 1) {
+ constexpr int MAX_RETRY = 10;
+ for (size_t i = 0; i < MAX_RETRY; ++i) {
+ ret = delete_recycle_txn_kv(k);
+ LOG(WARNING) << "txn conflict, retry times=" << i << "
key=" << hex(k);
+ if (ret != 1) {
+ break;
+ }
+ // random sleep 0-100 ms to retry
+
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
+ }
+ } else if (ret == -1) {
LOG_WARNING("failed to delete recycle txn kv")
.tag("instance id", instance_id_)
.tag("key", hex(k));
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 8a3e4ff0c71..5fc1b673dd1 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -6002,4 +6002,180 @@ TEST(RecyclerTest,
concurrent_recycle_txn_label_failure_test) {
<< "ms" << std::endl;
check_multiple_txn_info_kvs(txn_kv, 5000);
}
+TEST(RecyclerTest, concurrent_recycle_txn_label_conflict_test) {
+ config::label_keep_max_second = 0;
+ config::recycle_pool_parallelism = 20;
+
+ doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
+ auto recycle_txn_label_s3_producer_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_s3_producer_pool->start();
+ auto recycle_txn_label_recycle_tablet_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_recycle_tablet_pool->start();
+ auto recycle_txn_label_group_recycle_function_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_group_recycle_function_pool->start();
+ recycle_txn_label_thread_group =
+
RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
+
std::move(recycle_txn_label_recycle_tablet_pool),
+
std::move(recycle_txn_label_group_recycle_function_pool));
+
+ auto mem_txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(mem_txn_kv->init(), 0);
+
+ std::string shared_label = "shared_conflict_label";
+ int64_t shared_db_id = 1000;
+ std::vector<int64_t> shared_txn_ids = {2001, 2002, 2003, 2004, 2005,
+ 2006, 2007, 2008, 2009, 2010};
+
+ // create shared TxnLabel
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ std::string label_key;
+ std::string label_val;
+ txn_label_key({instance_id, shared_db_id, shared_label}, &label_key);
+
+ TxnLabelPB txn_label_pb;
+ for (auto txn_id : shared_txn_ids) {
+ txn_label_pb.add_txn_ids(txn_id);
+ }
+
+ if (!txn_label_pb.SerializeToString(&label_val)) {
+ FAIL() << "Failed to serialize txn label";
+ }
+
+ uint32_t offset = label_val.size();
+ label_val.append(10, '\x00'); // 10 bytes for versionstamp
+ label_val.append((const char*)&offset, 4);
+ MemTxnKv::gen_version_timestamp(123456790, 0, &label_val);
+ txn->put(label_key, label_val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ int64_t current_time = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ for (auto txn_id : shared_txn_ids) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ // RecycleTxnKeyInfo -> RecycleTxnPB (set to expired)
+ std::string recycle_txn_info_key;
+ std::string recycle_txn_info_val;
+ RecycleTxnKeyInfo recycle_txn_key_info {instance_id, shared_db_id,
txn_id};
+ recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key);
+ RecycleTxnPB recycle_txn_pb;
+ recycle_txn_pb.set_creation_time(current_time - 300000);
+ recycle_txn_pb.set_label(shared_label);
+ if (!recycle_txn_pb.SerializeToString(&recycle_txn_info_val)) {
+ FAIL() << "Failed to serialize recycle txn";
+ }
+ txn->put(recycle_txn_info_key, recycle_txn_info_val);
+
+ // TxnIndexKey -> TxnIndexPB
+ std::string txn_idx_key = txn_index_key({instance_id, txn_id});
+ std::string txn_idx_val;
+ TxnIndexPB txn_index_pb;
+ if (!txn_index_pb.SerializeToString(&txn_idx_val)) {
+ FAIL() << "Failed to serialize txn index";
+ }
+ txn->put(txn_idx_key, txn_idx_val);
+
+ // TxnInfoKey -> TxnInfoPB
+ std::string info_key = txn_info_key({instance_id, shared_db_id,
txn_id});
+ std::string info_val;
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_label(shared_label);
+ if (!txn_info_pb.SerializeToString(&info_val)) {
+ FAIL() << "Failed to serialize txn info";
+ }
+ txn->put(info_key, info_val);
+
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ auto* sp = SyncPoint::get_instance();
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
+
+ std::atomic<int> update_label_before_count {0};
+ std::atomic<int> remove_label_before_count {0};
+ std::atomic<int> update_label_after_count {0};
+ std::atomic<int> txn_conflict_count {0};
+
+
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.remove_label_before",
+ [&](auto&& args) {
+ remove_label_before_count++;
+
std::this_thread::sleep_for(std::chrono::milliseconds(60));
+ });
+
+
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.update_label_before",
+ [&](auto&& args) {
+ update_label_before_count++;
+
std::this_thread::sleep_for(std::chrono::milliseconds(80));
+ });
+
+
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.update_label_after",
+ [&](auto&& args) { update_label_after_count++; });
+
+ sp->set_call_back(
+ "InstanceRecycler::recycle_expired_txn_label.before_commit",
+ [&](auto&& args) {
std::this_thread::sleep_for(std::chrono::milliseconds(20)); });
+
+
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.txn_conflict",
[&](auto&& args) {
+ txn_conflict_count++;
+ LOG(WARNING) << "Transaction conflict detected in test";
+ });
+
+ sp->enable_processing();
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ InstanceRecycler recycler(mem_txn_kv, instance,
recycle_txn_label_thread_group,
+ std::make_shared<TxnLazyCommitter>(mem_txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+
+ auto start = std::chrono::steady_clock::now();
+ ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+ auto finish = std::chrono::steady_clock::now();
+
+ std::cout << "Concurrent recycle cost="
+ << std::chrono::duration_cast<std::chrono::milliseconds>(finish
- start).count()
+ << "ms" << std::endl;
+ std::cout << "Update label before count: " << update_label_before_count <<
std::endl;
+ std::cout << "Update label after count: " << update_label_after_count <<
std::endl;
+ std::cout << "Transaction conflict count: " << txn_conflict_count <<
std::endl;
+
+ EXPECT_GT(txn_conflict_count, 0) << "txn_conflict sync point should be
triggered";
+
+ std::unique_ptr<Transaction> verify_txn;
+ ASSERT_EQ(mem_txn_kv->create_txn(&verify_txn), TxnErrorCode::TXN_OK);
+
+ RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0};
+ RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX,
INT64_MAX};
+ std::string begin_key = recycle_txn_key(recycle_txn_key_info0);
+ std::string end_key = recycle_txn_key(recycle_txn_key_info1);
+
+ std::unique_ptr<RangeGetIterator> it;
+ ASSERT_EQ(verify_txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ EXPECT_EQ(it->size(), 0) << "All recycle txn keys should be deleted";
+
+ std::string label_key;
+ std::string label_val;
+ txn_label_key({instance_id, shared_db_id, shared_label}, &label_key);
+ EXPECT_EQ(verify_txn->get(label_key, &label_val),
TxnErrorCode::TXN_KEY_NOT_FOUND)
+ << "Shared label should be deleted";
+
+ for (auto txn_id : shared_txn_ids) {
+ std::string info_key = txn_info_key({instance_id, shared_db_id,
txn_id});
+ std::string info_val;
+ EXPECT_EQ(verify_txn->get(info_key, &info_val),
TxnErrorCode::TXN_KEY_NOT_FOUND)
+ << "TxnInfo for txn_id " << txn_id << " should be deleted";
+ }
+}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]