bobhan1 commented on code in PR #55349:
URL: https://github.com/apache/doris/pull/55349#discussion_r2312839515


##########
cloud/test/txn_lazy_commit_test.cpp:
##########
@@ -3343,7 +3337,172 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithHugeRowsetMetaTest) {
     sp->clear_all_call_backs();
     sp->clear_trace();
     sp->disable_processing();
-    config::txn_lazy_max_rowsets_per_batch = 2;
+}
+
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
+    // 
===========================================================================
+    // threads concurrent execution flow:
+    //
+    //            thread1             lazy thread1                          
thread3
+    //               |                     |                                   
|
+    //  commit_txn_eventually begin        |                                   
|
+    //               |                     |                                   
|
+    //   lazy commit wait                  |                                   
|
+    //               |                     |                                   
|
+    //               |              make_committed_txn_visible                 
|
+    //               |                     |                                   
|
+    //               |              inject TXN_TOO_OLD fdb error               
|
+    //               |                     |                      sc create 
new tablet tmp rowset
+    //               |                     |                                   
|
+    //               |                     |                                   
|
+    //       retry commit_txn              |                                   
|
+    //               v                     v
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 4534445675;
+    int64_t table_id = 4365676543;
+    int64_t index_id = 665453237;
+    int64_t partition_id = 2136776543678;
+
+    bool go = false;
+    std::mutex go_mutex;
+    std::condition_variable go_cv;
+    std::atomic<int32_t> make_committed_txn_visible_count = {0};
+    std::atomic<int32_t> sc_create_tmp_rowset_count = {0};
+    std::atomic<int32_t> sc_create_tmp_rowset_finish_count = {0};
+    std::atomic<int32_t> tmp_rowsets_been_already_converted = {0};
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit", 
[&](auto&& args) {
+        LOG(INFO) << "zhangleiyyy";
+        {
+            std::unique_lock<std::mutex> _lock(go_mutex);
+            if (make_committed_txn_visible_count == 0) {
+                make_committed_txn_visible_count++;
+                if (sc_create_tmp_rowset_count == 0) {
+                    go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count 
== 1; });
+                }
+                MetaServiceCode* code = 
try_any_cast<MetaServiceCode*>(args[0]);
+                *code = MetaServiceCode::KV_TXN_CONFLICT;
+                bool* pred = try_any_cast<bool*>(args.back());
+                *pred = true;
+                LOG(INFO) << "inject kv error KV_TXN_CONFLICT";
+                go_cv.notify_all();
+            }
+        }
+    });
+
+    sp->set_call_back("convert_tmp_rowsets::already_been_converted", 
[&](auto&& args) {
+        auto version_pb = *try_any_cast<VersionPB*>(args[0]);
+        LOG(INFO) << "version_pb:" << version_pb.ShortDebugString();
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        tmp_rowsets_been_already_converted++;
+        go_cv.notify_all();
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    brpc::Controller cntl;
+    BeginTxnRequest req;
+    req.set_cloud_unique_id("test_cloud_unique_id");
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.set_db_id(db_id);
+    txn_info_pb.set_label("test_sc_with_commit_txn_label");
+    txn_info_pb.add_table_ids(table_id);
+    txn_info_pb.set_timeout_ms(36000);
+    req.mutable_txn_info()->CopyFrom(txn_info_pb);
+    BeginTxnResponse res;
+    
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req, &res,
+                            nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    int64_t txn_id = res.txn_id();
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 3131124;
+    for (int i = 0; i < 1001; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                 tablet_id_base + i);
+        auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    std::thread thread1([&] {
+        {
+            {
+                std::unique_lock<std::mutex> _lock(go_mutex);
+                go_cv.wait(_lock, [&] { return go; });
+            }
+
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+        LOG(INFO) << "thread1 finish";
+    });
+
+    std::thread thread2([&] {
+        {
+            {
+                std::unique_lock<std::mutex> _lock(go_mutex);
+                go_cv.wait(_lock, [&] { return go; });
+            }
+
+            {
+                std::unique_lock<std::mutex> _lock(go_mutex);
+                sc_create_tmp_rowset_count++;
+                if (make_committed_txn_visible_count == 0) {
+                    go_cv.wait(_lock, [&] { return 
make_committed_txn_visible_count > 0; });
+                }
+                for (int i = 0; i < 1001; ++i) {
+                    create_tablet_with_db_id(meta_service.get(), db_id, 
table_id, index_id,
+                                             partition_id, tablet_id_base + i);
+                    auto tmp_rowset =
+                            create_huge_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+                    CreateRowsetResponse res;
+                    commit_rowset(meta_service.get(), tmp_rowset, res);
+                    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+                }
+                LOG(INFO) << "sc_create_tmp_rowset_finish_count finish";
+                sc_create_tmp_rowset_finish_count++;
+                go_cv.notify_all();
+            }
+            LOG(INFO) << "thread2 finish";
+        }
+    });
+
+    std::unique_lock<std::mutex> go_lock(go_mutex);
+    go = true;
+    go_lock.unlock();
+    go_cv.notify_all();
+
+    thread1.join();
+    thread2.join();
+
+    ASSERT_GE(tmp_rowsets_been_already_converted, 1);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string mock_instance = "test_instance";
+        for (int i = 0; i < 1001; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);

Review Comment:
   Should we also check `check_rowset_meta_not_exist(txn, tablet_id, 3)` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to