This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5cdc082be83 [fix](cloud) potential data race when retrying
prepare/commit rowset for load (#51129)
5cdc082be83 is described below
commit 5cdc082be8399eb5b90847821f40c23ba88fe05e
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jun 4 16:33:32 2025 +0800
[fix](cloud) potential data race when retrying prepare/commit rowset for
load (#51129)
If the transaction has already finished, the prepare/commit rowset request
is a retry of a timed-out request. In this case, do not write the recycle
key again; otherwise it may cause data loss.
**Notice**:
There is a compatibility issue with multi-statement transaction write
operations during upgrades that could cause write operations to fail
either before or during the meta service upgrade process.
---
cloud/src/common/config.h | 3 +
cloud/src/meta-service/meta_service.cpp | 97 ++++++++++++++++++++++++++++-
cloud/src/meta-service/meta_service_txn.cpp | 1 +
cloud/test/meta_service_test.cpp | 67 ++++++++++++++++++++
gensrc/proto/cloud.proto | 1 +
5 files changed, 168 insertions(+), 1 deletion(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index fd8d67c9373..6bc2d73f700 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -224,6 +224,9 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");
CONF_mBool(enable_distinguish_hdfs_path, "true");
+// If enabled, the txn status will be checked when prepare/commit rowset
+CONF_mBool(enable_load_txn_status_check, "true");
+
// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index c4f3784f9f5..f2baa7fd5a9 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -990,6 +990,73 @@ static void fill_schema_from_dict(MetaServiceCode& code,
std::string& msg,
existed_rowset_meta->CopyFrom(metas.Get(0));
}
+/**
+* Check if the transaction status is as expected.
+* If the transaction is not in the expected state, return false and set the
error code and message.
+*
+* @param expect_status The expected transaction status.
+* @param txn Pointer to the transaction object.
+* @param instance_id The instance ID associated with the transaction.
+* @param txn_id The transaction ID to check.
+* @param code Reference to the error code to be set in case of failure.
+* @param msg Reference to the error message to be set in case of failure.
+* @return true if the transaction status matches the expected status, false
otherwise.
+ */
+static bool check_transaction_status(TxnStatusPB expect_status, Transaction*
txn,
+ const std::string& instance_id, int64_t
txn_id,
+ MetaServiceCode& code, std::string& msg) {
+ // Get db id with txn id
+ std::string index_val;
+ const std::string index_key = txn_index_key({instance_id, txn_id});
+ TxnErrorCode err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id,
err);
+ return false;
+ }
+
+ TxnIndexPB index_pb;
+ if (!index_pb.ParseFromString(index_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
+ return false;
+ }
+
+ DCHECK(index_pb.has_tablet_index() == true);
+ DCHECK(index_pb.tablet_index().has_db_id() == true);
+ if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
+ LOG(WARNING) << fmt::format(
+ "txn_index_pb is malformed, tablet_index has no db_id,
txn_id={}", txn_id);
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("has no db_id in TxnIndexPB, txn_id={}", txn_id);
+ return false;
+ }
+ auto db_id = index_pb.tablet_index().db_id();
+ txn_id = index_pb.has_parent_txn_id() ? index_pb.parent_txn_id() : txn_id;
+
+ const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+ std::string info_val;
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get txn, txn_id={}, err={}", txn_id, err);
+ return false;
+ }
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse txn_info, db_id={} txn_id={}",
db_id, txn_id);
+ return false;
+ }
+ if (txn_info.status() != expect_status) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("txn is not in {} state, txn_id={}, txn_status={}",
expect_status, txn_id,
+ txn_info.status());
+ return false;
+ }
+ return true;
+}
+
/**
* 1. Check and confirm tmp rowset kv does not exist
* 2. Construct recycle rowset kv which contains object path
@@ -1032,6 +1099,20 @@ void
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
return;
}
+ // Check if the prepare rowset request is invalid.
+ // If the transaction has been finished, it means this prepare rowset is a
timeout retry request.
+ // In this case, do not write the recycle key again, otherwise it may
cause data loss.
+ // If the rowset has load id, it means it is a load request, otherwise it
is a
+ // compaction/sc request.
+ if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+ !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(),
instance_id,
+ rowset_meta.txn_id(), code, msg)) {
+ LOG(WARNING) << "prepare rowset failed, txn_id=" <<
rowset_meta.txn_id()
+ << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
+ << ", rowset_state=" << rowset_meta.rowset_state() << ",
msg=" << msg;
+ return;
+ }
+
// Check if commit key already exists.
std::string val;
err = txn->get(tmp_rs_key, &val);
@@ -1155,6 +1236,20 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}
+ // Check if the commit rowset request is invalid.
+ // If the transaction has been finished, it means this commit rowset is a
timeout retry request.
+ // In this case, do not write the recycle key again, otherwise it may
cause data loss.
+ // If the rowset has load id, it means it is a load request, otherwise it
is a
+ // compaction/sc request.
+ if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+ !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(),
instance_id,
+ rowset_meta.txn_id(), code, msg)) {
+ LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
+ << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
+ << ", rowset_state=" << rowset_meta.rowset_state() << ",
msg=" << msg;
+ return;
+ }
+
// Check if commit key already exists.
std::string existed_commit_val;
err = txn->get(tmp_rs_key, &existed_commit_val);
@@ -3500,4 +3595,4 @@ void
MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
response->mutable_schema_dict()->Swap(&schema_dict);
}
-} // namespace doris::cloud
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 91e78337f1b..04d12657ed5 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3167,6 +3167,7 @@ void
MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
std::string index_val;
TxnIndexPB index_pb;
index_pb.mutable_tablet_index()->set_db_id(db_id);
+ index_pb.set_parent_txn_id(txn_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 48bf2a721f0..c0acad685ce 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -9482,4 +9482,71 @@ TEST(MetaServiceTest, AddObjInfoWithRole) {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
}
+
+TEST(MetaServiceTest, StalePrepareRowset) {
+ auto meta_service = get_meta_service();
+
+ int64_t table_id = 1;
+ int64_t partition_id = 1;
+ int64_t tablet_id = 1;
+ int64_t db_id = 100201;
+ std::string label = "test_prepare_rowset";
+ create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);
+
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label,
table_id, txn_id));
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ rowset.mutable_load_id()->set_hi(123);
+ rowset.mutable_load_id()->set_lo(456);
+ prepare_rowset(meta_service.get(), rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+
+ prepare_rowset(meta_service.get(), rowset, res);
+ ASSERT_TRUE(res.status().msg().find("rowset already exists") !=
std::string::npos)
+ << res.status().msg();
+ ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) <<
res.status().code();
+
+ commit_txn(meta_service.get(), db_id, txn_id, label);
+ prepare_rowset(meta_service.get(), rowset, res);
+ ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
+ << res.status().msg();
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().code();
+}
+
+TEST(MetaServiceTest, StaleCommitRowset) {
+ auto meta_service = get_meta_service();
+
+ int64_t table_id = 1;
+ int64_t partition_id = 1;
+ int64_t tablet_id = 1;
+ int64_t db_id = 100201;
+ std::string label = "test_prepare_rowset";
+ create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);
+
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label,
table_id, txn_id));
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ rowset.mutable_load_id()->set_hi(123);
+ rowset.mutable_load_id()->set_lo(456);
+ prepare_rowset(meta_service.get(), rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+
+ commit_txn(meta_service.get(), db_id, txn_id, label);
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+ ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
+ << res.status().msg();
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().code();
+}
+
} // namespace doris::cloud
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index b9778f97c0f..3aefcea127d 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -385,6 +385,7 @@ message TxnLabelPB {
// txn_id -> db_id
message TxnIndexPB {
optional TabletIndexPB tablet_index = 1;
+ optional int64 parent_txn_id = 2;
}
message TxnInfoPB {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]