This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 98f17db3c5d [feat](cloud) commit_txn save the min read version (#55059)
98f17db3c5d is described below
commit 98f17db3c5ddf8ed2529ae96ed54a4310896f671
Author: walter <[email protected]>
AuthorDate: Mon Aug 25 15:44:54 2025 +0800
[feat](cloud) commit_txn save the min read version (#55059)
---
.../src/meta-service/meta_service_tablet_stats.cpp | 9 ++--
cloud/src/meta-service/meta_service_tablet_stats.h | 4 +-
cloud/src/meta-service/meta_service_txn.cpp | 57 +++++++++++++++++++---
cloud/src/meta-service/txn_lazy_committer.cpp | 4 +-
4 files changed, 60 insertions(+), 14 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.cpp b/cloud/src/meta-service/meta_service_tablet_stats.cpp
index a32d45e433f..f23f1cf9f9c 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.cpp
+++ b/cloud/src/meta-service/meta_service_tablet_stats.cpp
@@ -179,16 +179,17 @@ void internal_get_tablet_stats(MetaServiceCode& code,
std::string& msg, Transact
}
}
-void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string&
msg, Transaction* txn,
+void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string&
msg,
+ MetaReader& meta_reader, Transaction*
txn,
const std::string& instance_id,
const TabletIndexPB& tablet_idx,
TabletStatsPB& stats,
bool snapshot) {
int64_t tablet_id = tablet_idx.tablet_id();
- std::string stats_key = versioned::tablet_load_stats_key({instance_id,
tablet_id});
+ Versionstamp versionstamp;
// Try to read existing versioned tablet stats
- Versionstamp versionstamp;
- TxnErrorCode err = versioned::document_get(txn, stats_key, &stats,
&versionstamp, snapshot);
+ TxnErrorCode err =
+ meta_reader.get_tablet_load_stats(tablet_id, &stats,
&versionstamp, snapshot);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
// If versioned stats doesn't exist, read from single version
internal_get_tablet_stats(code, msg, txn, instance_id, tablet_idx,
stats, snapshot);
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.h b/cloud/src/meta-service/meta_service_tablet_stats.h
index c4decf70edc..9624832cd16 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.h
+++ b/cloud/src/meta-service/meta_service_tablet_stats.h
@@ -19,6 +19,7 @@
#include <gen_cpp/cloud.pb.h>
+#include "meta-store/meta_reader.h"
#include "resource-manager/resource_manager.h"
namespace doris::cloud {
@@ -54,7 +55,8 @@ void internal_get_tablet_stats(MetaServiceCode& code,
std::string& msg, Transact
// Get versioned tablet stats via `txn`. If an error occurs, `code` will be set to non OK.
// NOTE: this function returns original `TabletStatsPB` and detached tablet stats val stored in kv store,
// MUST call `merge_tablet_stats(stats, detached_stats)` to get the real tablet stats.
-void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string&
msg, Transaction* txn,
+void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string&
msg,
+ MetaReader& meta_reader, Transaction*
txn,
const std::string& instance_id, const
TabletIndexPB& idx,
TabletStatsPB& stats, bool snapshot =
false);
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index c0228c95315..34b9846891c 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1147,8 +1147,6 @@ void MetaServiceImpl::commit_txn_immediately(
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
- MetaReader meta_reader(instance_id, txn_kv_.get());
-
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
int64_t last_pending_txn_id = 0;
@@ -1237,6 +1235,8 @@ void MetaServiceImpl::commit_txn_immediately(
LOG(INFO) << "txn_id=" << txn_id << " txn_info=" <<
txn_info.ShortDebugString();
+ MetaReader meta_reader(instance_id, txn_kv_.get());
+
// Prepare rowset meta and new_versions
AnnotateTag txn_tag("txn_id", txn_id);
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
@@ -1441,6 +1441,17 @@ void MetaServiceImpl::commit_txn_immediately(
// Save table versions
for (auto& i : table_id_tablet_ids) {
+ if (is_versioned_read) {
+ // Read the table version, to build the operation log visible version range.
+ err = meta_reader.get_table_version(txn.get(), i.first,
nullptr, true);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << i.first;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ }
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
}
@@ -1490,8 +1501,8 @@ void MetaServiceImpl::commit_txn_immediately(
if (is_versioned_write) {
TabletStatsPB stats_pb;
- internal_get_versioned_tablet_stats(code, msg, txn.get(),
instance_id, tablet_idx,
- stats_pb);
+ internal_get_versioned_tablet_stats(code, msg, meta_reader,
txn.get(), instance_id,
+ tablet_idx, stats_pb);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update versioned tablet stats failed,
code=" << code
<< " msg=" << msg << " txn_id=" << txn_id
@@ -1530,6 +1541,9 @@ void MetaServiceImpl::commit_txn_immediately(
commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
std::string log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
+ if (is_versioned_read) {
+ operation_log.set_min_timestamp(meta_reader.min_read_version());
+ }
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
std::string operation_log_value;
if (!operation_log.SerializeToString(&operation_log_value)) {
@@ -1725,7 +1739,6 @@ void MetaServiceImpl::commit_txn_eventually(
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
- MetaReader meta_reader(instance_id, txn_kv_.get());
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
@@ -1749,6 +1762,8 @@ void MetaServiceImpl::commit_txn_eventually(
stats.del_counter += txn->num_del_keys();
};
+ MetaReader meta_reader(instance_id, txn_kv_.get());
+
AnnotateTag txn_tag("txn_id", txn_id);
// tablet_id -> {table/index/partition}_id
@@ -2012,6 +2027,17 @@ void MetaServiceImpl::commit_txn_eventually(
// Save table versions
for (auto& i : table_id_tablet_ids) {
+ if (is_versioned_read) {
+ // Read the table version, to build the operation log visible version range.
+ err = meta_reader.get_table_version(txn.get(), i.first,
nullptr, true);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << i.first;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ }
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
}
@@ -2021,6 +2047,9 @@ void MetaServiceImpl::commit_txn_eventually(
recycle_txn->set_label(txn_info.label());
std::string log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
+ if (is_versioned_read) {
+ operation_log.set_min_timestamp(meta_reader.min_read_version());
+ }
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
std::string operation_log_value;
if (!operation_log.SerializeToString(&operation_log_value)) {
@@ -2497,6 +2526,17 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
// Save table versions
for (auto& i : table_id_tablet_ids) {
+ if (is_versioned_read) {
+ // Read the table version, to build the operation log visible version range.
+ TxnErrorCode err = meta_reader.get_table_version(txn.get(),
i.first, nullptr, true);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << i.first;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ }
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
}
@@ -2545,8 +2585,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
if (is_versioned_write) {
TabletStatsPB stats_pb;
- internal_get_versioned_tablet_stats(code, msg, txn.get(),
instance_id, tablet_idx,
- stats_pb);
+ internal_get_versioned_tablet_stats(code, msg, meta_reader,
txn.get(), instance_id,
+ tablet_idx, stats_pb);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update versioned tablet stats failed, code="
<< code
<< " msg=" << msg << " txn_id=" << txn_id
@@ -2589,6 +2629,9 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
std::string log_key = versioned::log_key({instance_id});
std::string operation_log_value;
OperationLogPB operation_log;
+ if (is_versioned_read) {
+ operation_log.set_min_timestamp(meta_reader.min_read_version());
+ }
operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
if (!operation_log.SerializeToString(&operation_log_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp
index 9ef38ec9951..06fb0bdb5da 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -345,8 +345,8 @@ void convert_tmp_rowsets(
if (is_versioned_write) {
TabletStatsPB stats_pb;
- internal_get_versioned_tablet_stats(code, msg, txn.get(),
instance_id, tablet_idx,
- stats_pb);
+ internal_get_versioned_tablet_stats(code, msg, meta_reader,
txn.get(), instance_id,
+ tablet_idx, stats_pb);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "update versioned tablet stats failed, code="
<< code
<< " msg=" << msg << " txn_id=" << txn_id
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]