This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 436d6c65d3b branch-4.1: [fix](ms) Add job check for stale commit rowset #61427 (#61660)
436d6c65d3b is described below

commit 436d6c65d3b7d9ca1ea7a924734f63e16f60a8b1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 16:42:29 2026 +0800

    branch-4.1: [fix](ms) Add job check for stale commit rowset #61427 (#61660)
    
    Cherry-picked from #61427
    
    Co-authored-by: Yixuan Wang <[email protected]>
---
 cloud/src/meta-service/meta_service.cpp         |   8 +-
 cloud/test/meta_service_job_test.cpp            | 143 ++++++++++++++++++++++++
 cloud/test/meta_service_operation_log_test.cpp  |   6 +-
 cloud/test/meta_service_test.cpp                |   4 +-
 cloud/test/meta_service_versioned_read_test.cpp |   4 +
 cloud/test/schema_kv_test.cpp                   |  27 ++++-
 6 files changed, 182 insertions(+), 10 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 23927ba99b3..903d77f8092 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2659,22 +2659,22 @@ int check_idempotent_for_txn_or_job(Transaction* txn, 
const std::string& recycle
             return -1;
         }
     } else if (!config::enable_recycle_delete_rowset_key_check) {
-        if (config::enable_tablet_job_check && tablet_job_id.empty() && 
!tablet_job_id.empty()) {
+        if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
             if (!check_job_existed(txn, code, msg, instance_id, tablet_id, 
rowset_id, tablet_job_id,
                                    is_versioned_read, resource_mgr)) {
                 return 1;
             }
         }
 
-        // Check if the prepare rowset request is invalid.
-        // If the transaction has been finished, it means this prepare rowset 
is a timeout retry request.
+        // Check if the commit rowset request is invalid.
+        // If the transaction has been finished, it means this commit rowset 
is a timeout retry request.
         // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
         // If the rowset had load id, it means it is a load request, otherwise 
it is a
         // compaction/sc request.
         if (config::enable_load_txn_status_check && rowset_meta.has_load_id() 
&&
             !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn, 
instance_id,
                                       rowset_meta.txn_id(), code, msg)) {
-            LOG(WARNING) << "prepare rowset failed, txn_id=" << 
rowset_meta.txn_id()
+            LOG(WARNING) << "commit rowset failed, txn_id=" << 
rowset_meta.txn_id()
                          << ", tablet_id=" << tablet_id << ", rowset_id=" << 
rowset_id
                          << ", rowset_state=" << rowset_meta.rowset_state() << 
", msg=" << msg;
             return 1;
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index 65527e6a869..ddba9b4c750 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1306,6 +1306,7 @@ TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
             auto tmp_rowset = create_rowset(tablet_id, tc.start_version, 
tc.end_version, 100);
             tmp_rowset.set_txn_id(txn_id);
             CreateRowsetResponse res;
+            prepare_rowset(meta_service.get(), tmp_rowset, res, txn_id);
             commit_rowset(meta_service.get(), tmp_rowset, res, txn_id);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
         }
@@ -1485,6 +1486,7 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         rowset.set_txn_id(txn_id + i);
         output_rowsets.push_back(rowset);
         CreateRowsetResponse res;
+        prepare_rowset(meta_service.get(), output_rowsets.back(), res, txn_id 
+ i);
         commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id + 
i);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
@@ -6768,4 +6770,152 @@ TEST(MetaServiceJobTest, DeleteJobForRelatedRowsetTest) {
     }
 }
 
+// Test: Verify that check_idempotent_for_txn_or_job correctly calls check_job_existed
+// when enable_recycle_delete_rowset_key_check is false and tablet_job_id is non-empty.
+// This covers the bug fix where the condition was:
+//   tablet_job_id.empty() && !tablet_job_id.empty()  (always false, check never ran)
+// Fixed to:
+//   !tablet_job_id.empty()
+TEST(MetaServiceJobTest, CheckIdempotentWithTabletJobId) {
+    auto meta_service = get_meta_service();
+    auto* sp = SyncPoint::get_instance();
+    // Remember the global config values this test overrides so they are restored
+    // exactly, instead of assuming what their defaults are.
+    const bool saved_key_check = config::enable_recycle_delete_rowset_key_check;
+    const bool saved_job_check = config::enable_tablet_job_check;
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+        config::enable_recycle_delete_rowset_key_check = saved_key_check;
+        config::enable_tablet_job_check = saved_job_check;
+    };
+    // Disable recycle_delete_rowset_key_check so we enter the else-if branch
+    // in check_idempotent_for_txn_or_job where the bug existed, and force the
+    // tablet job check on so Step 7 does not depend on that flag's default.
+    config::enable_recycle_delete_rowset_key_check = false;
+    config::enable_tablet_job_check = true;
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    int64_t table_id = 13440;
+    int64_t index_id = 13450;
+    int64_t partition_id = 13460;
+    int64_t tablet_id = 13470;
+
+    create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id);
+
+    std::string job_id = "test_check_idempotent_job";
+
+    // Step 1: Create input rowsets for compaction
+    {
+        std::vector<doris::RowsetMetaCloudPB> input_rowsets;
+        input_rowsets.push_back(create_rowset(tablet_id, 2, 2, 100));
+        input_rowsets.push_back(create_rowset(tablet_id, 3, 3, 100));
+        input_rowsets[0].set_resource_id(std::string(RESOURCE_ID));
+        input_rowsets[1].set_resource_id(std::string(RESOURCE_ID));
+        insert_rowsets(meta_service->txn_kv().get(), table_id, index_id, partition_id, tablet_id,
+                       input_rowsets);
+    }
+
+    // Step 2: Start a compaction job
+    {
+        StartTabletJobResponse res;
+        start_compaction_job(meta_service.get(), tablet_id, job_id, "test_initiator", 0, 0,
+                             TabletCompactionJobPB::CUMULATIVE, res, {2, 3});
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 3: Prepare rowset with tablet_job_id
+    doris::RowsetMetaCloudPB rowset_meta;
+    {
+        rowset_meta = create_rowset(tablet_id, 4, 4, 200);
+        rowset_meta.set_job_id(job_id);
+        rowset_meta.set_resource_id(std::string(RESOURCE_ID));
+
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta);
+        req->set_tablet_job_id(job_id);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 4: commit_rowset with tablet_job_id while job still exists - should succeed
+    {
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta);
+        req->set_tablet_job_id(job_id);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
+                << "commit_rowset should succeed when job exists, msg=" << res.status().msg();
+    }
+
+    // Step 5: Abort the compaction job (removes the job entry from TabletJobInfoPB)
+    {
+        FinishTabletJobResponse res;
+        finish_compaction_job(meta_service.get(), tablet_id, job_id, "test_initiator", 0, 0,
+                              TabletCompactionJobPB::CUMULATIVE, res, FinishTabletJobRequest::ABORT,
+                              {2, 3});
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 6: Prepare a new rowset (without tablet_job_id to bypass prepare check)
+    doris::RowsetMetaCloudPB rowset_meta2;
+    {
+        rowset_meta2 = create_rowset(tablet_id, 5, 5, 200);
+        rowset_meta2.set_job_id(job_id);
+        rowset_meta2.set_resource_id(std::string(RESOURCE_ID));
+
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Step 7: commit_rowset with tablet_job_id after job aborted - should fail.
+    // Before the fix, this would incorrectly succeed because check_job_existed was
+    // never called (the condition was always false).
+    {
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+        req->set_tablet_job_id(job_id);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET)
+                << "commit_rowset should fail with STALE_PREPARE_ROWSET when job is aborted, msg="
+                << res.status().msg();
+    }
+
+    // Step 8: commit_rowset without tablet_job_id - should succeed (skips job check)
+    {
+        brpc::Controller cntl;
+        CreateRowsetResponse res;
+        auto* arena = res.GetArena();
+        auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+        // Do NOT set tablet_job_id - the check should be skipped
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
+                << "commit_rowset without tablet_job_id should succeed, msg=" << res.status().msg();
+    }
+}
+
 } // namespace doris::cloud
diff --git a/cloud/test/meta_service_operation_log_test.cpp 
b/cloud/test/meta_service_operation_log_test.cpp
index 5a5c46a7603..ee687cf532e 100644
--- a/cloud/test/meta_service_operation_log_test.cpp
+++ b/cloud/test/meta_service_operation_log_test.cpp
@@ -47,6 +47,8 @@ extern void create_tablet(MetaServiceProxy* meta_service, 
int64_t table_id, int6
                           int64_t partition_id, int64_t tablet_id);
 extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int partition_id,
                                               int64_t version, int num_rows);
+extern void prepare_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
+                           CreateRowsetResponse& res);
 extern void commit_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res);
 extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t 
index_id,
@@ -841,7 +843,7 @@ TEST(MetaServiceOperationLogTest, CommitTxn) {
         LOG(INFO) << "Creating rowset for tablet_id=" << (tablet_id_base + i)
                   << ", partition_id=" << partition_id << ", txn_id=" << txn_id
                   << ", rowset=" << tmp_rowset.ShortDebugString();
-
+        prepare_rowset(meta_service.get(), tmp_rowset, res);
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
@@ -1048,6 +1050,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnEventually) {
     create_tablet(meta_service.get(), table_id, 1237, partition_id, 
tablet_id_base);
     auto tmp_rowset = create_rowset(txn_id, tablet_id_base, partition_id, 1, 
100);
     CreateRowsetResponse res;
+    prepare_rowset(meta_service.get(), tmp_rowset, res);
     commit_rowset(meta_service.get(), tmp_rowset, res);
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
 
@@ -1273,6 +1276,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
         create_tablet(meta_service.get(), table_id, 1238, partition_id, 
tablet_id_base + i);
         auto tmp_rowset = create_rowset(sub_txn_id, tablet_id_base + i, 
partition_id, 1, 100);
         CreateRowsetResponse res;
+        prepare_rowset(meta_service.get(), tmp_rowset, res);
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 13fd209f54b..a87e71f2efb 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -253,8 +253,8 @@ doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, 
int64_t tablet_id, int pa
     return rowset;
 }
 
-static void prepare_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
-                           CreateRowsetResponse& res) {
+void prepare_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
+                    CreateRowsetResponse& res) {
     brpc::Controller cntl;
     auto arena = res.GetArena();
     auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
diff --git a/cloud/test/meta_service_versioned_read_test.cpp 
b/cloud/test/meta_service_versioned_read_test.cpp
index 58793011746..7ba77a1d3aa 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -46,6 +46,7 @@
 #include "meta-store/versioned_value.h"
 #include "mock_resource_manager.h"
 #include "rate-limiter/rate_limiter.h"
+#include "recycler/util.h"
 #include "resource-manager/resource_manager.h"
 
 namespace doris::cloud {
@@ -57,6 +58,8 @@ extern void create_tablet(MetaServiceProxy* meta_service, 
int64_t table_id, int6
                           int64_t partition_id, int64_t tablet_id);
 extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int partition_id,
                                               int64_t version, int num_rows);
+extern void prepare_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
+                           CreateRowsetResponse& res);
 extern void commit_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res);
 extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const 
std::string& label,
@@ -205,6 +208,7 @@ TEST(MetaServiceVersionedReadTest, CommitTxn) {
             create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id_base + i);
             auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id, -1, 100);
             CreateRowsetResponse res;
+            prepare_rowset(meta_service.get(), tmp_rowset, res);
             commit_rowset(meta_service.get(), tmp_rowset, res);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
         }
diff --git a/cloud/test/schema_kv_test.cpp b/cloud/test/schema_kv_test.cpp
index 54733a29a65..32a9302a015 100644
--- a/cloud/test/schema_kv_test.cpp
+++ b/cloud/test/schema_kv_test.cpp
@@ -651,6 +651,7 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
     auto sp = SyncPoint::get_instance();
     DORIS_CLOUD_DEFER {
         SyncPoint::get_instance()->clear_all_call_backs();
+        config::enable_recycle_delete_rowset_key_check = true;
     };
     sp->set_call_back("get_instance_id", [&](auto&& args) {
         auto* ret = try_any_cast_ret<std::string>(args);
@@ -658,6 +659,7 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
         ret->second = true;
     });
     sp->enable_processing();
+    config::enable_recycle_delete_rowset_key_check = false;
 
     int64_t db_id = 1000;
 
@@ -667,17 +669,36 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
         config::write_schema_kv = false;
         ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id, 
table_id, index_id,
                                               partition_id, tablet_id, 
next_rowset_id(), 1));
+
+        int64_t txn_id = -1;
+        std::string label = "test_abort_txn_label";
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            BeginTxnResponse res;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            auto* txn_info = req.mutable_txn_info();
+            txn_info->set_db_id(db_id);
+            txn_info->set_label(label);
+            txn_info->add_table_ids(table_id);
+            txn_info->set_timeout_ms(36000);
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+            LOG(INFO) << "Step 1: Transaction started, txn_id=" << txn_id;
+        }
         std::unique_ptr<Transaction> txn;
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
-        auto committed_rowset = create_rowset(10005, tablet_id, 
next_rowset_id(), 2, 2);
+        auto committed_rowset = create_rowset(txn_id, tablet_id, 
next_rowset_id(), 2, 2);
         std::string tmp_rowset_key, tmp_rowset_val;
         // 0:instance_id  1:txn_id  2:tablet_id
-        meta_rowset_tmp_key({instance_id, 10005, tablet_id}, &tmp_rowset_key);
+        meta_rowset_tmp_key({instance_id, txn_id, tablet_id}, &tmp_rowset_key);
         ASSERT_TRUE(committed_rowset.SerializeToString(&tmp_rowset_val));
         txn->put(tmp_rowset_key, tmp_rowset_val);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
         CreateRowsetResponse res;
-        auto new_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2, 
2);
+        auto new_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(), 
2, 2);
         prepare_rowset(meta_service.get(), new_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
         ASSERT_TRUE(res.has_existed_rowset_meta());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to