This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 98f17db3c5d [feat](cloud) commit_txn save the min read version (#55059)
98f17db3c5d is described below

commit 98f17db3c5ddf8ed2529ae96ed54a4310896f671
Author: walter <[email protected]>
AuthorDate: Mon Aug 25 15:44:54 2025 +0800

    [feat](cloud) commit_txn save the min read version (#55059)
---
 .../src/meta-service/meta_service_tablet_stats.cpp |  9 ++--
 cloud/src/meta-service/meta_service_tablet_stats.h |  4 +-
 cloud/src/meta-service/meta_service_txn.cpp        | 57 +++++++++++++++++++---
 cloud/src/meta-service/txn_lazy_committer.cpp      |  4 +-
 4 files changed, 60 insertions(+), 14 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_tablet_stats.cpp b/cloud/src/meta-service/meta_service_tablet_stats.cpp
index a32d45e433f..f23f1cf9f9c 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.cpp
+++ b/cloud/src/meta-service/meta_service_tablet_stats.cpp
@@ -179,16 +179,17 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact
     }
 }
 
-void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string& 
msg, Transaction* txn,
+void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string& 
msg,
+                                         MetaReader& meta_reader, Transaction* 
txn,
                                          const std::string& instance_id,
                                          const TabletIndexPB& tablet_idx, 
TabletStatsPB& stats,
                                          bool snapshot) {
     int64_t tablet_id = tablet_idx.tablet_id();
-    std::string stats_key = versioned::tablet_load_stats_key({instance_id, 
tablet_id});
+    Versionstamp versionstamp;
 
     // Try to read existing versioned tablet stats
-    Versionstamp versionstamp;
-    TxnErrorCode err = versioned::document_get(txn, stats_key, &stats, 
&versionstamp, snapshot);
+    TxnErrorCode err =
+            meta_reader.get_tablet_load_stats(tablet_id, &stats, 
&versionstamp, snapshot);
     if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
         // If versioned stats doesn't exist, read from single version
         internal_get_tablet_stats(code, msg, txn, instance_id, tablet_idx, 
stats, snapshot);
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.h b/cloud/src/meta-service/meta_service_tablet_stats.h
index c4decf70edc..9624832cd16 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.h
+++ b/cloud/src/meta-service/meta_service_tablet_stats.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/cloud.pb.h>
 
+#include "meta-store/meta_reader.h"
 #include "resource-manager/resource_manager.h"
 
 namespace doris::cloud {
@@ -54,7 +55,8 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact
 // Get versioned tablet stats via `txn`. If an error occurs, `code` will be set to non OK.
 // NOTE: this function returns original `TabletStatsPB` and detached tablet stats val stored in kv store,
 //  MUST call `merge_tablet_stats(stats, detached_stats)` to get the real tablet stats.
-void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string& 
msg, Transaction* txn,
+void internal_get_versioned_tablet_stats(MetaServiceCode& code, std::string& 
msg,
+                                         MetaReader& meta_reader, Transaction* 
txn,
                                          const std::string& instance_id, const 
TabletIndexPB& idx,
                                          TabletStatsPB& stats, bool snapshot = 
false);
 
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index c0228c95315..34b9846891c 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1147,8 +1147,6 @@ void MetaServiceImpl::commit_txn_immediately(
 
     bool is_versioned_write = is_version_write_enabled(instance_id);
     bool is_versioned_read = is_version_read_enabled(instance_id);
-    MetaReader meta_reader(instance_id, txn_kv_.get());
-
     do {
         TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
         int64_t last_pending_txn_id = 0;
@@ -1237,6 +1235,8 @@ void MetaServiceImpl::commit_txn_immediately(
 
         LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
 
+        MetaReader meta_reader(instance_id, txn_kv_.get());
+
         // Prepare rowset meta and new_versions
         AnnotateTag txn_tag("txn_id", txn_id);
         std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
@@ -1441,6 +1441,17 @@ void MetaServiceImpl::commit_txn_immediately(
 
         // Save table versions
         for (auto& i : table_id_tablet_ids) {
+            if (is_versioned_read) {
+                // Read the table version, to build the operation log visible 
version range.
+                err = meta_reader.get_table_version(txn.get(), i.first, 
nullptr, true);
+                if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                    code = cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get table version, err=" << err << " 
table_id=" << i.first;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+            }
             update_table_version(txn.get(), instance_id, db_id, i.first);
             commit_txn_log.add_table_ids(i.first);
         }
@@ -1490,8 +1501,8 @@ void MetaServiceImpl::commit_txn_immediately(
 
             if (is_versioned_write) {
                 TabletStatsPB stats_pb;
-                internal_get_versioned_tablet_stats(code, msg, txn.get(), 
instance_id, tablet_idx,
-                                                    stats_pb);
+                internal_get_versioned_tablet_stats(code, msg, meta_reader, 
txn.get(), instance_id,
+                                                    tablet_idx, stats_pb);
                 if (code != MetaServiceCode::OK) {
                     LOG(WARNING) << "update versioned tablet stats failed, 
code=" << code
                                  << " msg=" << msg << " txn_id=" << txn_id
@@ -1530,6 +1541,9 @@ void MetaServiceImpl::commit_txn_immediately(
             commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
             std::string log_key = versioned::log_key({instance_id});
             OperationLogPB operation_log;
+            if (is_versioned_read) {
+                
operation_log.set_min_timestamp(meta_reader.min_read_version());
+            }
             operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
             std::string operation_log_value;
             if (!operation_log.SerializeToString(&operation_log_value)) {
@@ -1725,7 +1739,6 @@ void MetaServiceImpl::commit_txn_eventually(
 
     bool is_versioned_write = is_version_write_enabled(instance_id);
     bool is_versioned_read = is_version_read_enabled(instance_id);
-    MetaReader meta_reader(instance_id, txn_kv_.get());
 
     do {
         TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
@@ -1749,6 +1762,8 @@ void MetaServiceImpl::commit_txn_eventually(
             stats.del_counter += txn->num_del_keys();
         };
 
+        MetaReader meta_reader(instance_id, txn_kv_.get());
+
         AnnotateTag txn_tag("txn_id", txn_id);
 
         // tablet_id -> {table/index/partition}_id
@@ -2012,6 +2027,17 @@ void MetaServiceImpl::commit_txn_eventually(
 
         // Save table versions
         for (auto& i : table_id_tablet_ids) {
+            if (is_versioned_read) {
+                // Read the table version, to build the operation log visible 
version range.
+                err = meta_reader.get_table_version(txn.get(), i.first, 
nullptr, true);
+                if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                    code = cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get table version, err=" << err << " 
table_id=" << i.first;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+            }
             update_table_version(txn.get(), instance_id, db_id, i.first);
             commit_txn_log.add_table_ids(i.first);
         }
@@ -2021,6 +2047,9 @@ void MetaServiceImpl::commit_txn_eventually(
             recycle_txn->set_label(txn_info.label());
             std::string log_key = versioned::log_key({instance_id});
             OperationLogPB operation_log;
+            if (is_versioned_read) {
+                
operation_log.set_min_timestamp(meta_reader.min_read_version());
+            }
             operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
             std::string operation_log_value;
             if (!operation_log.SerializeToString(&operation_log_value)) {
@@ -2497,6 +2526,17 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
 
     // Save table versions
     for (auto& i : table_id_tablet_ids) {
+        if (is_versioned_read) {
+            // Read the table version, to build the operation log visible 
version range.
+            TxnErrorCode err = meta_reader.get_table_version(txn.get(), 
i.first, nullptr, true);
+            if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get table version, err=" << err << " 
table_id=" << i.first;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+        }
         update_table_version(txn.get(), instance_id, db_id, i.first);
         commit_txn_log.add_table_ids(i.first);
     }
@@ -2545,8 +2585,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
 
         if (is_versioned_write) {
             TabletStatsPB stats_pb;
-            internal_get_versioned_tablet_stats(code, msg, txn.get(), 
instance_id, tablet_idx,
-                                                stats_pb);
+            internal_get_versioned_tablet_stats(code, msg, meta_reader, 
txn.get(), instance_id,
+                                                tablet_idx, stats_pb);
             if (code != MetaServiceCode::OK) {
                 LOG(WARNING) << "update versioned tablet stats failed, code=" 
<< code
                              << " msg=" << msg << " txn_id=" << txn_id
@@ -2589,6 +2629,9 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
         std::string log_key = versioned::log_key({instance_id});
         std::string operation_log_value;
         OperationLogPB operation_log;
+        if (is_versioned_read) {
+            operation_log.set_min_timestamp(meta_reader.min_read_version());
+        }
         operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
         if (!operation_log.SerializeToString(&operation_log_value)) {
             code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp
index 9ef38ec9951..06fb0bdb5da 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -345,8 +345,8 @@ void convert_tmp_rowsets(
 
         if (is_versioned_write) {
             TabletStatsPB stats_pb;
-            internal_get_versioned_tablet_stats(code, msg, txn.get(), 
instance_id, tablet_idx,
-                                                stats_pb);
+            internal_get_versioned_tablet_stats(code, msg, meta_reader, 
txn.get(), instance_id,
+                                                tablet_idx, stats_pb);
             if (code != MetaServiceCode::OK) {
                 LOG(WARNING) << "update versioned tablet stats failed, code=" 
<< code
                              << " msg=" << msg << " txn_id=" << txn_id


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to