This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 76b1260a93f branch-3.1: [fix](txn lazy commit) fix txn lazy commit conflict with schema change #55349 (#55701)
76b1260a93f is described below

commit 76b1260a93f1eb60d8e72fb169d8b203e8b0c392
Author: Lei Zhang <[email protected]>
AuthorDate: Fri Sep 5 18:25:38 2025 +0800

    branch-3.1: [fix](txn lazy commit) fix txn lazy commit conflict with schema change #55349 (#55701)
    
    picked from #55349
---
 cloud/src/meta-service/txn_lazy_committer.cpp |  14 ++
 cloud/test/txn_lazy_commit_test.cpp           | 289 ++++++++++++++++++++------
 2 files changed, 238 insertions(+), 65 deletions(-)

diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 7a4755e44b9..1de69f59d08 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -121,6 +121,7 @@ void convert_tmp_rowsets(
                 LOG(WARNING) << msg;
                 return;
             }
+
             VersionPB version_pb;
             if (!version_pb.ParseFromString(ver_val)) {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
@@ -130,6 +131,16 @@ void convert_tmp_rowsets(
             }
             LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
                       << " version_pb:" << version_pb.ShortDebugString();
+
+            if (version_pb.pending_txn_ids_size() == 0 || 
version_pb.pending_txn_ids(0) != txn_id) {
+                LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << 
tmp_rowset_pb.partition_id()
+                          << " tmp_rowset_key=" << hex(tmp_rowset_key)
+                          << " version has already been converted."
+                          << " version_pb:" << version_pb.ShortDebugString();
+                
TEST_SYNC_POINT_CALLBACK("convert_tmp_rowsets::already_been_converted",
+                                         &version_pb);
+                return;
+            }
             partition_versions.emplace(tmp_rowset_pb.partition_id(), 
version_pb);
             DCHECK_EQ(partition_versions.size(), 1) << 
partition_versions.size();
         }
@@ -279,6 +290,9 @@ void make_committed_txn_visible(const std::string& 
instance_id, int64_t db_id, i
         txn->put(recycle_key, recycle_val);
         LOG(INFO) << "put recycle_key=" << hex(recycle_key) << " txn_id=" << 
txn_id;
 
+        
TEST_SYNC_POINT_RETURN_WITH_VOID("TxnLazyCommitTask::make_committed_txn_visible::commit",
+                                         &code);
+
         err = txn->commit();
         if (err != TxnErrorCode::TXN_OK) {
             code = cast_as<ErrCategory::COMMIT>(err);
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 5b1686eed28..ff747fd810e 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -67,11 +67,6 @@ int main(int argc, char** argv) {
     }
 
     config::enable_cloud_txn_lazy_commit = true;
-    config::txn_lazy_commit_rowsets_thresold = 2;
-    config::txn_lazy_max_rowsets_per_batch = 2;
-    config::txn_lazy_commit_num_threads = 2;
-    config::max_tablet_index_num_per_batch = 2;
-
     config::enable_txn_store_retry = false;
 
     config::label_keep_max_second = 0;
@@ -156,8 +151,8 @@ static void create_tablet_with_db_id(MetaServiceProxy* 
meta_service, int64_t db_
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
 }
 
-static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int index_id,
-                                              int partition_id, int64_t 
version = -1,
+static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int64_t index_id,
+                                              int64_t partition_id, int64_t 
version = -1,
                                               int num_rows = 100) {
     doris::RowsetMetaCloudPB rowset;
     rowset.set_rowset_id(0); // required
@@ -178,9 +173,9 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t 
txn_id, int64_t tablet_id,
     return rowset;
 }
 
-static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t 
tablet_id, int index_id,
-                                                   int partition_id, int64_t 
version = -1,
-                                                   int num_rows = 100) {
+static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t 
tablet_id,
+                                                   int64_t index_id, int64_t 
partition_id,
+                                                   int64_t version = -1, int 
num_rows = 100) {
     doris::RowsetMetaCloudPB rowset;
     rowset.set_rowset_id(0); // required
     rowset.set_rowset_id_v2(next_rowset_id());
@@ -392,14 +387,14 @@ TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) {
 
     // mock rowset and tablet
     int64_t tablet_id_base = 11414703;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
 
     std::unique_ptr<Transaction> txn;
     ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         int64_t tablet_id = tablet_id_base + i;
         check_tablet_idx_db_id(txn, db_id, tablet_id);
     }
@@ -414,14 +409,14 @@ TEST(TxnLazyCommitTest, CreateTabletWithoutDbIdTest) {
 
     // mock rowset and tablet
     int64_t tablet_id_base = 42411890;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
     }
 
     std::unique_ptr<Transaction> txn;
     ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         int64_t tablet_id = tablet_id_base + i;
         check_tablet_idx_without_db_id(txn, tablet_id);
     }
@@ -461,7 +456,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
     // mock rowset and tablet
     int64_t tablet_id_base = 87134121;
     std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> 
tmp_rowsets_meta;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
@@ -474,7 +469,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_without_db_id(txn, tablet_id);
         }
@@ -488,7 +483,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
         }
@@ -549,7 +544,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) 
{
 
     // mock rowset and tablet
     int64_t tablet_id_base = 1103;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2999; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
@@ -561,7 +556,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) 
{
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tmp_rowset_exist(txn, tablet_id, txn_id);
         }
@@ -575,7 +570,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) 
{
         req.set_txn_id(txn_id);
         req.set_is_2pc(false);
         req.set_enable_txn_lazy_commit(true);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2999; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             req.add_base_tablet_ids(tablet_id);
         }
@@ -592,7 +587,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) 
{
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string mock_instance = "test_instance";
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2999; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
             check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -630,7 +625,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) 
{
 
     // mock rowset and tablet
     int64_t tablet_id_base = 372323;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
@@ -642,7 +637,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) 
{
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tmp_rowset_exist(txn, tablet_id, txn_id);
         }
@@ -669,7 +664,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) 
{
         req.set_txn_id(txn_id);
         req.set_is_2pc(false);
         req.set_enable_txn_lazy_commit(true);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             req.add_base_tablet_ids(tablet_id);
         }
@@ -682,7 +677,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) 
{
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
             check_tmp_rowset_exist(txn, tablet_id, txn_id);
@@ -739,7 +734,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
 
     // mock rowset and tablet
     int64_t tablet_id_base = 3131124;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2048; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
@@ -769,7 +764,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string mock_instance = "test_instance";
-        for (int i = 0; i < 5; ++i) {
+        for (int i = 0; i < 2048; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
             check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -1123,7 +1118,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 1908462;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -1152,7 +1147,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
             ASSERT_GT(txn_id1, 0);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -1200,7 +1195,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
             ASSERT_GT(txn_id2, 0);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id2, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -1243,7 +1238,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string mock_instance = "test_instance";
-        for (int i = 0; i < 10; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             std::string key = meta_tablet_idx_key({mock_instance, tablet_id});
             std::string val;
@@ -1363,7 +1358,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase2Test) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 1908462;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 1999; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -1392,7 +1387,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase2Test) {
             ASSERT_GT(txn_id1, 0);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 1999; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -1440,7 +1435,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase2Test) {
             ASSERT_GT(txn_id2, 0);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 1999; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id2, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -1483,7 +1478,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase2Test) {
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 10; ++i) {
+        for (int i = 0; i < 1999; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
 
@@ -1526,7 +1521,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase3Test) {
     // mock rowset and tablet
     int64_t tablet_id_base = 19201262;
 
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 10001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -1551,7 +1546,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase3Test) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 10001; ++i) {
                 auto tmp_rowset =
                         create_rowset(tmp_txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
@@ -1661,7 +1656,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase3Test) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 10001; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -1788,7 +1783,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase4Test) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 213430076554;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -1813,7 +1808,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase4Test) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -1961,7 +1956,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase5Test) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 1908562;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -1990,7 +1985,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase5Test) {
             ASSERT_GT(txn_id1, 0);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -2038,7 +2033,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase5Test) {
             ASSERT_GT(txn_id2, 0);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset =
                         create_rowset(txn_id2, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
@@ -2071,7 +2066,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase5Test) {
             ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
         }
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset =
                         create_rowset(sub_txn_id2, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
@@ -2091,14 +2086,14 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase5Test) {
             SubTxnInfo sub_txn_info1;
             sub_txn_info1.set_sub_txn_id(sub_txn_id1);
             sub_txn_info1.set_table_id(table_id);
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + 
i);
             }
 
             SubTxnInfo sub_txn_info2;
             sub_txn_info2.set_sub_txn_id(sub_txn_id2);
             sub_txn_info2.set_table_id(table_id);
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + 
i);
             }
 
@@ -2130,7 +2125,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase5Test) {
     {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
-        for (int i = 0; i < 10; ++i) {
+        for (int i = 0; i < 2001; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
 
@@ -2321,7 +2316,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 2313324;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -2346,7 +2341,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2440,7 +2435,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 2313324;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -2465,7 +2460,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2540,7 +2535,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 2313324;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -2565,7 +2560,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2666,7 +2661,7 @@ TEST(TxnLazyCommitTest, RecyclePartitions) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 2313324;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -2691,7 +2686,7 @@ TEST(TxnLazyCommitTest, RecyclePartitions) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2812,7 +2807,7 @@ TEST(TxnLazyCommitTest, RecycleIndexes) {
     auto meta_service = get_meta_service(txn_kv, true);
     // mock rowset and tablet
     int64_t tablet_id_base = 2313324;
-    for (int i = 0; i < 10; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
     }
@@ -2837,7 +2832,7 @@ TEST(TxnLazyCommitTest, RecycleIndexes) {
         }
 
         {
-            for (int i = 0; i < 10; ++i) {
+            for (int i = 0; i < 2001; ++i) {
                 auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -2938,7 +2933,7 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithMultiTableTest) {
 
     // mock rowset and tablet
     int64_t tablet_id_base = 3131124;
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < 2001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
@@ -2947,7 +2942,7 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithMultiTableTest) {
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
 
-    for (int i = 5; i < 10; ++i) {
+    for (int i = 2001; i < 4002; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id2, 
index_id2, partition_id2,
                                  tablet_id_base + i);
         auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id2, 
partition_id2);
@@ -2975,7 +2970,7 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithMultiTableTest) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string mock_instance = "test_instance";
-        for (int i = 0; i < 10; ++i) {
+        for (int i = 0; i < 4002; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
             check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -2989,7 +2984,6 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithMultiTableTest) {
 }
 
 TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
-    config::txn_lazy_max_rowsets_per_batch = 1000;
     auto txn_kv = get_fdb_txn_kv();
     int64_t db_id = 14135425;
     int64_t table_id = 31245456;
@@ -3037,7 +3031,7 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithHugeRowsetMetaTest) {
 
     // mock rowset and tablet
     int64_t tablet_id_base = 3131124;
-    for (int i = 0; i < 1000; ++i) {
+    for (int i = 0; i < 1001; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
         auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
@@ -3046,7 +3040,7 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithHugeRowsetMetaTest) {
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
 
-    for (int i = 1000; i < 2000; ++i) {
+    for (int i = 1001; i < 2002; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id2, 
index_id2, partition_id2,
                                  tablet_id_base + i);
         auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, 
index_id2, partition_id2);
@@ -3074,7 +3068,7 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithHugeRowsetMetaTest) {
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
         std::string mock_instance = "test_instance";
-        for (int i = 0; i < 2000; ++i) {
+        for (int i = 0; i < 2002; ++i) {
             int64_t tablet_id = tablet_id_base + i;
             check_tablet_idx_db_id(txn, db_id, tablet_id);
             check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
@@ -3085,7 +3079,172 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithHugeRowsetMetaTest) {
     sp->clear_all_call_backs();
     sp->clear_trace();
     sp->disable_processing();
-    config::txn_lazy_max_rowsets_per_batch = 2;
+}
+
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    //            thread1             lazy thread1                          thread2
+    //               |                     |                                   |
+    //  commit_txn_eventually begin        |                                   |
+    //               |                     |                                   |
+    //   lazy commit wait                  |                                   |
+    //               |                     |                                   |
+    //               |              make_committed_txn_visible                 |
+    //               |                     |                                   |
+    //               |              inject KV_TXN_CONFLICT error               |
+    //               |                     |                      sc create new tablet tmp rowset
+    //               |                     |                                   |
+    //               |                     |                                   |
+    //       retry commit_txn              |                                   |
+    //               v                     v
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 4534445675;
+    int64_t table_id = 4365676543;
+    int64_t index_id = 665453237;
+    int64_t partition_id = 2136776543678;
+
+    bool go = false;
+    std::mutex go_mutex;
+    std::condition_variable go_cv;
+    std::atomic<int32_t> make_committed_txn_visible_count = {0};
+    std::atomic<int32_t> sc_create_tmp_rowset_count = {0};
+    std::atomic<int32_t> sc_create_tmp_rowset_finish_count = {0};
+    std::atomic<int32_t> tmp_rowsets_been_already_converted = {0};
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit", 
[&](auto&& args) {
+        LOG(INFO) << "zhangleiyyy";
+        {
+            std::unique_lock<std::mutex> _lock(go_mutex);
+            if (make_committed_txn_visible_count == 0) {
+                make_committed_txn_visible_count++;
+                if (sc_create_tmp_rowset_count == 0) {
+                    go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count 
== 1; });
+                }
+                MetaServiceCode* code = 
try_any_cast<MetaServiceCode*>(args[0]);
+                *code = MetaServiceCode::KV_TXN_CONFLICT;
+                bool* pred = try_any_cast<bool*>(args.back());
+                *pred = true;
+                LOG(INFO) << "inject kv error KV_TXN_CONFLICT";
+                go_cv.notify_all();
+            }
+        }
+    });
+
+    sp->set_call_back("convert_tmp_rowsets::already_been_converted", 
[&](auto&& args) {
+        auto version_pb = *try_any_cast<VersionPB*>(args[0]);
+        LOG(INFO) << "version_pb:" << version_pb.ShortDebugString();
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        tmp_rowsets_been_already_converted++;
+        go_cv.notify_all();
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    brpc::Controller cntl;
+    BeginTxnRequest req;
+    req.set_cloud_unique_id("test_cloud_unique_id");
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.set_db_id(db_id);
+    txn_info_pb.set_label("test_sc_with_commit_txn_label");
+    txn_info_pb.add_table_ids(table_id);
+    txn_info_pb.set_timeout_ms(36000);
+    req.mutable_txn_info()->CopyFrom(txn_info_pb);
+    BeginTxnResponse res;
+    
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req, &res,
+                            nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    int64_t txn_id = res.txn_id();
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 3131124;
+    for (int i = 0; i < 1001; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                 tablet_id_base + i);
+        auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    std::thread thread1([&] {
+        {
+            {
+                std::unique_lock<std::mutex> _lock(go_mutex);
+                go_cv.wait(_lock, [&] { return go; });
+            }
+
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+        LOG(INFO) << "thread1 finish";
+    });
+
+    std::thread thread2([&] {
+        {
+            {
+                std::unique_lock<std::mutex> _lock(go_mutex);
+                go_cv.wait(_lock, [&] { return go; });
+            }
+
+            {
+                std::unique_lock<std::mutex> _lock(go_mutex);
+                sc_create_tmp_rowset_count++;
+                if (make_committed_txn_visible_count == 0) {
+                    go_cv.wait(_lock, [&] { return 
make_committed_txn_visible_count > 0; });
+                }
+                for (int i = 0; i < 1001; ++i) {
+                    create_tablet_with_db_id(meta_service.get(), db_id, 
table_id, index_id,
+                                             partition_id, tablet_id_base + i);
+                    auto tmp_rowset =
+                            create_huge_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+                    CreateRowsetResponse res;
+                    commit_rowset(meta_service.get(), tmp_rowset, res);
+                    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+                }
+                LOG(INFO) << "sc_create_tmp_rowset_finish_count finish";
+                sc_create_tmp_rowset_finish_count++;
+                go_cv.notify_all();
+            }
+            LOG(INFO) << "thread2 finish";
+        }
+    });
+
+    std::unique_lock<std::mutex> go_lock(go_mutex);
+    go = true;
+    go_lock.unlock();
+    go_cv.notify_all();
+
+    thread1.join();
+    thread2.join();
+
+    ASSERT_GE(tmp_rowsets_been_already_converted, 1);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string mock_instance = "test_instance";
+        for (int i = 0; i < 1001; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+        }
+    }
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
 }
 
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to