This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 9ec0ca8972d branch-4.0: [fix](ms) Add job check for stale commit rowset #61427 (#61658)
9ec0ca8972d is described below
commit 9ec0ca8972d24dd99c79b6e3aa33206273f1f9d5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 17:46:00 2026 +0800
branch-4.0: [fix](ms) Add job check for stale commit rowset #61427 (#61658)
Cherry-picked from #61427
Co-authored-by: Yixuan Wang <[email protected]>
---
cloud/src/meta-service/meta_service.cpp | 8 +-
cloud/test/meta_service_job_test.cpp | 143 ++++++++++++++++++++++++
cloud/test/meta_service_operation_log_test.cpp | 6 +-
cloud/test/meta_service_test.cpp | 4 +-
cloud/test/meta_service_versioned_read_test.cpp | 4 +
cloud/test/schema_kv_test.cpp | 27 ++++-
6 files changed, 182 insertions(+), 10 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 90ebd4076e1..cc2190d503d 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2656,22 +2656,22 @@ int check_idempotent_for_txn_or_job(Transaction* txn,
const std::string& recycle
return -1;
}
} else if (!config::enable_recycle_delete_rowset_key_check) {
- if (config::enable_tablet_job_check && tablet_job_id.empty() &&
!tablet_job_id.empty()) {
+ if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
if (!check_job_existed(txn, code, msg, instance_id, tablet_id,
rowset_id, tablet_job_id,
is_versioned_read, resource_mgr)) {
return 1;
}
}
- // Check if the prepare rowset request is invalid.
- // If the transaction has been finished, it means this prepare rowset
is a timeout retry request.
+ // Check if the commit rowset request is invalid.
+ // If the transaction has been finished, it means this commit rowset
is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may
cause data loss.
// If the rowset had load id, it means it is a load request, otherwise
it is a
// compaction/sc request.
if (config::enable_load_txn_status_check && rowset_meta.has_load_id()
&&
!check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn,
instance_id,
rowset_meta.txn_id(), code, msg)) {
- LOG(WARNING) << "prepare rowset failed, txn_id=" <<
rowset_meta.txn_id()
+ LOG(WARNING) << "commit rowset failed, txn_id=" <<
rowset_meta.txn_id()
<< ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
<< ", rowset_state=" << rowset_meta.rowset_state() <<
", msg=" << msg;
return 1;
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index 65527e6a869..ddba9b4c750 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1306,6 +1306,7 @@ TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
auto tmp_rowset = create_rowset(tablet_id, tc.start_version,
tc.end_version, 100);
tmp_rowset.set_txn_id(txn_id);
CreateRowsetResponse res;
+ prepare_rowset(meta_service.get(), tmp_rowset, res, txn_id);
commit_rowset(meta_service.get(), tmp_rowset, res, txn_id);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
@@ -1485,6 +1486,7 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
rowset.set_txn_id(txn_id + i);
output_rowsets.push_back(rowset);
CreateRowsetResponse res;
+ prepare_rowset(meta_service.get(), output_rowsets.back(), res, txn_id
+ i);
commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id +
i);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
@@ -6768,4 +6770,145 @@ TEST(MetaServiceJobTest, DeleteJobForRelatedRowsetTest)
{
}
}
+// Test: Verify that check_idempotent_for_txn_or_job correctly calls
check_job_existed
+// when enable_recycle_delete_rowset_key_check is false and tablet_job_id is
non-empty.
+// This covers the bug fix where the condition was:
+// tablet_job_id.empty() && !tablet_job_id.empty() (always false, check
never ran)
+// Fixed to:
+// !tablet_job_id.empty()
+TEST(MetaServiceJobTest, CheckIdempotentWithTabletJobId) {
+ auto meta_service = get_meta_service();
+ auto* sp = SyncPoint::get_instance();
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ config::enable_recycle_delete_rowset_key_check = true;
+ };
+ // Disable recycle_delete_rowset_key_check so we enter the else-if branch
+ // in check_idempotent_for_txn_or_job where the bug existed.
+ config::enable_recycle_delete_rowset_key_check = false;
+ sp->set_call_back("get_instance_id", [&](auto&& args) {
+ auto* ret = try_any_cast_ret<std::string>(args);
+ ret->first = instance_id;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ int64_t table_id = 13440;
+ int64_t index_id = 13450;
+ int64_t partition_id = 13460;
+ int64_t tablet_id = 13470;
+
+ create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id);
+
+ std::string job_id = "test_check_idempotent_job";
+
+ // Step 1: Create input rowsets for compaction
+ {
+ std::vector<doris::RowsetMetaCloudPB> input_rowsets;
+ input_rowsets.push_back(create_rowset(tablet_id, 2, 2, 100));
+ input_rowsets.push_back(create_rowset(tablet_id, 3, 3, 100));
+ input_rowsets[0].set_resource_id(std::string(RESOURCE_ID));
+ input_rowsets[1].set_resource_id(std::string(RESOURCE_ID));
+ insert_rowsets(meta_service->txn_kv().get(), table_id, index_id,
partition_id, tablet_id,
+ input_rowsets);
+ }
+
+ // Step 2: Start a compaction job
+ {
+ StartTabletJobResponse res;
+ start_compaction_job(meta_service.get(), tablet_id, job_id,
"test_initiator", 0, 0,
+ TabletCompactionJobPB::CUMULATIVE, res, {2, 3});
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // Step 3: Prepare rowset with tablet_job_id
+ doris::RowsetMetaCloudPB rowset_meta;
+ {
+ rowset_meta = create_rowset(tablet_id, 4, 4, 200);
+ rowset_meta.set_job_id(job_id);
+ rowset_meta.set_resource_id(std::string(RESOURCE_ID));
+
+ brpc::Controller cntl;
+ CreateRowsetResponse res;
+ auto* arena = res.GetArena();
+ auto* req =
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+ req->mutable_rowset_meta()->CopyFrom(rowset_meta);
+ req->set_tablet_job_id(job_id);
+ meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+ if (!arena) delete req;
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // Step 4: commit_rowset with tablet_job_id while job still exists -
should succeed
+ {
+ brpc::Controller cntl;
+ CreateRowsetResponse res;
+ auto* arena = res.GetArena();
+ auto* req =
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+ req->mutable_rowset_meta()->CopyFrom(rowset_meta);
+ req->set_tablet_job_id(job_id);
+ meta_service->commit_rowset(&cntl, req, &res, nullptr);
+ if (!arena) delete req;
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
+ << "commit_rowset should succeed when job exists, msg=" <<
res.status().msg();
+ }
+
+ // Step 5: Abort the compaction job (removes the job entry from
TabletJobInfoPB)
+ {
+ FinishTabletJobResponse res;
+ finish_compaction_job(meta_service.get(), tablet_id, job_id,
"test_initiator", 0, 0,
+ TabletCompactionJobPB::CUMULATIVE, res,
FinishTabletJobRequest::ABORT,
+ {2, 3});
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // Step 6: Prepare a new rowset (without tablet_job_id to bypass prepare
check)
+ doris::RowsetMetaCloudPB rowset_meta2;
+ {
+ rowset_meta2 = create_rowset(tablet_id, 5, 5, 200);
+ rowset_meta2.set_job_id(job_id);
+ rowset_meta2.set_resource_id(std::string(RESOURCE_ID));
+
+ brpc::Controller cntl;
+ CreateRowsetResponse res;
+ auto* arena = res.GetArena();
+ auto* req =
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+ req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+ meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+ if (!arena) delete req;
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // Step 7: commit_rowset with tablet_job_id after job aborted - should
fail.
+ // Before the fix, this would incorrectly succeed because
check_job_existed was
+ // never called (the condition was always false).
+ {
+ brpc::Controller cntl;
+ CreateRowsetResponse res;
+ auto* arena = res.GetArena();
+ auto* req =
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+ req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+ req->set_tablet_job_id(job_id);
+ meta_service->commit_rowset(&cntl, req, &res, nullptr);
+ if (!arena) delete req;
+ ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET)
+ << "commit_rowset should fail with STALE_PREPARE_ROWSET when
job is aborted, msg="
+ << res.status().msg();
+ }
+
+ // Step 8: commit_rowset without tablet_job_id - should succeed (skips job
check)
+ {
+ brpc::Controller cntl;
+ CreateRowsetResponse res;
+ auto* arena = res.GetArena();
+ auto* req =
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+ req->mutable_rowset_meta()->CopyFrom(rowset_meta2);
+ // Do NOT set tablet_job_id - the check should be skipped
+ meta_service->commit_rowset(&cntl, req, &res, nullptr);
+ if (!arena) delete req;
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
+ << "commit_rowset without tablet_job_id should succeed, msg="
<< res.status().msg();
+ }
+}
+
} // namespace doris::cloud
diff --git a/cloud/test/meta_service_operation_log_test.cpp
b/cloud/test/meta_service_operation_log_test.cpp
index 5a5c46a7603..ee687cf532e 100644
--- a/cloud/test/meta_service_operation_log_test.cpp
+++ b/cloud/test/meta_service_operation_log_test.cpp
@@ -47,6 +47,8 @@ extern void create_tablet(MetaServiceProxy* meta_service,
int64_t table_id, int6
int64_t partition_id, int64_t tablet_id);
extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int partition_id,
int64_t version, int num_rows);
+extern void prepare_rowset(MetaServiceProxy* meta_service, const
doris::RowsetMetaCloudPB& rowset,
+ CreateRowsetResponse& res);
extern void commit_rowset(MetaServiceProxy* meta_service, const
doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res);
extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t
index_id,
@@ -841,7 +843,7 @@ TEST(MetaServiceOperationLogTest, CommitTxn) {
LOG(INFO) << "Creating rowset for tablet_id=" << (tablet_id_base + i)
<< ", partition_id=" << partition_id << ", txn_id=" << txn_id
<< ", rowset=" << tmp_rowset.ShortDebugString();
-
+ prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
@@ -1048,6 +1050,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnEventually) {
create_tablet(meta_service.get(), table_id, 1237, partition_id,
tablet_id_base);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base, partition_id, 1,
100);
CreateRowsetResponse res;
+ prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1273,6 +1276,7 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
create_tablet(meta_service.get(), table_id, 1238, partition_id,
tablet_id_base + i);
auto tmp_rowset = create_rowset(sub_txn_id, tablet_id_base + i,
partition_id, 1, 100);
CreateRowsetResponse res;
+ prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 13fd209f54b..a87e71f2efb 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -253,8 +253,8 @@ doris::RowsetMetaCloudPB create_rowset(int64_t txn_id,
int64_t tablet_id, int pa
return rowset;
}
-static void prepare_rowset(MetaServiceProxy* meta_service, const
doris::RowsetMetaCloudPB& rowset,
- CreateRowsetResponse& res) {
+void prepare_rowset(MetaServiceProxy* meta_service, const
doris::RowsetMetaCloudPB& rowset,
+ CreateRowsetResponse& res) {
brpc::Controller cntl;
auto arena = res.GetArena();
auto req =
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
diff --git a/cloud/test/meta_service_versioned_read_test.cpp
b/cloud/test/meta_service_versioned_read_test.cpp
index 58793011746..7ba77a1d3aa 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -46,6 +46,7 @@
#include "meta-store/versioned_value.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
+#include "recycler/util.h"
#include "resource-manager/resource_manager.h"
namespace doris::cloud {
@@ -57,6 +58,8 @@ extern void create_tablet(MetaServiceProxy* meta_service,
int64_t table_id, int6
int64_t partition_id, int64_t tablet_id);
extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int partition_id,
int64_t version, int num_rows);
+extern void prepare_rowset(MetaServiceProxy* meta_service, const
doris::RowsetMetaCloudPB& rowset,
+ CreateRowsetResponse& res);
extern void commit_rowset(MetaServiceProxy* meta_service, const
doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res);
extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
@@ -205,6 +208,7 @@ TEST(MetaServiceVersionedReadTest, CommitTxn) {
create_tablet(meta_service.get(), table_id, index_id,
partition_id, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id, -1, 100);
CreateRowsetResponse res;
+ prepare_rowset(meta_service.get(), tmp_rowset, res);
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
diff --git a/cloud/test/schema_kv_test.cpp b/cloud/test/schema_kv_test.cpp
index 54733a29a65..32a9302a015 100644
--- a/cloud/test/schema_kv_test.cpp
+++ b/cloud/test/schema_kv_test.cpp
@@ -651,6 +651,7 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
+ config::enable_recycle_delete_rowset_key_check = true;
};
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
@@ -658,6 +659,7 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
ret->second = true;
});
sp->enable_processing();
+ config::enable_recycle_delete_rowset_key_check = false;
int64_t db_id = 1000;
@@ -667,17 +669,36 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
config::write_schema_kv = false;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), db_id,
table_id, index_id,
partition_id, tablet_id,
next_rowset_id(), 1));
+
+ int64_t txn_id = -1;
+ std::string label = "test_abort_txn_label";
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ BeginTxnResponse res;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ auto* txn_info = req.mutable_txn_info();
+ txn_info->set_db_id(db_id);
+ txn_info->set_label(label);
+ txn_info->add_table_ids(table_id);
+ txn_info->set_timeout_ms(36000);
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ LOG(INFO) << "Step 1: Transaction started, txn_id=" << txn_id;
+ }
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
- auto committed_rowset = create_rowset(10005, tablet_id,
next_rowset_id(), 2, 2);
+ auto committed_rowset = create_rowset(txn_id, tablet_id,
next_rowset_id(), 2, 2);
std::string tmp_rowset_key, tmp_rowset_val;
// 0:instance_id 1:txn_id 2:tablet_id
- meta_rowset_tmp_key({instance_id, 10005, tablet_id}, &tmp_rowset_key);
+ meta_rowset_tmp_key({instance_id, txn_id, tablet_id}, &tmp_rowset_key);
ASSERT_TRUE(committed_rowset.SerializeToString(&tmp_rowset_val));
txn->put(tmp_rowset_key, tmp_rowset_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
CreateRowsetResponse res;
- auto new_rowset = create_rowset(10005, tablet_id, next_rowset_id(), 2,
2);
+ auto new_rowset = create_rowset(txn_id, tablet_id, next_rowset_id(),
2, 2);
prepare_rowset(meta_service.get(), new_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED);
ASSERT_TRUE(res.has_existed_rowset_meta());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]