This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee37bd66691 [improve](cloud) reduce fe call get_version to 
meta_service (#60467)
ee37bd66691 is described below

commit ee37bd66691c5bdea19687fa4b25f4e3a2b7bf1d
Author: meiyi <[email protected]>
AuthorDate: Sat Feb 28 15:16:40 2026 +0800

    [improve](cloud) reduce fe call get_version to meta_service (#60467)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
          https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add the branch pick label indicating which
    branch(es) this PR should be merged into -->
---
 cloud/src/meta-service/meta_service.h              |   3 +-
 cloud/src/meta-service/meta_service_partition.cpp  | 141 ++++++++++++-
 cloud/src/meta-service/meta_service_txn.cpp        | 184 +++++++++++++++--
 cloud/test/meta_service_test.cpp                   |  37 +++-
 cloud/test/meta_service_versioned_read_test.cpp    |  39 ++++
 cloud/test/txn_lazy_commit_test.cpp                |  38 ++++
 .../main/java/org/apache/doris/common/Config.java  |  19 ++
 .../java/org/apache/doris/catalog/OlapTable.java   |  76 ++++++-
 .../doris/clone/DynamicPartitionScheduler.java     |   2 +-
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |  11 +-
 .../cloud/catalog/CloudFEVersionSynchronizer.java  | 220 +++++++++++++++++++++
 .../apache/doris/cloud/catalog/CloudPartition.java |  52 ++++-
 .../cloud/catalog/CloudSyncVersionDaemon.java      | 203 +++++++++++++++++++
 .../cloud/datasource/CloudInternalCatalog.java     |  52 ++++-
 .../transaction/CloudGlobalTransactionMgr.java     | 122 ++++++++----
 .../java/org/apache/doris/common/ClientPool.java   |   3 +
 .../doris/common/proc/PartitionsProcDir.java       |  74 ++++++-
 .../apache/doris/datasource/InternalCatalog.java   |  13 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   7 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  12 ++
 .../org/apache/doris/catalog/OlapTableTest.java    | 132 +++++++++++++
 gensrc/proto/cloud.proto                           |   5 +-
 gensrc/thrift/FrontendService.thrift               |  15 ++
 23 files changed, 1369 insertions(+), 91 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 2e4e2790f4b..487b26f3899 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -505,7 +505,8 @@ private:
             std::string_view instance_id, KVStats& stats);
 
     void commit_partition_internal(const PartitionRequest* request, const 
std::string& instance_id,
-                                   const std::vector<int64_t>& partition_ids, 
MetaServiceCode& code,
+                                   const std::vector<int64_t>& partition_ids,
+                                   PartitionResponse* response, 
MetaServiceCode& code,
                                    std::string& msg, KVStats& stats);
 
     // Wait for all pending transactions before returning, and bump up the 
version to the latest.
diff --git a/cloud/src/meta-service/meta_service_partition.cpp 
b/cloud/src/meta-service/meta_service_partition.cpp
index 811fc219b99..c28abeadb0f 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -211,6 +211,9 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
         msg = "db_id is required for versioned write, please upgrade your FE 
version";
         return;
     }
+    if (request->has_is_new_table() && request->is_new_table() && 
is_versioned_write) {
+        txn->enable_get_versionstamp();
+    }
 
     CloneChainReader reader(instance_id, resource_mgr_.get());
     for (auto index_id : request->index_ids()) {
@@ -327,6 +330,29 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
                 msg = fmt::format("failed to get table version, err={}", err);
                 return;
             }
+        } else {
+            // set table version in response
+            int64_t table_id = request->table_id();
+            std::string ver_key = table_version_key({instance_id, 
request->db_id(), table_id});
+            std::string ver_val;
+            err = txn->get(ver_key, &ver_val);
+            int64_t table_version = 0;
+            if (err == TxnErrorCode::TXN_OK) {
+                if (!txn->decode_atomic_int(ver_val, &table_version)) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "malformed table version value, err=" << err
+                       << " table_id=" << request->table_id();
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+            } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get table version, err=" << err << " 
table_id=" << table_id;
+                msg = ss.str();
+                return;
+            }
+            response->set_table_version(table_version + 1);
         }
         // init table version, for create and truncate table
         update_table_version(txn.get(), instance_id, request->db_id(), 
request->table_id());
@@ -353,6 +379,22 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
         msg = fmt::format("failed to commit txn: {}", err);
         return;
     }
+
+    // set table version in response
+    if (request->has_is_new_table() && request->is_new_table() && 
is_versioned_read) {
+        Versionstamp vs;
+        err = txn->get_versionstamp(&vs);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get kv txn versionstamp, table_id=" << 
request->table_id()
+               << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+        int64_t version = vs.version();
+        response->set_table_version(version);
+    }
 }
 
 void MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
@@ -655,7 +697,7 @@ void 
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
         for (size_t j = i; j < end; j++) {
             partition_ids.push_back(request->partition_ids(j));
         }
-        commit_partition_internal(request, instance_id, partition_ids, code, 
msg, stats);
+        commit_partition_internal(request, instance_id, partition_ids, 
response, code, msg, stats);
         if (code != MetaServiceCode::OK) {
             return;
         }
@@ -665,8 +707,8 @@ void 
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
 void MetaServiceImpl::commit_partition_internal(const PartitionRequest* 
request,
                                                 const std::string& instance_id,
                                                 const std::vector<int64_t>& 
partition_ids,
-                                                MetaServiceCode& code, 
std::string& msg,
-                                                KVStats& stats) {
+                                                PartitionResponse* response, 
MetaServiceCode& code,
+                                                std::string& msg, KVStats& 
stats) {
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv_->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
@@ -699,6 +741,10 @@ void MetaServiceImpl::commit_partition_internal(const 
PartitionRequest* request,
         return;
     }
 
+    if (is_versioned_write) {
+        txn->enable_get_versionstamp();
+    }
+
     CloneChainReader reader(instance_id, resource_mgr_.get());
     size_t num_commit = 0;
     for (auto part_id : partition_ids) {
@@ -794,6 +840,31 @@ void MetaServiceImpl::commit_partition_internal(const 
PartitionRequest* request,
             msg = fmt::format("failed to get table version, err={}", err);
             return;
         }
+    } else {
+        // set table version in response
+        std::string ver_key =
+                table_version_key({instance_id, request->db_id(), 
request->table_id()});
+        std::string ver_val;
+        err = txn->get(ver_key, &ver_val);
+        int64_t table_version = 0;
+        if (err == TxnErrorCode::TXN_OK) {
+            if (!txn->decode_atomic_int(ver_val, &table_version)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                std::stringstream ss;
+                ss << "malformed table version value, err=" << err
+                   << " table_id=" << request->table_id();
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+        } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            code = cast_as<ErrCategory::READ>(err);
+            std::stringstream ss;
+            ss << "failed to get table version, err=" << err << " table_id=" 
<< request->table_id();
+            msg = ss.str();
+            return;
+        }
+        response->set_table_version(table_version + 1);
     }
     update_table_version(txn.get(), instance_id, request->db_id(), 
request->table_id());
 
@@ -818,6 +889,23 @@ void MetaServiceImpl::commit_partition_internal(const 
PartitionRequest* request,
         msg = fmt::format("failed to commit txn: {}", err);
         return;
     }
+
+    // set table version in response
+    if (is_versioned_read) {
+        Versionstamp vs;
+        err = txn->get_versionstamp(&vs);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            std::stringstream ss;
+            ss << "failed to get kv txn versionstamp, table_id=" << 
request->table_id()
+               << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+        int64_t version = vs.version();
+        response->set_table_version(version);
+    }
 }
 
 void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* 
controller,
@@ -870,6 +958,9 @@ void 
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
     drop_partition_log.set_table_id(request->table_id());
     drop_partition_log.mutable_index_ids()->CopyFrom(request->index_ids());
     drop_partition_log.set_expired_at_s(request->expiration());
+    if (is_versioned_write) {
+        txn->enable_get_versionstamp();
+    }
 
     CloneChainReader reader(instance_id, resource_mgr_.get());
     for (auto part_id : request->partition_ids()) {
@@ -938,6 +1029,32 @@ void 
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
                 msg = fmt::format("failed to get table version, err={}", err);
                 return;
             }
+        } else {
+            // set table version in response
+            std::string ver_key =
+                    table_version_key({instance_id, request->db_id(), 
request->table_id()});
+            std::string ver_val;
+            err = txn->get(ver_key, &ver_val);
+            int64_t table_version = 0;
+            if (err == TxnErrorCode::TXN_OK) {
+                if (!txn->decode_atomic_int(ver_val, &table_version)) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    std::stringstream ss;
+                    ss << "malformed table version value, err=" << err
+                       << " table_id=" << request->table_id();
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+            } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                code = cast_as<ErrCategory::READ>(err);
+                std::stringstream ss;
+                ss << "failed to get table version, err=" << err
+                   << " table_id=" << request->table_id();
+                msg = ss.str();
+                return;
+            }
+            response->set_table_version(table_version + 1);
         }
         update_table_version(txn.get(), instance_id, request->db_id(), 
request->table_id());
         drop_partition_log.set_update_table_version(true);
@@ -964,6 +1081,24 @@ void 
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
         msg = fmt::format("failed to commit txn: {}", err);
         return;
     }
+
+    // set table version in response
+    if (request->has_need_update_table_version() && 
request->need_update_table_version() &&
+        is_versioned_read) {
+        Versionstamp vs;
+        err = txn->get_versionstamp(&vs);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            std::stringstream ss;
+            ss << "failed to get kv txn versionstamp, table_id=" << 
request->table_id()
+               << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+        int64_t version = vs.version();
+        response->set_table_version(version);
+    }
 }
 
 void check_create_table(std::string instance_id, std::shared_ptr<TxnKv> txn_kv,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index cd7b62e7e6d..88fd0b81053 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1539,6 +1539,9 @@ void MetaServiceImpl::commit_txn_immediately(
             LOG(WARNING) << msg;
             return;
         }
+        if (is_versioned_write) {
+            txn->enable_get_versionstamp();
+        }
         DORIS_CLOUD_DEFER {
             if (txn == nullptr) return;
             stats.get_bytes += txn->get_bytes();
@@ -1827,6 +1830,8 @@ void MetaServiceImpl::commit_txn_immediately(
             response->add_versions(new_version);
         }
 
+        // table_id -> version, for response
+        std::map<int64_t, int64_t> table_version_map;
         // Save table versions
         for (auto& i : table_id_tablet_ids) {
             if (is_versioned_read) {
@@ -1839,6 +1844,29 @@ void MetaServiceImpl::commit_txn_immediately(
                     LOG(WARNING) << msg;
                     return;
                 }
+            } else {
+                // set table versions in response
+                int64_t table_id = i.first;
+                std::string ver_key = table_version_key({instance_id, db_id, 
table_id});
+                std::string ver_val;
+                err = txn->get(ver_key, &ver_val);
+                int64_t table_version = 0;
+                if (err == TxnErrorCode::TXN_OK) {
+                    if (!txn->decode_atomic_int(ver_val, &table_version)) {
+                        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                        ss << "malformed table version value, err=" << err
+                           << " table_id=" << i.first;
+                        msg = ss.str();
+                        LOG(WARNING) << msg;
+                        return;
+                    }
+                } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                    code = cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get table version, err=" << err << " 
table_id=" << table_id;
+                    msg = ss.str();
+                    return;
+                }
+                table_version_map[table_id] = table_version + 1;
             }
             update_table_version(txn.get(), instance_id, db_id, i.first);
             commit_txn_log.add_table_ids(i.first);
@@ -2004,20 +2032,40 @@ void MetaServiceImpl::commit_txn_immediately(
             return;
         }
 
+        // set table versions in response
+        if (is_versioned_read) {
+            Versionstamp vs;
+            err = txn->get_versionstamp(&vs);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get kv txn versionstamp, txn_id=" << txn_id 
<< " err=" << err;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+            int64_t version = vs.version();
+            for (auto& i : table_id_tablet_ids) {
+                int64_t table_id = i.first;
+                table_version_map[table_id] = version;
+            }
+        }
+
         // calculate table stats from tablets stats
         std::map<int64_t /*table_id*/, TableStats> table_stats;
         std::vector<int64_t> 
base_tablet_ids(request->base_tablet_ids().begin(),
                                              request->base_tablet_ids().end());
         calc_table_stats(tablet_ids, tablet_stats, table_stats, 
base_tablet_ids);
-        for (const auto& pair : table_stats) {
+        for (const auto& pair : table_version_map) {
             TableStatsPB* stats_pb = response->add_table_stats();
             auto table_id = pair.first;
-            auto stats = pair.second;
-            get_pb_from_tablestats(stats, stats_pb);
             stats_pb->set_table_id(table_id);
-            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << 
txn_id
-                       << " table_id=" << table_id
-                       << " updated_row_count=" << 
stats_pb->updated_row_count();
+            stats_pb->set_table_version(pair.second);
+            if (auto it = table_stats.find(table_id); it != table_stats.end()) 
{
+                get_pb_from_tablestats(it->second, stats_pb);
+                VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" 
<< txn_id
+                           << " table_id=" << table_id
+                           << " updated_row_count=" << 
stats_pb->updated_row_count();
+            }
         }
         response->mutable_txn_info()->CopyFrom(txn_info);
         TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::finish", &code);
@@ -2236,6 +2284,9 @@ void MetaServiceImpl::commit_txn_eventually(
             LOG(WARNING) << msg;
             return;
         }
+        if (is_versioned_write) {
+            txn->enable_get_versionstamp();
+        }
 
         CommitTxnLogPB commit_txn_log;
         commit_txn_log.set_txn_id(txn_id);
@@ -2482,6 +2533,8 @@ void MetaServiceImpl::commit_txn_eventually(
             }
         }
 
+        // table_id -> version, for response
+        std::map<int64_t, int64_t> table_version_map;
         // Save table versions
         for (auto& i : table_id_tablet_ids) {
             if (is_versioned_read) {
@@ -2494,6 +2547,29 @@ void MetaServiceImpl::commit_txn_eventually(
                     LOG(WARNING) << msg;
                     return;
                 }
+            } else {
+                // set table versions in response
+                int64_t table_id = i.first;
+                std::string ver_key = table_version_key({instance_id, db_id, 
table_id});
+                std::string ver_val;
+                err = txn->get(ver_key, &ver_val);
+                int64_t table_version = 0;
+                if (err == TxnErrorCode::TXN_OK) {
+                    if (!txn->decode_atomic_int(ver_val, &table_version)) {
+                        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                        ss << "malformed table version value, err=" << err
+                           << " table_id=" << i.first;
+                        msg = ss.str();
+                        LOG(WARNING) << msg;
+                        return;
+                    }
+                } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                    code = cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get table version, err=" << err << " 
table_id=" << table_id;
+                    msg = ss.str();
+                    return;
+                }
+                table_version_map[table_id] = table_version + 1;
             }
             update_table_version(txn.get(), instance_id, db_id, i.first);
             commit_txn_log.add_table_ids(i.first);
@@ -2532,6 +2608,24 @@ void MetaServiceImpl::commit_txn_eventually(
             return;
         }
 
+        // set table versions in response
+        if (is_versioned_read) {
+            Versionstamp vs;
+            err = txn->get_versionstamp(&vs);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get kv txn versionstamp, txn_id=" << txn_id 
<< " err=" << err;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+            int64_t version = vs.version();
+            for (auto& i : table_id_tablet_ids) {
+                int64_t table_id = i.first;
+                table_version_map[table_id] = version;
+            }
+        }
+
         
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::abort_txn_after_mark_txn_commited");
 
         
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit",
@@ -2562,15 +2656,17 @@ void MetaServiceImpl::commit_txn_eventually(
         std::vector<int64_t> 
base_tablet_ids(request->base_tablet_ids().begin(),
                                              request->base_tablet_ids().end());
         calc_table_stats(tablet_ids, tablet_stats, table_stats, 
base_tablet_ids);
-        for (const auto& pair : table_stats) {
+        for (const auto& pair : table_version_map) {
             TableStatsPB* stats_pb = response->add_table_stats();
             auto table_id = pair.first;
-            auto stats = pair.second;
-            get_pb_from_tablestats(stats, stats_pb);
             stats_pb->set_table_id(table_id);
-            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << 
txn_id
-                       << " table_id=" << table_id
-                       << " updated_row_count=" << 
stats_pb->updated_row_count();
+            stats_pb->set_table_version(pair.second);
+            if (auto it = table_stats.find(table_id); it != table_stats.end()) 
{
+                get_pb_from_tablestats(it->second, stats_pb);
+                VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" 
<< txn_id
+                           << " table_id=" << table_id
+                           << " updated_row_count=" << 
stats_pb->updated_row_count();
+            }
         }
 
         // txn set visible for fe callback
@@ -2629,6 +2725,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
         }
         sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, 
std::move(tmp_rowsets_meta));
     }
+    bool is_versioned_write = is_version_write_enabled(instance_id);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
     do {
         TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn:begin", &txn_id);
         // Create a readonly txn for scan tmp rowset
@@ -2641,6 +2739,9 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
             LOG(WARNING) << msg;
             return;
         }
+        if (is_versioned_write) {
+            txn->enable_get_versionstamp();
+        }
         DORIS_CLOUD_DEFER {
             if (txn == nullptr) return;
             stats.get_bytes += txn->get_bytes();
@@ -2701,8 +2802,6 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
 
         AnnotateTag txn_tag("txn_id", txn_id);
 
-        bool is_versioned_write = is_version_write_enabled(instance_id);
-        bool is_versioned_read = is_version_read_enabled(instance_id);
         CloneChainReader meta_reader(instance_id, resource_mgr_.get());
 
         // Prepare rowset meta and new_versions
@@ -2920,6 +3019,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
             response->add_versions(new_version);
         }
 
+        // table_id -> version, for response
+        std::map<int64_t, int64_t> table_version_map;
         // Save table versions
         for (auto& i : table_id_tablet_ids) {
             if (is_versioned_read) {
@@ -2932,6 +3033,29 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
                     LOG(WARNING) << msg;
                     return;
                 }
+            } else {
+                // set table versions in response
+                int64_t table_id = i.first;
+                std::string ver_key = table_version_key({instance_id, db_id, 
table_id});
+                std::string ver_val;
+                err = txn->get(ver_key, &ver_val);
+                int64_t table_version = 0;
+                if (err == TxnErrorCode::TXN_OK) {
+                    if (!txn->decode_atomic_int(ver_val, &table_version)) {
+                        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                        ss << "malformed table version value, err=" << err
+                           << " table_id=" << i.first;
+                        msg = ss.str();
+                        LOG(WARNING) << msg;
+                        return;
+                    }
+                } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                    code = cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get table version, err=" << err << " 
table_id=" << table_id;
+                    msg = ss.str();
+                    return;
+                }
+                table_version_map[table_id] = table_version + 1;
             }
             update_table_version(txn.get(), instance_id, db_id, i.first);
             commit_txn_log.add_table_ids(i.first);
@@ -3082,20 +3206,40 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
             return;
         }
 
+        // set table versions in response
+        if (is_versioned_read) {
+            Versionstamp vs;
+            err = txn->get_versionstamp(&vs);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get kv txn versionstamp, txn_id=" << txn_id 
<< " err=" << err;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+            int64_t version = vs.version();
+            for (auto& i : table_id_tablet_ids) {
+                int64_t table_id = i.first;
+                table_version_map[table_id] = version;
+            }
+        }
+
         // calculate table stats from tablets stats
         std::map<int64_t /*table_id*/, TableStats> table_stats;
         std::vector<int64_t> 
base_tablet_ids(request->base_tablet_ids().begin(),
                                              request->base_tablet_ids().end());
         calc_table_stats(tablet_ids, tablet_stats, table_stats, 
base_tablet_ids);
-        for (const auto& pair : table_stats) {
+        for (const auto& pair : table_version_map) {
             TableStatsPB* stats_pb = response->add_table_stats();
             auto table_id = pair.first;
-            auto stats = pair.second;
-            get_pb_from_tablestats(stats, stats_pb);
             stats_pb->set_table_id(table_id);
-            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << 
txn_id
-                       << " table_id=" << table_id
-                       << " updated_row_count=" << 
stats_pb->updated_row_count();
+            stats_pb->set_table_version(table_version_map[table_id]);
+            if (auto it = table_stats.find(table_id); it != table_stats.end()) 
{
+                get_pb_from_tablestats(it->second, stats_pb);
+                VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" 
<< txn_id
+                           << " table_id=" << table_id
+                           << " updated_row_count=" << 
stats_pb->updated_row_count();
+            }
         }
 
         response->mutable_txn_info()->CopyFrom(txn_info);
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index f815c0d3244..2b057a0b258 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -217,6 +217,20 @@ static void commit_txn(MetaServiceProxy* meta_service, 
int64_t db_id, int64_t tx
             << label << ", res=" << res.ShortDebugString();
 }
 
+static void get_table_version(MetaServiceProxy* meta_service, int64_t db_id, 
int64_t table_id,
+                              int64_t& version) {
+    brpc::Controller ctrl;
+    GetVersionRequest req;
+    req.set_db_id(db_id);
+    req.set_table_id(table_id);
+    req.set_is_table_version(true);
+    GetVersionResponse resp;
+    meta_service->get_version(&ctrl, &req, &resp, nullptr);
+    ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+            << ", get table version res=" << resp.ShortDebugString();
+    version = resp.version();
+}
+
 doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int 
partition_id = 10,
                                        int64_t version = -1, int num_rows = 
100) {
     doris::RowsetMetaCloudPB rowset;
@@ -1797,7 +1811,7 @@ TEST(MetaServiceTest, CommitTxnTest) {
     int64_t partition_id = 1236;
 
     // case: first version of rowset
-    {
+    for (int i = 0; i < 2; ++i) {
         int64_t txn_id = -1;
         // begin txn
         {
@@ -1806,7 +1820,7 @@ TEST(MetaServiceTest, CommitTxnTest) {
             req.set_cloud_unique_id("test_cloud_unique_id");
             TxnInfoPB txn_info_pb;
             txn_info_pb.set_db_id(666);
-            txn_info_pb.set_label("test_label");
+            txn_info_pb.set_label("test_label_" + std::to_string(i));
             txn_info_pb.add_table_ids(1234);
             txn_info_pb.set_timeout_ms(36000);
             req.mutable_txn_info()->CopyFrom(txn_info_pb);
@@ -1854,6 +1868,11 @@ TEST(MetaServiceTest, CommitTxnTest) {
             
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                      &req, &res, nullptr);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.table_stats().size(), 1);
+
+            int64_t table_version = res.table_stats()[0].table_version();
+            get_table_version(meta_service.get(), 666, table_id, 
table_version);
+            ASSERT_EQ(table_version, i + 1);
         }
 
         // doubly commit txn
@@ -2105,6 +2124,14 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
         ASSERT_EQ(res.table_ids()[2], t1);
         ASSERT_EQ(res.partition_ids()[2], t1_p1) << res.ShortDebugString();
         ASSERT_EQ(res.versions()[2], 3) << res.ShortDebugString();
+
+        ASSERT_EQ(res.table_stats().size(), 2);
+        int64_t table_version = 0;
+        get_table_version(meta_service.get(), db_id, t1, table_version);
+        ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
+        table_version = 0;
+        get_table_version(meta_service.get(), db_id, t2, table_version);
+        ASSERT_EQ(res.table_stats()[1].table_version(), table_version);
     }
 
     // doubly commit txn
@@ -7862,6 +7889,8 @@ TEST(MetaServiceTest, IndexRequest) {
     ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
     val_int = *reinterpret_cast<const int64_t*>(val.data());
     ASSERT_EQ(val_int, 1);
+    ASSERT_TRUE(res.has_table_version());
+    ASSERT_EQ(val_int, res.table_version());
     // Last state DROPPED
     reset_meta_service();
     index_pb.set_state(RecycleIndexPB::DROPPED);
@@ -8113,6 +8142,8 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
     val_int = *reinterpret_cast<const int64_t*>(val.data());
     ASSERT_EQ(val_int, 2);
+    ASSERT_TRUE(res.has_table_version());
+    ASSERT_EQ(val_int, res.table_version());
     // Last state DROPPED
     reset_meta_service();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
@@ -8274,6 +8305,8 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
     val_int = *reinterpret_cast<const int64_t*>(val.data());
     ASSERT_EQ(val_int, 2);
+    ASSERT_TRUE(res.has_table_version());
+    ASSERT_EQ(val_int, res.table_version());
     // Last state PREPARED
     reset_meta_service();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
diff --git a/cloud/test/meta_service_versioned_read_test.cpp 
b/cloud/test/meta_service_versioned_read_test.cpp
index eab1ec78d2e..58793011746 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -126,6 +126,20 @@ void compact_rowset(TxnKv* txn_kv, std::string 
instance_id, int64_t tablet_id, i
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
 }
 
+static void get_table_version(MetaServiceProxy* meta_service, int64_t db_id, 
int64_t table_id,
+                              int64_t& version) {
+    brpc::Controller ctrl;
+    GetVersionRequest req;
+    req.set_db_id(db_id);
+    req.set_table_id(table_id);
+    req.set_is_table_version(true);
+    GetVersionResponse resp;
+    meta_service->get_version(&ctrl, &req, &resp, nullptr);
+    ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+            << ", get table version res=" << resp.ShortDebugString();
+    version = resp.version();
+}
+
 // Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
 static void create_and_refresh_instance(MetaServiceProxy* service, std::string 
instance_id) {
     // write instance
@@ -220,6 +234,11 @@ TEST(MetaServiceVersionedReadTest, CommitTxn) {
             
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                      &req, &res, nullptr);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+            ASSERT_EQ(res.table_stats().size(), 1);
+            int64_t table_version = 0;
+            get_table_version(meta_service.get(), db_id, table_id, 
table_version);
+            ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
         }
 
         // doubly commit txn
@@ -423,6 +442,14 @@ TEST(MetaServiceVersionedReadTest, 
CommitTxnWithSubTxnTest) {
                                    << ", res=" << res.DebugString();
             }
         }
+
+        ASSERT_EQ(res.table_stats().size(), 2);
+        int64_t table_version = 0;
+        get_table_version(meta_service.get(), db_id, t2, table_version);
+        ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
+        table_version = 0;
+        get_table_version(meta_service.get(), db_id, t1, table_version);
+        ASSERT_EQ(res.table_stats()[1].table_version(), table_version);
     }
 
     // doubly commit txn
@@ -1140,6 +1167,11 @@ TEST(MetaServiceVersionedReadTest, IndexRequest) {
         req.set_is_new_table(true);
         meta_service->commit_index(&ctrl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+
+        ASSERT_TRUE(res.has_table_version());
+        int64_t table_version = 0;
+        get_table_version(meta_service.get(), db_id, table_id, table_version);
+        ASSERT_EQ(table_version, res.table_version());
     }
 
     {
@@ -1166,6 +1198,8 @@ TEST(MetaServiceVersionedReadTest, IndexRequest) {
         req.set_is_new_table(true);
         meta_service->commit_index(&ctrl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+
+        ASSERT_FALSE(res.has_table_version());
     }
 }
 
@@ -1205,6 +1239,11 @@ TEST(MetaServiceVersionedReadTest, PartitionRequest) {
         req.add_partition_ids(partition_id);
         meta_service->commit_partition(&ctrl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+
+        ASSERT_TRUE(res.has_table_version());
+        int64_t table_version = 0;
+        get_table_version(meta_service.get(), db_id, table_id, table_version);
+        ASSERT_EQ(table_version, res.table_version());
     }
 
     {
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index b316a5e5f45..a7506f52289 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -418,6 +418,20 @@ static void create_and_refresh_instance(
     
ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id));
 }
 
+static void get_table_version(MetaServiceProxy* meta_service, int64_t db_id, 
int64_t table_id,
+                              int64_t& version) {
+    brpc::Controller ctrl;
+    GetVersionRequest req;
+    req.set_db_id(db_id);
+    req.set_table_id(table_id);
+    req.set_is_table_version(true);
+    GetVersionResponse resp;
+    meta_service->get_version(&ctrl, &req, &resp, nullptr);
+    ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+            << ", get table version res=" << resp.ShortDebugString();
+    version = resp.version();
+}
+
 TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) {
     auto txn_kv = get_mem_txn_kv();
     auto meta_service = get_meta_service(txn_kv, true);
@@ -626,6 +640,12 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithoutDbIdTest) {
         ASSERT_GE(repair_tablet_idx_count, 0);
         ASSERT_TRUE(last_pending_txn_id_hit);
         ASSERT_TRUE(commit_txn_eventually_finish_hit);
+
+        ASSERT_EQ(res.table_stats().size(), 1);
+        int64_t table_version = 0;
+        get_table_version(meta_service.get(), db_id, table_id, table_version);
+        ASSERT_EQ(table_version, 1);
+        ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
     }
 
     {
@@ -917,6 +937,11 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
         ASSERT_GE(repair_tablet_idx_count, 0);
         ASSERT_TRUE(last_pending_txn_id_hit);
         ASSERT_TRUE(commit_txn_eventually_finish_hit);
+
+        ASSERT_EQ(res.table_stats().size(), 1);
+        int64_t table_version = 0;
+        get_table_version(meta_service.get(), db_id, table_id, table_version);
+        ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
     }
 
     {
@@ -1448,6 +1473,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
     }
 
     int64_t txn_id1 = 0;
+    int64_t txn1_table_version = 0;
     std::thread thread1([&] {
         {
             std::unique_lock<std::mutex> _lock(go_mutex);
@@ -1494,10 +1520,14 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
             
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                      &req, &res, nullptr);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+            ASSERT_EQ(res.table_stats().size(), 1);
+            txn1_table_version = res.table_stats()[0].table_version();
         }
     });
 
     int64_t txn_id2 = 0;
+    int64_t txn2_table_version = 0;
     std::thread thread2([&] {
         {
             std::unique_lock<std::mutex> _lock(go_mutex);
@@ -1544,6 +1574,9 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
             
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                      &req, &res, nullptr);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+            ASSERT_EQ(res.table_stats().size(), 1);
+            txn2_table_version = res.table_stats()[0].table_version();
         }
     });
 
@@ -1555,6 +1588,11 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
     thread1.join();
     thread2.join();
 
+    ASSERT_TRUE(txn1_table_version == 1 && txn2_table_version == 2 ||
+                txn1_table_version == 2 && txn2_table_version == 1)
+            << ", txn1_table_version=" << txn1_table_version
+            << ", txn2_table_version=" << txn2_table_version;
+
     sp->clear_all_call_backs();
     sp->clear_trace();
     sp->disable_processing();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7a0ce25a888..2b06fcc5a80 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3631,6 +3631,25 @@ public class Config extends ConfigBase {
         "Maximal concurrent num of get tablet stat job."})
     public static int max_get_tablet_stat_task_threads_num = 4;
 
+    @ConfField(description = {"存算分离模式下同步 table 和 partition version 的间隔. 所有 
frontend 都会检查",
+            "Cloud table and partition version syncer interval. All frontends 
will perform the checking"})
+    public static int cloud_version_syncer_interval_second = 20;
+
+    @ConfField(mutable = true, description = {"存算分离模式下是否启用同步 table 和 partition 
version 的功能",
+            "Whether to enable the function of syncing table and partition 
version in cloud mode"})
+    public static boolean cloud_enable_version_syncer = true;
+
+    @ConfField(description = {"Get version task 的并发数", "Concurrent num of get 
version task."})
+    public static int cloud_get_version_task_threads_num = 4;
+
+    @ConfField(description = {"Master FE 发送给其它 FE sync version task 的最大并发数",
+            "Maximal concurrent num of sync version task between Master FE and 
other FEs."})
+    public static int cloud_sync_version_task_threads_num = 4;
+
+    @ConfField(mutable = true, description = {"Get version task 包含的 table 或 
partition 数目的 batch size",
+            "Maximal table or partition batch size of get version task."})
+    public static int cloud_get_version_task_batch_size = 2000;
+
     @ConfField(mutable = true, description = {"schema change job 失败是否重试",
             "Whether to enable retry when a schema change job fails, default 
is true."})
     public static boolean enable_schema_change_retry = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index cc78ae250cc..3d6de95aed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -128,6 +128,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -243,6 +245,8 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
     private volatile long lastTableVersionCachedTimeMs = 0;
     private volatile long cachedTableVersion = -1;
 
+    private ReadWriteLock versionLock = Config.isCloudMode() ? new 
ReentrantReadWriteLock(true) : null;
+
     public OlapTable() {
         // for persist
         super(TableType.OLAP);
@@ -3333,8 +3337,15 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         return System.currentTimeMillis() - lastTableVersionCachedTimeMs > 
cacheExpirationMs;
     }
 
-    @VisibleForTesting
-    protected void setCachedTableVersion(long version) {
+    public boolean isCachedTableVersionExpired(long expirationMs) {
+        // -1 means no cache yet, need to fetch from MS
+        if (cachedTableVersion == -1 || expirationMs <= 0) {
+            return true;
+        }
+        return System.currentTimeMillis() - lastTableVersionCachedTimeMs > 
expirationMs;
+    }
+
+    public void setCachedTableVersion(long version) {
         if (version >= cachedTableVersion) {
             cachedTableVersion = version;
             lastTableVersionCachedTimeMs = System.currentTimeMillis();
@@ -3400,6 +3411,49 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
                     .collect(Collectors.toList());
         }
 
+        ConnectContext ctx = ConnectContext.get();
+        long cloudTableVersionCacheTtlMs = ctx == null
+                ? 
VariableMgr.getDefaultSessionVariable().cloudTableVersionCacheTtlMs
+                : ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
+        if (cloudTableVersionCacheTtlMs <= 0) { // No cached versions will be 
used
+            return getVisibleVersionInBatchFromMs(tables);
+        }
+
+        // tableId -> cachedVersion, 0 means to be fetched from meta-service
+        List<Pair<Long, Long>> allVersions = new ArrayList<>(tables.size());
+        List<OlapTable> expiredTables = new ArrayList<>(tables.size());
+        for (OlapTable table : tables) {
+            long ver = table.getCachedTableVersion();
+            if (table.isCachedTableVersionExpired()) {
+                expiredTables.add(table);
+                ver = 0L; // 0 means to be fetched from meta-service
+            }
+            allVersions.add(Pair.of(table.getId(), ver));
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("cloudTableVersionCacheTtlMs={}, numTables={}, 
numExpiredTables={}",
+                    cloudTableVersionCacheTtlMs, tables.size(), 
expiredTables.size());
+        }
+
+        List<Long> msVersions = null;
+        if (!expiredTables.isEmpty()) { // Not all table versions are from 
cache
+            msVersions = getVisibleVersionInBatchFromMs(expiredTables);
+        }
+        int msIdx = 0;
+        for (Pair<Long, Long> v : allVersions) { // ATTN: keep the assigning 
order!!!
+            if (v.second == 0L && msVersions != null) {
+                v.second = msVersions.get(msIdx++);
+            }
+        }
+        if (!expiredTables.isEmpty()) { // Not all table versions are from 
cache
+            assert msIdx == msVersions.size() : "size not match, idx=" + msIdx 
+ " verSize=" + msVersions.size();
+        }
+
+        return allVersions.stream().map(i -> 
i.second).collect(Collectors.toList());
+    }
+
+    // Get the table versions in batch from meta-service, and update cache.
+    private static List<Long> getVisibleVersionInBatchFromMs(List<OlapTable> 
tables) {
         List<Long> dbIds = new ArrayList<>(tables.size());
         List<Long> tableIds = new ArrayList<>(tables.size());
         for (OlapTable table : tables) {
@@ -3417,7 +3471,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         return versions;
     }
 
-    private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds, 
List<Long> tableIds) {
+    public static List<Long> getVisibleVersionFromMeta(List<Long> dbIds, 
List<Long> tableIds) {
         // get version rpc
         Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
                 .setRequestIp(FrontendOptions.getLocalHostAddressCached())
@@ -3869,4 +3923,20 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         }
         return matched;
     }
+
+    public void versionReadLock() {
+        versionLock.readLock().lock();
+    }
+
+    public void versionReadUnlock() {
+        versionLock.readLock().unlock();
+    }
+
+    public void versionWriteLock() {
+        versionLock.writeLock().lock();
+    }
+
+    public void versionWriteUnlock() {
+        versionLock.writeLock().unlock();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index cadf2d3b75a..f2b17cda54f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -845,7 +845,7 @@ public class DynamicPartitionScheduler extends MasterDaemon 
{
                 throw new Exception("debug point 
FE.DynamicPartitionScheduler.before.commitCloudPartition");
             }
             Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), 
olapTable.getId(),
-                    succeedPartitionIds, indexIds, true /* isCreateTable */, 
false /* isBatchCommit */);
+                    succeedPartitionIds, indexIds, true /* isCreateTable */, 
false /* isBatchCommit */, olapTable);
             LOG.info("begin write edit log to add partitions in batch, "
                     + "numPartitions: {}, db: {}, table: {}, tableId: {}",
                     batchPartsInfo.size(), db.getFullName(), tableName, 
olapTable.getId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 65786c29976..07d4fe552d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -69,6 +69,8 @@ public class CloudEnv extends Env {
 
     private CloudTabletRebalancer cloudTabletRebalancer;
     private CacheHotspotManager cacheHotspotMgr;
+    private CloudSyncVersionDaemon cloudSyncVersionDaemon;
+    private CloudFEVersionSynchronizer cloudFEVersionSynchronizer;
 
     private boolean enableStorageVault;
 
@@ -89,6 +91,8 @@ public class CloudEnv extends Env {
         this.cloudTabletRebalancer = new 
CloudTabletRebalancer((CloudSystemInfoService) systemInfo);
         this.cacheHotspotMgr = new 
CacheHotspotManager((CloudSystemInfoService) systemInfo);
         this.upgradeMgr = new CloudUpgradeMgr((CloudSystemInfoService) 
systemInfo);
+        this.cloudSyncVersionDaemon = new CloudSyncVersionDaemon();
+        this.cloudFEVersionSynchronizer = new CloudFEVersionSynchronizer();
         this.cloudSnapshotHandler = CloudSnapshotHandler.getInstance();
     }
 
@@ -146,6 +150,7 @@ public class CloudEnv extends Env {
 
         super.initialize(args);
         this.cloudSnapshotHandler.initialize();
+        cloudInstanceStatusChecker.start();
     }
 
     @Override
@@ -162,11 +167,15 @@ public class CloudEnv extends Env {
         cloudSnapshotHandler.start();
     }
 
+    public CloudFEVersionSynchronizer getCloudFEVersionSynchronizer() {
+        return cloudFEVersionSynchronizer;
+    }
+
     @Override
     protected void startNonMasterDaemonThreads() {
         LOG.info("start cloud Non Master only daemon threads");
         super.startNonMasterDaemonThreads();
-        cloudInstanceStatusChecker.start();
+        cloudSyncVersionDaemon.start();
     }
 
     public static String genFeNodeNameFromMeta(String host, int port, long 
timeMs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudFEVersionSynchronizer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudFEVersionSynchronizer.java
new file mode 100644
index 00000000000..f843b8b58e7
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudFEVersionSynchronizer.java
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TCloudVersionInfo;
+import org.apache.doris.thrift.TFrontendSyncCloudVersionRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+public class CloudFEVersionSynchronizer {
+    private static final Logger LOG = 
LogManager.getLogger(CloudFEVersionSynchronizer.class);
+
+    private static final ExecutorService SYNC_VERSION_THREAD_POOL = 
Executors.newFixedThreadPool(
+            Config.cloud_sync_version_task_threads_num,
+            new 
ThreadFactoryBuilder().setNameFormat("sync-version-%d").setDaemon(true).build());
+
+    public CloudFEVersionSynchronizer() {
+    }
+
+    // master FE send sync version rpc to other FEs
+    public void pushVersionAsync(long dbId, OlapTable table, long version) {
+        if (!Config.cloud_enable_version_syncer) {
+            return;
+        }
+        pushVersionAsync(dbId, Collections.singletonList(Pair.of(table, 
version)), Collections.emptyMap());
+    }
+
+    public void pushVersionAsync(long dbId, List<Pair<OlapTable, Long>> 
tableVersions,
+            Map<CloudPartition, Pair<Long, Long>> partitionVersionMap) {
+        if (!Config.cloud_enable_version_syncer) {
+            return;
+        }
+        if (tableVersions.isEmpty() && partitionVersionMap.isEmpty()) {
+            return;
+        }
+        pushVersion(dbId, tableVersions, partitionVersionMap);
+    }
+
+    private void pushVersion(long dbId, List<Pair<OlapTable, Long>> 
tableVersions,
+            Map<CloudPartition, Pair<Long, Long>> partitionVersionMap) {
+        List<Frontend> frontends = getFrontends();
+        if (frontends == null || frontends.isEmpty()) {
+            return;
+        }
+
+        List<TCloudVersionInfo> tableVersionInfos = new 
ArrayList<>(tableVersions.size());
+        tableVersions.forEach(pair -> {
+            TCloudVersionInfo tableVersion = new TCloudVersionInfo();
+            tableVersion.setTableId(pair.first.getId());
+            tableVersion.setVersion(pair.second);
+            tableVersionInfos.add(tableVersion);
+        });
+        List<TCloudVersionInfo> partitionVersionInfos = new 
ArrayList<>(partitionVersionMap.size());
+        partitionVersionMap.forEach((partition, versionPair) -> {
+            TCloudVersionInfo partitionVersion = new TCloudVersionInfo();
+            partitionVersion.setTableId(partition.getTableId());
+            partitionVersion.setPartitionId(partition.getId());
+            partitionVersion.setVersion(versionPair.first);
+            partitionVersion.setVersionUpdateTime(versionPair.second);
+            partitionVersionInfos.add(partitionVersion);
+        });
+        TFrontendSyncCloudVersionRequest request = new 
TFrontendSyncCloudVersionRequest();
+        request.setDbId(dbId);
+        request.setTableVersionInfos(tableVersionInfos);
+        request.setPartitionVersionInfos(partitionVersionInfos);
+        for (Frontend fe : frontends) {
+            SYNC_VERSION_THREAD_POOL.submit(() -> {
+                try {
+                    pushVersionToFe(request, fe);
+                } catch (Exception e) {
+                    LOG.warn("push cloud version error", e);
+                }
+            });
+        }
+    }
+
+    private List<Frontend> getFrontends() {
+        HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+        return Env.getCurrentEnv().getFrontends(null).stream()
+                .filter(fe -> fe.isAlive() && 
!(fe.getHost().equals(selfNode.getHost())
+                        && fe.getEditLogPort() == selfNode.getPort())).collect(
+                        Collectors.toList());
+    }
+
+    private void pushVersionToFe(TFrontendSyncCloudVersionRequest request, 
Frontend fe) {
+        FrontendService.Client client = null;
+        TNetworkAddress addr = new TNetworkAddress(fe.getHost(), 
fe.getRpcPort());
+        boolean ok = false;
+        try {
+            client = ClientPool.frontendVersionPool.borrowObject(addr);
+            TStatus status = client.syncCloudVersion(request);
+            ok = true;
+            if (status.getStatusCode() != TStatusCode.OK) {
+                LOG.warn("failed to push cloud version to frontend {}:{}, err: 
{}", fe.getHost(), fe.getRpcPort(),
+                        status.getErrorMsgs());
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to push cloud version to frontend {}:{}", 
fe.getHost(), fe.getRpcPort(), e);
+        } finally {
+            if (ok) {
+                ClientPool.frontendVersionPool.returnObject(addr, client);
+            } else {
+                ClientPool.frontendVersionPool.invalidateObject(addr, client);
+            }
+        }
+    }
+
+    // follower and observer FE receive sync version rpc from master FE
+    public void syncVersionAsync(TFrontendSyncCloudVersionRequest request) {
+        Database db = 
Env.getCurrentInternalCatalog().getDbNullable(request.getDbId());
+        if (db == null) {
+            return;
+        }
+        // only update table version
+        if (request.getPartitionVersionInfos().isEmpty()) {
+            request.getTableVersionInfos().forEach(tableVersionInfo -> {
+                Table table = 
db.getTableNullable(tableVersionInfo.getTableId());
+                if (table == null || !table.isManagedTable()) {
+                    return;
+                }
+                OlapTable olapTable = (OlapTable) table;
+                olapTable.setCachedTableVersion(tableVersionInfo.getVersion());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Update tableId: {}, version: {}", 
olapTable.getId(), tableVersionInfo.getVersion());
+                }
+            });
+            return;
+        }
+        // update partition and table version
+        SYNC_VERSION_THREAD_POOL.submit(() -> {
+            syncVersion(db, request);
+        });
+    }
+
+    private void syncVersion(Database db, TFrontendSyncCloudVersionRequest 
request) {
+        List<Pair<OlapTable, Long>> tableVersions = new 
ArrayList<>(request.getTableVersionInfos().size());
+        for (TCloudVersionInfo tableVersionInfo : 
request.getTableVersionInfos()) {
+            Table table = db.getTableNullable(tableVersionInfo.getTableId());
+            if (table == null || !table.isManagedTable()) {
+                continue;
+            }
+            tableVersions.add(Pair.of((OlapTable) table, 
tableVersionInfo.getVersion()));
+        }
+        Collections.sort(tableVersions, Comparator.comparingLong(o -> 
o.first.getId()));
+        for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+            tableVersion.first.versionWriteLock();
+        }
+        try {
+            for (TCloudVersionInfo partitionVersionInfo : 
request.getPartitionVersionInfos()) {
+                Table table = 
db.getTableNullable(partitionVersionInfo.getTableId());
+                if (table == null || !table.isManagedTable()) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) table;
+                Partition partition = 
olapTable.getPartition(partitionVersionInfo.getPartitionId());
+                if (partition == null) {
+                    continue;
+                }
+                CloudPartition cloudPartition = (CloudPartition) partition;
+                
cloudPartition.setCachedVisibleVersion(partitionVersionInfo.getVersion(),
+                        partitionVersionInfo.getVersionUpdateTime());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Update tableId: {}, partitionId: {}, version: 
{}, updateTime: {}",
+                            partitionVersionInfo.getTableId(), 
partition.getId(), partitionVersionInfo.getVersion(),
+                            partitionVersionInfo.getVersionUpdateTime());
+                }
+            }
+            for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+                tableVersion.first.setCachedTableVersion(tableVersion.second);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Update tableId: {}, version: {}", 
tableVersion.first.getId(), tableVersion.second);
+                }
+            }
+        } finally {
+            for (int i = tableVersions.size() - 1; i >= 0; i--) {
+                tableVersions.get(i).first.versionWriteUnlock();
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index 459526e68de..d518721fabe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -17,10 +17,13 @@
 
 package org.apache.doris.cloud.catalog;
 
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
 import org.apache.doris.cloud.rpc.VersionHelper;
@@ -41,7 +44,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
@@ -144,7 +150,7 @@ public class CloudPartition extends Partition {
         return getVisibleVersionFromMs(false);
     }
 
-    public long getVisibleVersionFromMs(boolean waitForPendingTxns) {
+    private long getVisibleVersionFromMs(boolean waitForPendingTxns) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getVisibleVersionFromMs use CloudPartition {}, 
waitForPendingTxns: {}",
                     super.getName(), waitForPendingTxns);
@@ -255,6 +261,27 @@ public class CloudPartition extends Partition {
         return versions;
     }
 
+    // Deduplicate the owning tables of the given partitions and return them
+    // sorted by table id. The deterministic ordering matters: callers take
+    // version locks on every returned table in list order (and release in
+    // reverse), so a stable order avoids lock-order deadlocks.
+    // Partitions whose db or table can no longer be resolved are silently
+    // skipped.
+    private static List<OlapTable> getTables(List<CloudPartition> partitions) {
+        Map<Long, OlapTable> tableMap = new HashMap<>();
+        for (CloudPartition partition : partitions) {
+            // keep one entry per table id; first partition seen wins
+            if (tableMap.containsKey(partition.getTableId())) {
+                continue;
+            }
+            Database db = 
Env.getCurrentInternalCatalog().getDbNullable(partition.getDbId());
+            if (db == null) {
+                continue;
+            }
+            Table table = db.getTableNullable(partition.getTableId());
+            if (table == null) {
+                continue;
+            }
+            tableMap.put(partition.getTableId(), (OlapTable) table);
+        }
+        List<OlapTable> tables = 
tableMap.values().stream().collect(Collectors.toCollection(ArrayList::new));
+        Collections.sort(tables, Comparator.comparingLong(o -> o.getId()));
+        return tables;
+    }
+
     // Get visible version from the specified partitions;
     //
     // Return the visible version in order of the specified partition ids, -1 
means version NOT FOUND.
@@ -272,15 +299,24 @@ public class CloudPartition extends Partition {
         // partitionId -> cachedVersion
         List<Pair<Long, Long>> allVersions = new 
ArrayList<>(partitions.size());
         List<CloudPartition> expiredPartitions = new 
ArrayList<>(partitions.size());
-        for (CloudPartition partition : partitions) {
-            long ver = partition.getCachedVisibleVersion();
-            if (partition.isCachedVersionExpired()) {
-                expiredPartitions.add(partition);
-                ver = 0L; // 0 means to be get from meta-service
+        List<OlapTable> tables = getTables(partitions);
+        for (OlapTable table : tables) {
+            table.versionReadLock();
+        }
+        try {
+            for (CloudPartition partition : partitions) {
+                long ver = partition.getCachedVisibleVersion();
+                if (partition.isCachedVersionExpired()) {
+                    expiredPartitions.add(partition);
+                    ver = 0L; // 0 means to be get from meta-service
+                }
+                allVersions.add(Pair.of(partition.getId(), ver));
+            }
+        } finally {
+            for (int i = tables.size() - 1; i >= 0; i--) {
+                tables.get(i).versionReadUnlock();
             }
-            allVersions.add(Pair.of(partition.getId(), ver));
         }
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("cloudPartitionVersionCacheTtlMs={}, numPartitions={}, 
numFilteredPartitions={}",
                     cloudPartitionVersionCacheTtlMs, partitions.size(), 
partitions.size() - expiredPartitions.size());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudSyncVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudSyncVersionDaemon.java
new file mode 100644
index 00000000000..d65a6806eac
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudSyncVersionDaemon.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * Master-FE daemon that periodically refreshes the cached table and partition
+ * versions by querying the meta service in batches on a shared thread pool,
+ * so individual queries can be served from the local version cache.
+ * Pacing comes from Config.cloud_version_syncer_interval_second; the whole
+ * daemon is a no-op unless Config.cloud_enable_version_syncer is set.
+ */
+public class CloudSyncVersionDaemon extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(CloudSyncVersionDaemon.class);
+    // Shared pool for both table-version and partition-version RPC batches.
+    private static final ExecutorService GET_VERSION_THREAD_POOL = 
Executors.newFixedThreadPool(
+            Config.cloud_get_version_task_threads_num,
+            new 
ThreadFactoryBuilder().setNameFormat("get-version-%d").setDaemon(true).build());
+
+    public CloudSyncVersionDaemon() {
+        super("cloud table and partition version checker",
+                Config.cloud_version_syncer_interval_second * 1000);
+    }
+
+    // Entry point per daemon cycle: first sync table versions, then sync
+    // partition versions only for tables whose remote version moved ahead
+    // of the cached one.
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Config.cloud_enable_version_syncer) {
+            return;
+        }
+        LOG.info("begin sync cloud table and partition version");
+        Map<OlapTable, Long> tableVersionMap = syncTableVersions();
+        if (!tableVersionMap.isEmpty()) {
+            syncPartitionVersion(tableVersionMap);
+        }
+    }
+
+    // Scan every managed OLAP table in the internal catalog, batch up those
+    // whose cached table version is stale, and fetch their versions from the
+    // meta service asynchronously.
+    // Returns table -> newly observed version for tables where the meta
+    // service reported a version newer than the local cache.
+    private Map<OlapTable, Long> syncTableVersions() {
+        // ConcurrentHashMap: written by multiple pool tasks concurrently.
+        Map<OlapTable, Long> tableVersionMap = new ConcurrentHashMap<>();
+        List<Future<Void>> futures = new ArrayList<>();
+        long start = System.currentTimeMillis();
+        List<Long> dbIds = new ArrayList<>();
+        List<Long> tableIds = new ArrayList<>();
+        List<OlapTable> tables = new ArrayList<>();
+        // TODO meta service support range scan all table versions
+        for (Database db : Env.getCurrentInternalCatalog().getDbs()) {
+            List<Table> tableList = db.getTables();
+            for (Table table : tableList) {
+                if (!table.isManagedTable()) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) table;
+                // Skip tables whose cache was refreshed within one sync
+                // interval; they will be reconsidered next cycle.
+                if (!olapTable.isCachedTableVersionExpired(
+                        Config.cloud_version_syncer_interval_second * 1000)) {
+                    continue;
+                }
+                dbIds.add(db.getId());
+                tableIds.add(olapTable.getId());
+                tables.add(olapTable);
+                // Flush a full batch as one RPC task; copies are taken so the
+                // accumulators can be reused for the next batch.
+                if (dbIds.size() >= Config.cloud_get_version_task_batch_size) {
+                    Future<Void> future = 
submitGetTableVersionTask(tableVersionMap, ImmutableList.copyOf(dbIds),
+                            ImmutableList.copyOf(tableIds), 
ImmutableList.copyOf(tables));
+                    futures.add(future);
+                    dbIds.clear();
+                    tableIds.clear();
+                    tables.clear();
+                }
+            }
+        }
+        // Flush the final partial batch, if any.
+        if (!dbIds.isEmpty()) {
+            Future<Void> future = submitGetTableVersionTask(tableVersionMap, 
ImmutableList.copyOf(dbIds),
+                    ImmutableList.copyOf(tableIds), 
ImmutableList.copyOf(tables));
+            futures.add(future);
+            dbIds.clear();
+            tableIds.clear();
+            tables.clear();
+        }
+        try {
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            // NOTE(review): an interrupt aborts waiting on the remaining
+            // futures for this cycle; the next cycle retries stale tables.
+            LOG.error("Error waiting for get table version tasks to complete", 
e);
+        }
+        LOG.info("sync table version cost {} ms, rpc size: {}, found {} tables 
need to sync partition version",
+                System.currentTimeMillis() - start, futures.size(), 
tableVersionMap.size());
+        return tableVersionMap;
+    }
+
+    // Submit one async batch fetch of table versions. For each table: if the
+    // remote version is ahead of the cache, record the max observed version in
+    // tableVersionMap (cache update is deferred until partition sync
+    // succeeds); otherwise just refresh the cache timestamp.
+    private Future<Void> submitGetTableVersionTask(Map<OlapTable, Long> 
tableVersionMap, List<Long> dbIds,
+            List<Long> tableIds, List<OlapTable> tables) {
+        return GET_VERSION_THREAD_POOL.submit(() -> {
+            try {
+                List<Long> versions = 
OlapTable.getVisibleVersionFromMeta(dbIds, tableIds);
+                for (int i = 0; i < tables.size(); i++) {
+                    OlapTable table = tables.get(i);
+                    long version = versions.get(i);
+                    if (version > table.getCachedTableVersion()) {
+                        // compute() keeps the maximum across concurrent tasks
+                        tableVersionMap.compute(table, (k, v) -> {
+                            if (v == null || version > v) {
+                                return version;
+                            } else {
+                                return v;
+                            }
+                        });
+                    } else {
+                        // update lastTableVersionCachedTimeMs
+                        table.setCachedTableVersion(version);
+                    }
+                }
+            } catch (Exception e) {
+                // best-effort: a failed batch is retried on a later cycle
+                LOG.warn("get table version error", e);
+            }
+            return null;
+        });
+    }
+
+    // For every table whose version moved ahead, fetch its partitions'
+    // versions in batches. Only tables whose partition batches all succeeded
+    // get their cached table version advanced; failed tables keep the stale
+    // cached version so they are retried next cycle.
+    private void syncPartitionVersion(Map<OlapTable, Long> tableVersionMap) {
+        Set<Long> failedTables = ConcurrentHashMap.newKeySet();
+        List<Future<Void>> futures = new ArrayList<>();
+        long start = System.currentTimeMillis();
+        List<CloudPartition> partitions = new ArrayList<>();
+        // TODO meta service support range scan partition versions
+        for (Entry<OlapTable, Long> entry : tableVersionMap.entrySet()) {
+            OlapTable olapTable = entry.getKey();
+            LOG.info("sync partition version for db: {}, table: {}, table 
cache version: {}, new version: {}",
+                    olapTable.getDatabase().getId(), olapTable, 
olapTable.getCachedTableVersion(), entry.getValue());
+            for (Partition partition : olapTable.getAllPartitions()) {
+                partitions.add((CloudPartition) partition);
+                // NOTE(review): a batch may span multiple tables, so one
+                // failed batch marks every table in that batch as failed.
+                if (partitions.size() >= 
Config.cloud_get_version_task_batch_size) {
+                    Future<Void> future = 
submitGetPartitionVersionTask(failedTables, ImmutableList.copyOf(partitions));
+                    futures.add(future);
+                    partitions.clear();
+                }
+            }
+        }
+        if (partitions.size() > 0) {
+            Future<Void> future = submitGetPartitionVersionTask(failedTables, 
ImmutableList.copyOf(partitions));
+            futures.add(future);
+            partitions.clear();
+        }
+        try {
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error waiting for get partition version tasks to 
complete", e);
+        }
+        // set table version for success tables
+        for (Entry<OlapTable, Long> entry : tableVersionMap.entrySet()) {
+            if (!failedTables.contains(entry.getKey().getId())) {
+                OlapTable olapTable = entry.getKey();
+                olapTable.setCachedTableVersion(entry.getValue());
+            }
+        }
+        LOG.info("sync partition version cost {} ms, table size: {}, rpc size: 
{}, failed tables: {}",
+                System.currentTimeMillis() - start, tableVersionMap.size(), 
futures.size(), failedTables);
+    }
+
+    // Submit one async batch fetch of partition versions.
+    // getSnapshotVisibleVersionFromMs presumably refreshes the partitions'
+    // cached versions as a side effect — confirm in CloudPartition.
+    // On failure, every table id touched by this batch is recorded so the
+    // caller will not advance those tables' cached versions.
+    private Future<Void> submitGetPartitionVersionTask(Set<Long> failedTables, 
List<CloudPartition> partitions) {
+        return GET_VERSION_THREAD_POOL.submit(() -> {
+            try {
+                CloudPartition.getSnapshotVisibleVersionFromMs(partitions, 
false);
+            } catch (Exception e) {
+                LOG.warn("get partition version error", e);
+                Set<Long> failedTableIds = partitions.stream().map(p -> 
p.getTableId())
+                        .collect(Collectors.toSet());
+                failedTables.addAll(failedTableIds);
+            }
+            return null;
+        });
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 043a10258ea..f8e6573bdc3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -37,6 +37,7 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cloud.catalog.CloudEnv;
@@ -484,12 +485,22 @@ public class CloudInternalCatalog extends InternalCatalog 
{
      */
     @Override
     public void afterCreatePartitions(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds,
-            boolean isCreateTable, boolean isBatchCommit)
+            boolean isCreateTable, boolean isBatchCommit, OlapTable olapTable)
             throws DdlException {
         if (isBatchCommit) {
-            commitMaterializedIndex(dbId, tableId, indexIds, partitionIds, 
isCreateTable);
+            long tableVersion = commitMaterializedIndex(dbId, tableId, 
indexIds, partitionIds, isCreateTable);
+            if (olapTable != null && isCreateTable && tableVersion > 0) {
+                olapTable.setCachedTableVersion(tableVersion);
+                ((CloudEnv) 
Env.getCurrentEnv()).getCloudFEVersionSynchronizer()
+                        .pushVersionAsync(dbId, olapTable, tableVersion);
+            }
         } else {
-            commitPartition(dbId, tableId, partitionIds, indexIds);
+            long tableVersion = commitPartition(dbId, tableId, partitionIds, 
indexIds);
+            if (olapTable != null && tableVersion > 0) {
+                olapTable.setCachedTableVersion(tableVersion);
+                ((CloudEnv) 
Env.getCurrentEnv()).getCloudFEVersionSynchronizer()
+                        .pushVersionAsync(dbId, olapTable, tableVersion);
+            }
         }
         if (!Config.check_create_table_recycle_key_remained) {
             return;
@@ -554,11 +565,14 @@ public class CloudInternalCatalog extends InternalCatalog 
{
         }
     }
 
-    public void commitPartition(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+    /**
+     * @return table version if returned by MetaService, otherwise return 0
+     */
+    public long commitPartition(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
             throws DdlException {
         if (Config.enable_check_compatibility_mode) {
             LOG.info("skip committing partitions in check compatibility mode");
-            return;
+            return 0;
         }
 
         Cloud.PartitionRequest.Builder partitionRequestBuilder = 
Cloud.PartitionRequest.newBuilder()
@@ -593,6 +607,10 @@ public class CloudInternalCatalog extends InternalCatalog {
             LOG.warn("commitPartition response: {} ", response);
             throw new DdlException(response.getStatus().getMsg());
         }
+        if (response.hasTableVersion()) {
+            return response.getTableVersion();
+        }
+        return 0;
     }
 
     // if `expiration` = 0, recycler will delete uncommitted indexes in 
`retention_seconds`
@@ -633,12 +651,15 @@ public class CloudInternalCatalog extends InternalCatalog 
{
         }
     }
 
-    public void commitMaterializedIndex(long dbId, long tableId, List<Long> 
indexIds, List<Long> partitionIds,
+    /**
+     * @return table version if returned by MetaService, otherwise return 0
+     */
+    public long commitMaterializedIndex(long dbId, long tableId, List<Long> 
indexIds, List<Long> partitionIds,
             boolean isCreateTable)
             throws DdlException {
         if (Config.enable_check_compatibility_mode) {
             LOG.info("skip committing materialized index in checking 
compatibility mode");
-            return;
+            return 0;
         }
 
         Cloud.IndexRequest.Builder indexRequestBuilder = 
Cloud.IndexRequest.newBuilder()
@@ -674,6 +695,10 @@ public class CloudInternalCatalog extends InternalCatalog {
             LOG.warn("commitIndex response: {} ", response);
             throw new DdlException(response.getStatus().getMsg());
         }
+        if (isCreateTable && response.hasTableVersion()) {
+            return response.getTableVersion();
+        }
+        return 0;
     }
 
     private void checkPartition(long dbId, long tableId, List<Long> 
partitionIds)
@@ -925,6 +950,19 @@ public class CloudInternalCatalog extends InternalCatalog {
         if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
             LOG.warn("dropPartition response: {} ", response);
             throw new DdlException(response.getStatus().getMsg());
+        } else if (needUpdateTableVersion && response.hasTableVersion() && 
response.getTableVersion() > 0) {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                return;
+            }
+            Table table = db.getTableNullable(tableId);
+            if (table != null && table instanceof OlapTable) {
+                OlapTable olapTable = (OlapTable) table;
+                long tableVersion = response.getTableVersion();
+                olapTable.setCachedTableVersion(tableVersion);
+                ((CloudEnv) 
Env.getCurrentEnv()).getCloudFEVersionSynchronizer()
+                        .pushVersionAsync(dbId, olapTable, tableVersion);
+            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 3ada132ad63..4fe2e9cc61a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
@@ -142,6 +143,7 @@ import java.io.IOException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -505,44 +507,17 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         // 1. update rowCountfor AnalysisManager
         Map<Long, Long> updatedRows = new HashMap<>();
         for (TableStatsPB tableStats : commitTxnResponse.getTableStatsList()) {
-            LOG.info("Update RowCount for AnalysisManager. transactionId:{}, 
table_id:{}, updated_row_count:{}",
-                    txnId, tableStats.getTableId(), 
tableStats.getUpdatedRowCount());
-            updatedRows.put(tableStats.getTableId(), 
tableStats.getUpdatedRowCount());
+            if (tableStats.hasUpdatedRowCount()) {
+                LOG.info("Update RowCount for AnalysisManager. 
transactionId:{}, table_id:{}, updated_row_count:{}",
+                        txnId, tableStats.getTableId(), 
tableStats.getUpdatedRowCount());
+                updatedRows.put(tableStats.getTableId(), 
tableStats.getUpdatedRowCount());
+            }
         }
         Env env = Env.getCurrentEnv();
         env.getAnalysisManager().updateUpdatedRows(updatedRows);
-        // 2. notify partition first load
-        int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size();
-        // a map to record <tableId, [firstLoadPartitionIds]>
-        Map<Long, List<Long>> tablePartitionMap = Maps.newHashMap();
-        for (int idx = 0; idx < totalPartitionNum; ++idx) {
-            long version = commitTxnResponse.getVersions(idx);
-            long tableId = commitTxnResponse.getTableIds(idx);
-            if (version == 2) {
-                // inform AnalysisManager first load partitions
-                tablePartitionMap.computeIfAbsent(tableId, k -> 
Lists.newArrayList());
-                
tablePartitionMap.get(tableId).add(commitTxnResponse.getPartitionIds(idx));
-            }
-            // 3. update CloudPartition
-            OlapTable olapTable = (OlapTable) 
env.getInternalCatalog().getDb(dbId)
-                    .flatMap(db -> db.getTable(tableId)).filter(t -> 
t.isManagedTable())
-                    .orElse(null);
-            if (olapTable == null) {
-                continue;
-            }
-            CloudPartition partition = (CloudPartition) olapTable.getPartition(
-                    commitTxnResponse.getPartitionIds(idx));
-            if (partition == null) {
-                continue;
-            }
-            if (version == 2) {
-                
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
-                    .stream().forEach(i -> i.setRowCountReported(false));
-            }
-            partition.setCachedVisibleVersion(version, 
commitTxnResponse.getVersionUpdateTimeMs());
-            LOG.info("Update Partition. transactionId:{}, table_id:{}, 
partition_id:{}, version:{}, update time:{}",
-                    txnId, tableId, partition.getId(), version, 
commitTxnResponse.getVersionUpdateTimeMs());
-        }
+        // 2. update table and partition version
+        Map<Long, List<Long>> tablePartitionMap = 
updateVersion(commitTxnResponse);
+        // 3. notify partition first load
         env.getAnalysisManager().setNewPartitionLoaded(
                 
tablePartitionMap.keySet().stream().collect(Collectors.toList()));
         // tablePartitionMap to string
@@ -578,6 +553,83 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
+    // Apply the versions carried in a successful commitTxn response to the
+    // local FE caches: per-partition visible versions and per-table versions.
+    // Table version locks are taken in ascending table-id order and released
+    // in reverse to keep a deterministic lock order across threads.
+    // Returns tableId -> partition ids that received their first load
+    // (version == 2); the caller reports these to AnalysisManager.
+    private Map<Long, List<Long>> updateVersion(CommitTxnResponse 
commitTxnResponse) {
+        long dbId = commitTxnResponse.getTxnInfo().getDbId();
+        long txnId = commitTxnResponse.getTxnInfo().getTxnId();
+        int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size();
+        // nothing to update — avoid the lock/notify machinery entirely
+        if (totalPartitionNum == 0 && 
commitTxnResponse.getTableStatsList().isEmpty()) {
+            return Collections.emptyMap();
+        }
+        Env env = Env.getCurrentEnv();
+        // partition -> <version, versionUpdateTime>
+        Map<CloudPartition, Pair<Long, Long>> partitionVersionMap = new 
HashMap<>();
+        // a map to record <tableId, [firstLoadPartitionIds]>
+        Map<Long, List<Long>> tablePartitionMap = Maps.newHashMap();
+        for (int idx = 0; idx < totalPartitionNum; ++idx) {
+            long version = commitTxnResponse.getVersions(idx);
+            long tableId = commitTxnResponse.getTableIds(idx);
+            // version == 2 means this commit was the partition's first load
+            if (version == 2) {
+                // inform AnalysisManager first load partitions
+                tablePartitionMap.computeIfAbsent(tableId, k -> 
Lists.newArrayList());
+                
tablePartitionMap.get(tableId).add(commitTxnResponse.getPartitionIds(idx));
+            }
+            // 3. update CloudPartition
+            OlapTable olapTable = (OlapTable) 
env.getInternalCatalog().getDb(dbId)
+                    .flatMap(db -> db.getTable(tableId)).filter(t -> 
t.isManagedTable())
+                    .orElse(null);
+            if (olapTable == null) {
+                continue;
+            }
+            CloudPartition partition = (CloudPartition) olapTable.getPartition(
+                    commitTxnResponse.getPartitionIds(idx));
+            if (partition == null) {
+                continue;
+            }
+            if (version == 2) {
+                // force row-count re-reporting for freshly loaded indexes
+                
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+                        .stream().forEach(i -> i.setRowCountReported(false));
+            }
+            // defer the actual cache write until the locks below are held
+            partitionVersionMap.put(partition, Pair.of(version, 
commitTxnResponse.getVersionUpdateTimeMs()));
+        }
+        // collect table versions
+        Database db = env.getInternalCatalog().getDb(dbId).get();
+        List<Pair<OlapTable, Long>> tableVersions = new 
ArrayList<>(commitTxnResponse.getTableStatsList().size());
+        for (TableStatsPB tableStats : commitTxnResponse.getTableStatsList()) {
+            if (!tableStats.hasTableVersion()) {
+                continue;
+            }
+            Table table = db.getTableNullable(tableStats.getTableId());
+            if (table == null || !table.isManagedTable()) {
+                continue;
+            }
+            tableVersions.add(Pair.of((OlapTable) table, 
tableStats.getTableVersion()));
+        }
+        // sort by table id so the write locks below are always acquired in
+        // the same global order (deadlock avoidance)
+        Collections.sort(tableVersions, Comparator.comparingLong(o -> 
o.first.getId()));
+        // update partition version and table version
+        for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+            tableVersion.first.versionWriteLock();
+        }
+        try {
+            partitionVersionMap.forEach((partition, versionPair) -> {
+                partition.setCachedVisibleVersion(versionPair.first, 
versionPair.second);
+                LOG.info("Update Partition. transactionId:{}, table_id:{}, 
partition_id:{}, version:{}, update time:{}",
+                        txnId, partition.getTableId(), partition.getId(), 
versionPair.first, versionPair.second);
+            });
+            for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+                tableVersion.first.setCachedTableVersion(tableVersion.second);
+                LOG.info("Update Table. transactionId:{}, table_id:{}, 
version:{}", txnId, tableVersion.first.getId(),
+                        tableVersion.second);
+            }
+        } finally {
+            // release in reverse acquisition order
+            for (int i = tableVersions.size() - 1; i >= 0; i--) {
+                tableVersions.get(i).first.versionWriteUnlock();
+            }
+        }
+        // notify follower and observer FE to update their version cache
+        ((CloudEnv) 
env).getCloudFEVersionSynchronizer().pushVersionAsync(dbId, tableVersions, 
partitionVersionMap);
+        return tablePartitionMap;
+    }
+
     private Set<Long> getBaseTabletsFromTables(List<Table> tableList, 
List<TabletCommitInfo> tabletCommitInfos)
             throws MetaNotFoundException {
         Set<Long> baseTabletIds = Sets.newHashSet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index bdfffbe4802..2b769ae2e62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -66,6 +66,9 @@ public class ClientPool {
     public static GenericPool<FrontendService.Client> frontendHeartbeatPool =
             new GenericPool<>("FrontendService", heartbeatConfig, 
heartbeatTimeoutMs,
                     
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
+    public static GenericPool<FrontendService.Client> frontendVersionPool =
+            new GenericPool<>("FrontendService", heartbeatConfig, 
heartbeatTimeoutMs,
+                    
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<FrontendService.Client> frontendPool =
             new GenericPool("FrontendService", backendConfig, 
Config.backend_rpc_timeout_ms,
                     
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 3ac6eb65e0e..c2dee8d62fc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
@@ -37,7 +38,10 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
@@ -70,6 +74,8 @@ import org.apache.doris.nereids.types.DateTimeType;
 import org.apache.doris.nereids.types.DateTimeV2Type;
 import org.apache.doris.nereids.types.DateV2Type;
 import org.apache.doris.nereids.types.coercion.DateLikeType;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
@@ -79,10 +85,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -94,6 +103,8 @@ import java.util.stream.Collectors;
  * show [temp] partitions' detail info within a table
  */
 public class PartitionsProcDir implements ProcDirInterface {
+    private static final Logger LOG = 
LogManager.getLogger(PartitionsProcDir.class);
+
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
             .add("PartitionId").add("PartitionName")
             .add("VisibleVersion").add("VisibleVersionTime")
@@ -397,6 +408,60 @@ public class PartitionsProcDir implements ProcDirInterface 
{
         return partitionInfosInrernal.stream().map(pair -> 
pair.second).collect(Collectors.toList());
     }
 
+    private List<Long> getPartitionVersions(OlapTable olapTable, List<Long> 
partitionIds)
+            throws AnalysisException {
+        List<Long> partitionVersions;
+        if (Config.isNotCloudMode()) {
+            partitionVersions = partitionIds.stream().map(id -> 
olapTable.getPartition(id).getVisibleVersion())
+                    .collect(Collectors.toList());
+        } else if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().cloudForceSyncVersion) {
+            LOG.info("cloud force sync version for table: {}, partitionNum: 
{}", olapTable, partitionIds.size());
+            long dbId = olapTable.getDatabase().getId();
+            // sync table version
+            // Note: the table version cache is deliberately NOT updated here,
+            // so that a failure while fetching partition versions cannot leave
+            // the table version cache updated while the partition version
+            // cache is stale (keeping the two caches consistent).
+            List<Long> tableVersions = 
OlapTable.getVisibleVersionFromMeta(Lists.newArrayList(dbId),
+                    Lists.newArrayList(olapTable.getId()));
+            List<Pair<OlapTable, Long>> tableVersionMap = 
Lists.newArrayList(Pair.of(olapTable, tableVersions.get(0)));
+            // sync partition version
+            List<CloudPartition> partitions = partitionIds.stream()
+                    .map(id -> (CloudPartition) 
(olapTable.getPartition(id))).collect(Collectors.toList());
+            try {
+                partitionVersions = new ArrayList<>(partitionIds.size());
+                int batchSize = Config.cloud_get_version_task_batch_size;
+                for (int start = 0; start < partitions.size(); start += 
batchSize) {
+                    int end = Math.min(start + batchSize, partitions.size());
+                    List<CloudPartition> batch = partitions.subList(start, 
end);
+                    
partitionVersions.addAll(CloudPartition.getSnapshotVisibleVersionFromMs(batch, 
false));
+                }
+            } catch (RpcException e) {
+                LOG.warn("get partition versions failed for table: {}", 
olapTable, e);
+                throw new AnalysisException("get partition versions failed", 
e);
+            }
+            Map<CloudPartition, Pair<Long, Long>> partitionVersionMap = new 
HashMap<>(partitionIds.size());
+            for (int i = 0; i < partitionIds.size(); i++) {
+                CloudPartition partition = partitions.get(i);
+                long version = partitionVersions.get(i);
+                partitionVersionMap.put(partition, Pair.of(version, 
partition.getVisibleVersionTime()));
+            }
+            // push to other fes
+            ((CloudEnv) (Env.getCurrentEnv())).getCloudFEVersionSynchronizer()
+                    .pushVersionAsync(dbId, tableVersionMap, 
partitionVersionMap);
+        } else {
+            List<CloudPartition> partitions = partitionIds.stream()
+                    .map(id -> (CloudPartition) 
(olapTable.getPartition(id))).collect(Collectors.toList());
+            try {
+                partitionVersions = 
CloudPartition.getSnapshotVisibleVersion(partitions);
+            } catch (RpcException e) {
+                LOG.warn("get partition versions failed for table: {}", 
olapTable, e);
+                throw new AnalysisException("get partition versions failed", 
e);
+            }
+        }
+        Preconditions.checkState(partitionVersions.size() == 
partitionIds.size(),
+                "versions size %s not equal partition size %s", 
partitionVersions.size(), partitionIds.size());
+        return partitionVersions;
+    }
+
     private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() 
throws AnalysisException {
         Preconditions.checkNotNull(db);
         Preconditions.checkNotNull(olapTable);
@@ -446,8 +511,10 @@ public class PartitionsProcDir implements ProcDirInterface 
{
                 partitionIds = 
partitions.stream().map(Partition::getId).collect(Collectors.toList());
             }
 
+            List<Long> partitionVersions = getPartitionVersions(olapTable, 
partitionIds);
             Joiner joiner = Joiner.on(", ");
-            for (Long partitionId : partitionIds) {
+            for (int j = 0; j < partitionIds.size(); j++) {
+                long partitionId = partitionIds.get(j);
                 Partition partition = olapTable.getPartition(partitionId);
 
                 List<Comparable> partitionInfo = new ArrayList<Comparable>();
@@ -457,8 +524,9 @@ public class PartitionsProcDir implements ProcDirInterface {
                 trow.addToColumnValue(new TCell().setLongVal(partitionId));
                 partitionInfo.add(partitionName);
                 trow.addToColumnValue(new TCell().setStringVal(partitionName));
-                partitionInfo.add(partition.getVisibleVersion());
-                trow.addToColumnValue(new 
TCell().setLongVal(partition.getVisibleVersion()));
+                long partitionVersion = partitionVersions.get(j);
+                partitionInfo.add(partitionVersion);
+                trow.addToColumnValue(new 
TCell().setLongVal(partitionVersion));
                 String visibleTime = 
TimeUtils.longToTimeString(partition.getVisibleVersionTime());
                 partitionInfo.add(visibleTime);
                 trow.addToColumnValue(new TCell().setStringVal(visibleTime));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 06ab6df903a..8bbd6268e20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1780,7 +1780,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
                 if (!isCreateTable) {
                     afterCreatePartitions(db.getId(), olapTable.getId(), 
partitionIds, indexIds, isCreateTable,
-                            false /* isBatchCommit */);
+                            false /* isBatchCommit */, olapTable);
                 }
                 if (writeEditLog) {
                     Env.getCurrentEnv().getEditLog().logAddPartition(info);
@@ -2223,6 +2223,11 @@ public class InternalCatalog implements 
CatalogIf<Database> {
     public void afterCreatePartitions(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds,
                                          boolean isCreateTable, boolean 
isBatchCommit)
             throws DdlException {
+        afterCreatePartitions(dbId, tableId, partitionIds, indexIds, 
isCreateTable, isBatchCommit, null);
+    }
+
+    public void afterCreatePartitions(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds,
+            boolean isCreateTable, boolean isBatchCommit, OlapTable olapTable) 
throws DdlException {
     }
 
     public void checkAvailableCapacity(Database db) throws DdlException {
@@ -3004,7 +3009,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         binlogConfigForTask,
                         
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
                 afterCreatePartitions(db.getId(), olapTable.getId(), 
olapTable.getPartitionIds(),
-                                olapTable.getIndexIdList(), true /* 
isCreateTable */, true /* isBatchCommit */);
+                        olapTable.getIndexIdList(), true /* isCreateTable */, 
true /* isBatchCommit */, olapTable);
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE
                     || partitionInfo.getType() == PartitionType.LIST) {
@@ -3098,7 +3103,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         .setStoragePolicy(partionStoragePolicy);
                 }
                 afterCreatePartitions(db.getId(), olapTable.getId(), 
olapTable.getPartitionIds(),
-                        olapTable.getIndexIdList(), true /* isCreateTable */, 
true /* isBatchCommit */);
+                        olapTable.getIndexIdList(), true /* isCreateTable */, 
true /* isBatchCommit */, olapTable);
             } else {
                 throw new DdlException("Unsupported partition method: " + 
partitionInfo.getType().name());
             }
@@ -3560,7 +3565,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             }
 
             afterCreatePartitions(db.getId(), copiedTbl.getId(), 
newPartitionIds, indexIds, true /* isCreateTable */,
-                    false /* isBatchCommit */);
+                    false /* isBatchCommit */, olapTable);
 
         } catch (DdlException e) {
             // create partition failed, remove all newly created tablets
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3fc7d3a7634..4e96a70afe7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -802,6 +802,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String CLOUD_CLUSTER = "cloud_cluster";
     public static final String COMPUTE_GROUP = "compute_group";
     public static final String DISABLE_EMPTY_PARTITION_PRUNE = 
"disable_empty_partition_prune";
+    public static final String CLOUD_FORCE_SYNC_VERSION = 
"cloud_force_sync_version";
     public static final String CLOUD_PARTITION_VERSION_CACHE_TTL_MS =
             "cloud_partition_version_cache_ttl_ms";
     public static final String CLOUD_TABLE_VERSION_CACHE_TTL_MS =
@@ -3040,9 +3041,11 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
     public boolean disableEmptyPartitionPrune = false;
     @VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
-    public long cloudPartitionVersionCacheTtlMs = 0;
+    public long cloudPartitionVersionCacheTtlMs = Long.MAX_VALUE;
     @VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
-    public long cloudTableVersionCacheTtlMs = 0;
+    public long cloudTableVersionCacheTtlMs = Long.MAX_VALUE;
+    @VariableMgr.VarAttr(name = CLOUD_FORCE_SYNC_VERSION)
+    public boolean cloudForceSyncVersion = false;
     // CLOUD_VARIABLES_END
 
     // fetch remote schema rpc timeout
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4110abe89ad..82fa9992d19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -181,6 +181,7 @@ import org.apache.doris.thrift.TFrontendPingFrontendResult;
 import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
 import org.apache.doris.thrift.TFrontendReportAliveSessionRequest;
 import org.apache.doris.thrift.TFrontendReportAliveSessionResult;
+import org.apache.doris.thrift.TFrontendSyncCloudVersionRequest;
 import org.apache.doris.thrift.TGetBackendMetaRequest;
 import org.apache.doris.thrift.TGetBackendMetaResult;
 import org.apache.doris.thrift.TGetBinlogLagResult;
@@ -3063,6 +3064,17 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
+    @Override
+    public TStatus syncCloudVersion(TFrontendSyncCloudVersionRequest request)
+            throws TException {
+        TStatus status = new TStatus(TStatusCode.OK);
+        if (Env.getCurrentEnv().isMaster()) {
+            return status;
+        }
+        ((CloudEnv) 
(Env.getCurrentEnv())).getCloudFEVersionSynchronizer().syncVersionAsync(request);
+        return status;
+    }
+
     @Override
     public TFetchSchemaTableDataResult 
fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
         try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
index dda4acc832c..babe7107b29 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
@@ -45,9 +45,13 @@ import org.junit.Test;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class OlapTableTest {
 
@@ -405,4 +409,132 @@ public class OlapTableTest {
             ConnectContext.remove();
         }
     }
+
+    private OlapTable createCloudOlapTable(long tableId, Database db) {
+        OlapTable table = new OlapTable() {
+            private ReadWriteLock versionLock = new ReentrantReadWriteLock();
+
+            @Override
+            public Database getDatabase() {
+                return db;
+            }
+
+            @Override
+            public void versionReadLock() {
+                versionLock.readLock().lock();
+            }
+
+            @Override
+            public void versionReadUnlock() {
+                versionLock.readLock().unlock();
+            }
+        };
+        table.id = tableId;
+        return table;
+    }
+
+    @Test
+    public void testGetVisibleVersionInBatchCached() throws Exception {
+        new MockUp<Config>() {
+            @Mock
+            public boolean isNotCloudMode() {
+                return false;
+            }
+
+            @Mock
+            public boolean isCloudMode() {
+                return true;
+            }
+        };
+
+        final Database db = new Database(1L, "test_db");
+        List<OlapTable> tables = new ArrayList<>();
+        for (long i = 0; i < 3; i++) {
+            tables.add(createCloudOlapTable(100 + i, db));
+        }
+
+        final ArrayList<ArrayList<Long>> batchVersions = new 
ArrayList<>(Arrays.asList(
+                new ArrayList<>(Arrays.asList(10L, 20L, 30L)),
+                new ArrayList<>(Arrays.asList(11L, 21L, 31L)),
+                new ArrayList<>(Arrays.asList(22L, 32L)),
+                new ArrayList<>(Arrays.asList(13L, 23L, 33L))
+        ));
+        final int[] callCount = {0};
+
+        new MockUp<VersionHelper>() {
+            @Mock
+            public Cloud.GetVersionResponse 
getVersionFromMeta(Cloud.GetVersionRequest req) {
+                Cloud.GetVersionResponse.Builder builder = 
Cloud.GetVersionResponse.newBuilder();
+                builder.setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
+                        .setCode(Cloud.MetaServiceCode.OK).build());
+                builder.addAllVersions(batchVersions.get(callCount[0]));
+                callCount[0]++;
+                return builder.build();
+            }
+        };
+
+        ConnectContext ctx = new ConnectContext();
+        ctx.setSessionVariable(new SessionVariable());
+        ctx.setThreadLocalInfo();
+
+        // CHECKSTYLE OFF
+        try {
+            // Test 1: cache disabled (TTL = -1), all fetched from MS
+            ctx.getSessionVariable().cloudTableVersionCacheTtlMs = -1;
+            {
+                List<Long> versions = 
OlapTable.getVisibleVersionInBatch(tables);
+                Assert.assertEquals(1, callCount[0]);
+                Assert.assertEquals(Arrays.asList(10L, 20L, 30L), versions);
+            }
+
+            // Test 2: cache enabled with long TTL, all should hit cache
+            ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 100000;
+            {
+                List<Long> versions = 
OlapTable.getVisibleVersionInBatch(tables);
+                Assert.assertEquals(1, callCount[0]);
+                Assert.assertEquals(Arrays.asList(10L, 20L, 30L), versions);
+            }
+
+            // Test 3: cache disabled (TTL = 0), all fetched from MS again
+            ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 0;
+            {
+                List<Long> versions = 
OlapTable.getVisibleVersionInBatch(tables);
+                Assert.assertEquals(2, callCount[0]);
+                Assert.assertEquals(Arrays.asList(11L, 21L, 31L), versions);
+            }
+
+            // Test 4: short TTL, wait for expiration, then partially refresh
+            ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 500;
+            Thread.sleep(550);
+
+            // refresh one table's cache so it stays hot
+            OlapTable hotTable = tables.get(0);
+            hotTable.setCachedTableVersion(hotTable.getCachedTableVersion());
+            Assert.assertFalse(hotTable.isCachedTableVersionExpired());
+            Assert.assertTrue(tables.get(1).isCachedTableVersionExpired());
+            Assert.assertTrue(tables.get(2).isCachedTableVersionExpired());
+            {
+                // batchVersions[2] = [22, 32] for the 2 expired tables
+                List<Long> versions = 
OlapTable.getVisibleVersionInBatch(tables);
+                Assert.assertEquals(3, callCount[0]);
+                Assert.assertEquals(3, versions.size());
+                // hot table keeps its cached version
+                Assert.assertEquals(11L, versions.get(0).longValue());
+                // expired tables get new versions from MS
+                Assert.assertEquals(22L, versions.get(1).longValue());
+                Assert.assertEquals(32L, versions.get(2).longValue());
+            }
+
+            // Test 5: all expired again, full batch fetch
+            ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 0;
+            {
+                List<Long> versions = 
OlapTable.getVisibleVersionInBatch(tables);
+                Assert.assertEquals(4, callCount[0]);
+                Assert.assertEquals(Arrays.asList(13L, 23L, 33L), versions);
+            }
+        } finally {
+            ConnectContext.remove();
+        }
+        // CHECKSTYLE ON
+    }
 }
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1f61fcab17d..fd799965e37 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1012,12 +1012,13 @@ message SubTxnInfo {
 message TableStatsPB {
     optional int64 table_id = 1;
     optional int64 updated_row_count = 2;
+    optional int64 table_version = 3;
 }
 
 message CommitTxnResponse {
     optional MetaServiceResponseStatus status = 1;
     optional TxnInfoPB txn_info = 2;
-    // <tablet_id, partition_id> --> version
+    // <table_id, partition_id> --> version
     repeated int64 table_ids = 3;
     repeated int64 partition_ids = 4;
     repeated int64 versions = 5;
@@ -1391,6 +1392,7 @@ message IndexRequest {
 
 message IndexResponse {
     optional MetaServiceResponseStatus status = 1;
+    optional int64 table_version = 2;
 }
 
 message PartitionRequest {
@@ -1407,6 +1409,7 @@ message PartitionRequest {
 
 message PartitionResponse {
     optional MetaServiceResponseStatus status = 1;
+    optional int64 table_version = 2;
 }
 
 message RestoreJobRequest {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 1c573c58f71..82a2d40454f 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -770,6 +770,19 @@ struct TFrontendPingFrontendRequest {
    3: optional string deployMode
 }
 
+struct TCloudVersionInfo {
+   1: optional i64 tableId
+   2: optional i64 partitionId
+   3: optional i64 version
+   4: optional i64 versionUpdateTime
+}
+
+struct TFrontendSyncCloudVersionRequest {
+   1: optional i64 dbId
+   2: optional list<TCloudVersionInfo> tableVersionInfos
+   3: optional list<TCloudVersionInfo> partitionVersionInfos
+}
+
 struct TFrontendReportAliveSessionRequest {
    1: required i32 clusterId
    2: required string token
@@ -1899,6 +1912,8 @@ service FrontendService {
 
     TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
 
+    Status.TStatus syncCloudVersion(1: TFrontendSyncCloudVersionRequest 
request)
+
     TInitExternalCtlMetaResult initExternalCtlMeta(1: 
TInitExternalCtlMetaRequest request)
 
     TFetchSchemaTableDataResult fetchSchemaTableData(1: 
TFetchSchemaTableDataRequest request)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to