This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 76b1260a93f branch-3.1: [fix](txn lazy commit) fix txn lazy commit
conflict with schema change #55349 (#55701)
76b1260a93f is described below
commit 76b1260a93f1eb60d8e72fb169d8b203e8b0c392
Author: Lei Zhang <[email protected]>
AuthorDate: Fri Sep 5 18:25:38 2025 +0800
branch-3.1: [fix](txn lazy commit) fix txn lazy commit conflict with schema
change #55349 (#55701)
picked from #55349
---
cloud/src/meta-service/txn_lazy_committer.cpp | 14 ++
cloud/test/txn_lazy_commit_test.cpp | 289 ++++++++++++++++++++------
2 files changed, 238 insertions(+), 65 deletions(-)
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 7a4755e44b9..1de69f59d08 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -121,6 +121,7 @@ void convert_tmp_rowsets(
LOG(WARNING) << msg;
return;
}
+
VersionPB version_pb;
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
@@ -130,6 +131,16 @@ void convert_tmp_rowsets(
}
LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
<< " version_pb:" << version_pb.ShortDebugString();
+
+ if (version_pb.pending_txn_ids_size() == 0 ||
version_pb.pending_txn_ids(0) != txn_id) {
+ LOG(INFO) << "txn_id=" << txn_id << " partition_id=" <<
tmp_rowset_pb.partition_id()
+ << " tmp_rowset_key=" << hex(tmp_rowset_key)
+ << " version has already been converted."
+ << " version_pb:" << version_pb.ShortDebugString();
+
TEST_SYNC_POINT_CALLBACK("convert_tmp_rowsets::already_been_converted",
+ &version_pb);
+ return;
+ }
partition_versions.emplace(tmp_rowset_pb.partition_id(),
version_pb);
DCHECK_EQ(partition_versions.size(), 1) <<
partition_versions.size();
}
@@ -279,6 +290,9 @@ void make_committed_txn_visible(const std::string&
instance_id, int64_t db_id, i
txn->put(recycle_key, recycle_val);
LOG(INFO) << "put recycle_key=" << hex(recycle_key) << " txn_id=" <<
txn_id;
+
TEST_SYNC_POINT_RETURN_WITH_VOID("TxnLazyCommitTask::make_committed_txn_visible::commit",
+ &code);
+
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
diff --git a/cloud/test/txn_lazy_commit_test.cpp
b/cloud/test/txn_lazy_commit_test.cpp
index 5b1686eed28..ff747fd810e 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -67,11 +67,6 @@ int main(int argc, char** argv) {
}
config::enable_cloud_txn_lazy_commit = true;
- config::txn_lazy_commit_rowsets_thresold = 2;
- config::txn_lazy_max_rowsets_per_batch = 2;
- config::txn_lazy_commit_num_threads = 2;
- config::max_tablet_index_num_per_batch = 2;
-
config::enable_txn_store_retry = false;
config::label_keep_max_second = 0;
@@ -156,8 +151,8 @@ static void create_tablet_with_db_id(MetaServiceProxy*
meta_service, int64_t db_
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}
-static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int index_id,
- int partition_id, int64_t
version = -1,
+static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int64_t index_id,
+ int64_t partition_id, int64_t
version = -1,
int num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
@@ -178,9 +173,9 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t
txn_id, int64_t tablet_id,
return rowset;
}
-static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t
tablet_id, int index_id,
- int partition_id, int64_t
version = -1,
- int num_rows = 100) {
+static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t
tablet_id,
+ int64_t index_id, int64_t
partition_id,
+ int64_t version = -1, int
num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
rowset.set_rowset_id_v2(next_rowset_id());
@@ -392,14 +387,14 @@ TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) {
// mock rowset and tablet
int64_t tablet_id_base = 11414703;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
}
@@ -414,14 +409,14 @@ TEST(TxnLazyCommitTest, CreateTabletWithoutDbIdTest) {
// mock rowset and tablet
int64_t tablet_id_base = 42411890;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
}
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_without_db_id(txn, tablet_id);
}
@@ -461,7 +456,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
// mock rowset and tablet
int64_t tablet_id_base = 87134121;
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
tmp_rowsets_meta;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
@@ -474,7 +469,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_without_db_id(txn, tablet_id);
}
@@ -488,7 +483,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
}
@@ -549,7 +544,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest)
{
// mock rowset and tablet
int64_t tablet_id_base = 1103;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2999; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
@@ -561,7 +556,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest)
{
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tmp_rowset_exist(txn, tablet_id, txn_id);
}
@@ -575,7 +570,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest)
{
req.set_txn_id(txn_id);
req.set_is_2pc(false);
req.set_enable_txn_lazy_commit(true);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2999; ++i) {
int64_t tablet_id = tablet_id_base + i;
req.add_base_tablet_ids(tablet_id);
}
@@ -592,7 +587,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest)
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string mock_instance = "test_instance";
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2999; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -630,7 +625,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest)
{
// mock rowset and tablet
int64_t tablet_id_base = 372323;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
@@ -642,7 +637,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest)
{
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tmp_rowset_exist(txn, tablet_id, txn_id);
}
@@ -669,7 +664,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest)
{
req.set_txn_id(txn_id);
req.set_is_2pc(false);
req.set_enable_txn_lazy_commit(true);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
req.add_base_tablet_ids(tablet_id);
}
@@ -682,7 +677,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest)
{
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
check_tmp_rowset_exist(txn, tablet_id, txn_id);
@@ -739,7 +734,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
// mock rowset and tablet
int64_t tablet_id_base = 3131124;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2048; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
@@ -769,7 +764,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string mock_instance = "test_instance";
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2048; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -1123,7 +1118,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 1908462;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -1152,7 +1147,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
ASSERT_GT(txn_id1, 0);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset =
create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -1200,7 +1195,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
ASSERT_GT(txn_id2, 0);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset =
create_rowset(txn_id2, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -1243,7 +1238,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string mock_instance = "test_instance";
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
std::string key = meta_tablet_idx_key({mock_instance, tablet_id});
std::string val;
@@ -1363,7 +1358,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase2Test) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 1908462;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 1999; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -1392,7 +1387,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase2Test) {
ASSERT_GT(txn_id1, 0);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 1999; ++i) {
auto tmp_rowset =
create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -1440,7 +1435,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase2Test) {
ASSERT_GT(txn_id2, 0);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 1999; ++i) {
auto tmp_rowset =
create_rowset(txn_id2, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -1483,7 +1478,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase2Test) {
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 1999; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
@@ -1526,7 +1521,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase3Test) {
// mock rowset and tablet
int64_t tablet_id_base = 19201262;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 10001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -1551,7 +1546,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase3Test) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 10001; ++i) {
auto tmp_rowset =
create_rowset(tmp_txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
@@ -1661,7 +1656,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase3Test) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 10001; ++i) {
auto tmp_rowset =
create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -1788,7 +1783,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase4Test) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 213430076554;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -1813,7 +1808,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase4Test) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -1961,7 +1956,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase5Test) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 1908562;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -1990,7 +1985,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase5Test) {
ASSERT_GT(txn_id1, 0);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset =
create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -2038,7 +2033,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase5Test) {
ASSERT_GT(txn_id2, 0);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset =
create_rowset(txn_id2, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
@@ -2071,7 +2066,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase5Test) {
ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset =
create_rowset(sub_txn_id2, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
@@ -2091,14 +2086,14 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase5Test) {
SubTxnInfo sub_txn_info1;
sub_txn_info1.set_sub_txn_id(sub_txn_id1);
sub_txn_info1.set_table_id(table_id);
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base +
i);
}
SubTxnInfo sub_txn_info2;
sub_txn_info2.set_sub_txn_id(sub_txn_id2);
sub_txn_info2.set_table_id(table_id);
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base +
i);
}
@@ -2130,7 +2125,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase5Test) {
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
@@ -2321,7 +2316,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 2313324;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -2346,7 +2341,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2440,7 +2435,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 2313324;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -2465,7 +2460,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2540,7 +2535,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 2313324;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -2565,7 +2560,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2666,7 +2661,7 @@ TEST(TxnLazyCommitTest, RecyclePartitions) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 2313324;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -2691,7 +2686,7 @@ TEST(TxnLazyCommitTest, RecyclePartitions) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2812,7 +2807,7 @@ TEST(TxnLazyCommitTest, RecycleIndexes) {
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
int64_t tablet_id_base = 2313324;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
}
@@ -2837,7 +2832,7 @@ TEST(TxnLazyCommitTest, RecycleIndexes) {
}
{
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 2001; ++i) {
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2938,7 +2933,7 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithMultiTableTest) {
// mock rowset and tablet
int64_t tablet_id_base = 3131124;
- for (int i = 0; i < 5; ++i) {
+ for (int i = 0; i < 2001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
@@ -2947,7 +2942,7 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithMultiTableTest) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
- for (int i = 5; i < 10; ++i) {
+ for (int i = 2001; i < 4002; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id2,
index_id2, partition_id2,
tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id2,
partition_id2);
@@ -2975,7 +2970,7 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithMultiTableTest) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string mock_instance = "test_instance";
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 4002; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -2989,7 +2984,6 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithMultiTableTest) {
}
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
- config::txn_lazy_max_rowsets_per_batch = 1000;
auto txn_kv = get_fdb_txn_kv();
int64_t db_id = 14135425;
int64_t table_id = 31245456;
@@ -3037,7 +3031,7 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithHugeRowsetMetaTest) {
// mock rowset and tablet
int64_t tablet_id_base = 3131124;
- for (int i = 0; i < 1000; ++i) {
+ for (int i = 0; i < 1001; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
@@ -3046,7 +3040,7 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithHugeRowsetMetaTest) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
- for (int i = 1000; i < 2000; ++i) {
+ for (int i = 1001; i < 2002; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id2,
index_id2, partition_id2,
tablet_id_base + i);
auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i,
index_id2, partition_id2);
@@ -3074,7 +3068,7 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithHugeRowsetMetaTest) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string mock_instance = "test_instance";
- for (int i = 0; i < 2000; ++i) {
+ for (int i = 0; i < 2002; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -3085,7 +3079,172 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithHugeRowsetMetaTest) {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- config::txn_lazy_max_rowsets_per_batch = 2;
+}
+
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
+ //
===========================================================================
+ // threads concurrent execution flow:
+ //
+ // thread1 lazy thread1
thread3
+ // | |
|
+ // commit_txn_eventually begin |
|
+ // | |
|
+ // lazy commit wait |
|
+ // | |
|
+ // | make_committed_txn_visible
|
+ // | |
|
+ // | inject TXN_TOO_OLD fdb error
|
+ // | | sc create
new tablet tmp rowset
+ // | |
|
+ // | |
|
+ // retry commit_txn |
|
+ // v v
+ auto txn_kv = get_mem_txn_kv();
+ int64_t db_id = 4534445675;
+ int64_t table_id = 4365676543;
+ int64_t index_id = 665453237;
+ int64_t partition_id = 2136776543678;
+
+ bool go = false;
+ std::mutex go_mutex;
+ std::condition_variable go_cv;
+ std::atomic<int32_t> make_committed_txn_visible_count = {0};
+ std::atomic<int32_t> sc_create_tmp_rowset_count = {0};
+ std::atomic<int32_t> sc_create_tmp_rowset_finish_count = {0};
+ std::atomic<int32_t> tmp_rowsets_been_already_converted = {0};
+
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit",
[&](auto&& args) {
+ LOG(INFO) << "zhangleiyyy";
+ {
+ std::unique_lock<std::mutex> _lock(go_mutex);
+ if (make_committed_txn_visible_count == 0) {
+ make_committed_txn_visible_count++;
+ if (sc_create_tmp_rowset_count == 0) {
+ go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count
== 1; });
+ }
+ MetaServiceCode* code =
try_any_cast<MetaServiceCode*>(args[0]);
+ *code = MetaServiceCode::KV_TXN_CONFLICT;
+ bool* pred = try_any_cast<bool*>(args.back());
+ *pred = true;
+ LOG(INFO) << "inject kv error KV_TXN_CONFLICT";
+ go_cv.notify_all();
+ }
+ }
+ });
+
+ sp->set_call_back("convert_tmp_rowsets::already_been_converted",
[&](auto&& args) {
+ auto version_pb = *try_any_cast<VersionPB*>(args[0]);
+ LOG(INFO) << "version_pb:" << version_pb.ShortDebugString();
+ std::unique_lock<std::mutex> _lock(go_mutex);
+ tmp_rowsets_been_already_converted++;
+ go_cv.notify_all();
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label("test_sc_with_commit_txn_label");
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res,
+ nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ int64_t txn_id = res.txn_id();
+
+ // mock rowset and tablet
+ int64_t tablet_id_base = 3131124;
+ for (int i = 0; i < 1001; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id_base + i);
+ auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ std::thread thread1([&] {
+ {
+ {
+ std::unique_lock<std::mutex> _lock(go_mutex);
+ go_cv.wait(_lock, [&] { return go; });
+ }
+
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_2pc(false);
+ req.set_enable_txn_lazy_commit(true);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ LOG(INFO) << "thread1 finish";
+ });
+
+ std::thread thread2([&] {
+ {
+ {
+ std::unique_lock<std::mutex> _lock(go_mutex);
+ go_cv.wait(_lock, [&] { return go; });
+ }
+
+ {
+ std::unique_lock<std::mutex> _lock(go_mutex);
+ sc_create_tmp_rowset_count++;
+ if (make_committed_txn_visible_count == 0) {
+ go_cv.wait(_lock, [&] { return
make_committed_txn_visible_count > 0; });
+ }
+ for (int i = 0; i < 1001; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id,
table_id, index_id,
+ partition_id, tablet_id_base + i);
+ auto tmp_rowset =
+ create_huge_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ LOG(INFO) << "sc_create_tmp_rowset_finish_count finish";
+ sc_create_tmp_rowset_finish_count++;
+ go_cv.notify_all();
+ }
+ LOG(INFO) << "thread2 finish";
+ }
+ });
+
+ std::unique_lock<std::mutex> go_lock(go_mutex);
+ go = true;
+ go_lock.unlock();
+ go_cv.notify_all();
+
+ thread1.join();
+ thread2.join();
+
+ ASSERT_GE(tmp_rowsets_been_already_converted, 1);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string mock_instance = "test_instance";
+ for (int i = 0; i < 1001; ++i) {
+ int64_t tablet_id = tablet_id_base + i;
+ check_tablet_idx_db_id(txn, db_id, tablet_id);
+ check_tmp_rowset_exist(txn, tablet_id, txn_id);
+ check_rowset_meta_exist(txn, tablet_id, 2);
+ }
+ }
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]