This is an automated email from the ASF dual-hosted git repository.

meiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e95d2666ccc [feat](snapshot) calculate snapshot data size (#59168)
e95d2666ccc is described below

commit e95d2666ccc608eed2671c97c576a77ec4c160f9
Author: meiyi <[email protected]>
AuthorDate: Mon Dec 22 12:00:14 2025 +0800

    [feat](snapshot) calculate snapshot data size (#59168)
---
 cloud/src/meta-store/keys.cpp                      |  34 ++
 cloud/src/meta-store/keys.h                        |   5 +
 cloud/src/recycler/recycler.h                      |  42 +-
 cloud/src/recycler/recycler_operation_log.cpp      |  38 +-
 .../src/recycler/snapshot_data_size_calculator.cpp | 453 +++++++++++++++++++++
 cloud/test/CMakeLists.txt                          |   1 +
 cloud/test/recycler_operation_log_test.cpp         |  15 +-
 cloud/test/snapshot_data_size_calculator_test.cpp  | 162 ++++++++
 gensrc/proto/cloud.proto                           |  23 +-
 9 files changed, 753 insertions(+), 20 deletions(-)

diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 0ec08782ed2..88073ae3a00 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -1113,6 +1113,40 @@ bool decode_meta_tablet_key(std::string_view* in, 
int64_t* tablet_id, Versionsta
     return true;
 }
 
+// Decode tablet inverted index key
+// Return true if decode successfully, otherwise false
+bool decode_tablet_inverted_index_key(std::string_view* in, int64_t* db_id, 
int64_t* table_id,
+                                      int64_t* index_id, int64_t* partition_id,
+                                      int64_t* tablet_id) {
+    // 0x03 "index" ${instance_id} "tablet_inverted" ${db_id} ${table_id} 
${index_id} ${partition} ${tablet}
+    if (in->empty() || static_cast<uint8_t>((*in)[0]) != 
CLOUD_VERSIONED_KEY_SPACE03) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+    auto res = decode_key(in, &out);
+    if (res != 0 || out.size() != 8) {
+        return false;
+    }
+
+    try {
+        if (std::get<std::string>(std::get<0>(out[0])) != INDEX_KEY_PREFIX ||
+            std::get<std::string>(std::get<0>(out[2])) != 
TABLET_INVERTED_INDEX_KEY_INFIX) {
+            return false;
+        }
+        *db_id = std::get<int64_t>(std::get<0>(out[3]));
+        *table_id = std::get<int64_t>(std::get<0>(out[4]));
+        *index_id = std::get<int64_t>(std::get<0>(out[5]));
+        *partition_id = std::get<int64_t>(std::get<0>(out[6]));
+        *tablet_id = std::get<int64_t>(std::get<0>(out[7]));
+    } catch (const std::bad_variant_access& e) {
+        return false;
+    }
+
+    return true;
+}
+
 bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
                              Versionstamp* timestamp, std::string* 
ref_instance_id) {
     // Key format: 0x03 + encode_bytes("snapshot") + encode_bytes(instance_id) 
+
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index c1b51086f80..53a24939cde 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -597,6 +597,11 @@ bool decode_meta_schema_key(std::string_view* in, int64_t* 
index_id, int64_t* sc
 // Return true if decode successfully, otherwise false
 bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id, 
Versionstamp* timestamp);
 
+// Decode tablet inverted index key
+// Return true if decode successfully, otherwise false
+bool decode_tablet_inverted_index_key(std::string_view* in, int64_t* db_id, 
int64_t* table_id,
+                                      int64_t* index_id, int64_t* 
partition_id, int64_t* tablet_id);
+
 // Decode snapshot reference key
 // Return true if decode successfully, otherwise false
 bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index d5d64d42020..b69e5fa16dd 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -552,6 +552,12 @@ private:
     SegmentRecyclerMetricsContext segment_metrics_context_;
 };
 
+struct OperationLogReferenceInfo {
+    bool referenced_by_instance = false;
+    bool referenced_by_snapshot = false;
+    Versionstamp referenced_snapshot_timestamp;
+};
+
 // Helper class to check if operation logs can be recycled based on snapshots 
and versionstamps
 class OperationLogRecycleChecker {
 public:
@@ -563,10 +569,15 @@ public:
     int init();
 
     // Check if an operation log can be recycled
-    bool can_recycle(const Versionstamp& log_versionstamp, int64_t 
log_min_timestamp) const;
+    bool can_recycle(const Versionstamp& log_versionstamp, int64_t 
log_min_timestamp,
+                     OperationLogReferenceInfo* reference_info) const;
 
     Versionstamp max_versionstamp() const { return max_versionstamp_; }
 
+    const std::vector<std::pair<SnapshotPB, Versionstamp>>& get_snapshots() 
const {
+        return snapshots_;
+    }
+
 private:
     std::string_view instance_id_;
     TxnKv* txn_kv_;
@@ -577,4 +588,33 @@ private:
     std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
 };
 
+class SnapshotDataSizeCalculator {
+public:
+    SnapshotDataSizeCalculator(std::string_view instance_id, 
std::shared_ptr<TxnKv> txn_kv)
+            : instance_id_(instance_id), txn_kv_(std::move(txn_kv)) {}
+
+    void init(const std::vector<std::pair<SnapshotPB, Versionstamp>>& 
snapshots);
+
+    int calculate_operation_log_data_size(const std::string_view& log_key,
+                                          OperationLogPB& operation_log,
+                                          OperationLogReferenceInfo& 
reference_info);
+
+    int save_snapshot_data_size_with_retry();
+
+private:
+    int get_all_index_partitions(int64_t db_id, int64_t table_id, int64_t 
index_id,
+                                 std::vector<int64_t>* partition_ids);
+    int get_index_partition_data_size(int64_t db_id, int64_t table_id, int64_t 
index_id,
+                                      int64_t partition_id, int64_t* 
data_size);
+    int save_operation_log(const std::string_view& log_key, OperationLogPB& 
operation_log);
+    int save_snapshot_data_size();
+
+    std::string_view instance_id_;
+    std::shared_ptr<TxnKv> txn_kv_;
+
+    int64_t instance_retained_data_size_ = 0;
+    std::map<Versionstamp, int64_t> retained_data_size_;
+    std::set<std::string> calculated_partitions_;
+};
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler_operation_log.cpp 
b/cloud/src/recycler/recycler_operation_log.cpp
index 5a9bbb305ae..71eda034d5f 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -82,7 +82,8 @@ int OperationLogRecycleChecker::init() {
     snapshots_.clear();
     snapshot_indexes_.clear();
     MetaReader reader(instance_id_);
-    err = reader.get_snapshots(txn.get(), &snapshots_);
+    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
+    err = reader.get_snapshots(txn.get(), &snapshots);
     if (err != TxnErrorCode::TXN_OK) {
         LOG_WARNING("failed to get snapshots").tag("err", err);
         return -1;
@@ -96,16 +97,22 @@ int OperationLogRecycleChecker::init() {
     }
 
     max_versionstamp_ = Versionstamp(read_version, 0);
-    for (size_t i = 0; i < snapshots_.size(); ++i) {
-        auto&& [snapshot, versionstamp] = snapshots_[i];
-        snapshot_indexes_.insert(std::make_pair(versionstamp, i));
+    for (size_t i = 0; i < snapshots.size(); ++i) {
+        auto&& [snapshot, versionstamp] = snapshots[i];
+        if (snapshot.status() == SnapshotStatus::SNAPSHOT_ABORTED ||
+            snapshot.status() == SnapshotStatus::SNAPSHOT_RECYCLED) {
+            continue;
+        }
+        snapshot_indexes_.insert(std::make_pair(versionstamp, 
snapshots_.size()));
+        snapshots_.push_back(std::make_pair(std::move(snapshot), 
versionstamp));
     }
 
     return 0;
 }
 
 bool OperationLogRecycleChecker::can_recycle(const Versionstamp& 
log_versionstamp,
-                                             int64_t log_min_timestamp) const {
+                                             int64_t log_min_timestamp,
+                                             OperationLogReferenceInfo* 
reference_info) const {
     Versionstamp log_min_read_timestamp(log_min_timestamp, 0);
     if (log_versionstamp > max_versionstamp_) {
         // Not recycleable.
@@ -114,12 +121,15 @@ bool OperationLogRecycleChecker::can_recycle(const 
Versionstamp& log_versionstam
 
     // Do not recycle operation logs referenced by active snapshots.
     if (log_min_read_timestamp < source_snapshot_versionstamp_) {
+        reference_info->referenced_by_instance = true;
         return false;
     }
 
     auto it = snapshot_indexes_.lower_bound(log_min_read_timestamp);
     if (it != snapshot_indexes_.end() && snapshots_[it->second].second < 
log_versionstamp) {
         // in [log_min_read_timestmap, log_versionstamp)
+        reference_info->referenced_by_snapshot = true;
+        reference_info->referenced_snapshot_timestamp = 
snapshots_[it->second].second;
         return false;
     }
 
@@ -677,6 +687,8 @@ int InstanceRecycler::recycle_operation_logs() {
         LOG_WARNING("failed to initialize recycle checker").tag("error_code", 
init_res);
         return init_res;
     }
+    SnapshotDataSizeCalculator calculator(instance_id_, txn_kv_);
+    calculator.init(recycle_checker.get_snapshots());
 
     auto scan_and_recycle_operation_log = [&](const std::string_view& key,
                                               const std::vector<std::string>& 
raw_keys,
@@ -690,7 +702,9 @@ int InstanceRecycler::recycle_operation_logs() {
         }
 
         size_t value_size = operation_log.ByteSizeLong();
-        if (recycle_checker.can_recycle(log_versionstamp, 
operation_log.min_timestamp())) {
+        OperationLogReferenceInfo reference_info;
+        if (recycle_checker.can_recycle(log_versionstamp, 
operation_log.min_timestamp(),
+                                        &reference_info)) {
             AnnotateTag tag("log_key", hex(key));
             int res = recycle_operation_log(log_versionstamp, raw_keys, 
std::move(operation_log));
             if (res != 0) {
@@ -700,6 +714,13 @@ int InstanceRecycler::recycle_operation_logs() {
 
             recycled_operation_logs++;
             recycled_operation_log_data_size += value_size;
+        } else {
+            int res = calculator.calculate_operation_log_data_size(key, 
operation_log,
+                                                                   
reference_info);
+            if (res != 0) {
+                LOG_WARNING("failed to calculate operation log data 
size").tag("error_code", res);
+                return res;
+            }
         }
 
         total_operation_logs++;
@@ -769,6 +790,11 @@ int InstanceRecycler::recycle_operation_logs() {
                 .tag("error_code", iter->error_code());
         return -1;
     }
+    int res = calculator.save_snapshot_data_size_with_retry();
+    if (res != 0) {
+        LOG_WARNING("failed to save snapshot data size").tag("error_code", 
res);
+        return res;
+    }
     return 0;
 }
 
diff --git a/cloud/src/recycler/snapshot_data_size_calculator.cpp 
b/cloud/src/recycler/snapshot_data_size_calculator.cpp
new file mode 100644
index 00000000000..09222849990
--- /dev/null
+++ b/cloud/src/recycler/snapshot_data_size_calculator.cpp
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/builtin_service.pb.h>
+#include <brpc/server.h>
+#include <butil/endpoint.h>
+#include <bvar/status.h>
+#include <gen_cpp/cloud.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
+
+#include "common/logging.h"
+#include "common/stopwatch.h"
+#include "common/util.h"
+#include "meta-service/meta_service.h"
+#include "meta-store/blob_message.h"
+#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
+#include "meta-store/txn_kv.h"
+#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
+#include "meta-store/versionstamp.h"
+#include "recycler/recycler.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
// Retry configuration for save data size.
// Number of attempts made by save_snapshot_data_size_with_retry before giving up.
static constexpr int MAX_RETRY_TIMES = 5;
// Pause between two attempts, in milliseconds.
static constexpr int RETRY_INTERVAL_MS = 100;
+
+void SnapshotDataSizeCalculator::init(
+        const std::vector<std::pair<SnapshotPB, Versionstamp>>& snapshots) {
+    for (auto [_, versionstamp] : snapshots) {
+        retained_data_size_.insert(std::make_pair(versionstamp, 0));
+    }
+}
+
// Builds the "<index_id>-<partition_id>" key used in index_partition_to_data_size and in
// calculated_partitions_.
// `static` gives the helper internal linkage. A namespace-scope `inline` function has
// external linkage and could collide (ODR) with a same-named function in another
// translation unit.
static std::string index_partition_key(int64_t index_id, int64_t partition_id) {
    return std::to_string(index_id) + "-" + std::to_string(partition_id);
}
+
+int SnapshotDataSizeCalculator::calculate_operation_log_data_size(
+        const std::string_view& log_key, OperationLogPB& operation_log,
+        OperationLogReferenceInfo& reference_info) {
+    if (!reference_info.referenced_by_instance && 
!reference_info.referenced_by_snapshot) {
+        return 0;
+    }
+
+    int64_t data_size = 0;
+    if (operation_log.has_compaction()) {
+        for (auto& recycle_rowset : 
operation_log.compaction().recycle_rowsets()) {
+            data_size += recycle_rowset.rowset_meta().total_disk_size();
+        }
+        VLOG_DEBUG << "compaction log data size: " << data_size << ", key" << 
hex(log_key);
+    } else if (operation_log.has_schema_change()) {
+        for (auto& recycle_rowset : 
operation_log.schema_change().recycle_rowsets()) {
+            data_size += recycle_rowset.rowset_meta().total_disk_size();
+        }
+        VLOG_DEBUG << "schema change log data size: " << data_size << ", key" 
<< hex(log_key);
+    } else if (operation_log.has_drop_partition()) {
+        auto& drop_partition_log = operation_log.drop_partition();
+        int64_t num_total = 0;
+        int64_t num_calculated = 0;
+
+        if (!drop_partition_log.index_partition_to_data_size().empty()) {
+            for (auto& [key, size] : 
drop_partition_log.index_partition_to_data_size()) {
+                ++num_total;
+                data_size += size;
+                calculated_partitions_.insert(key);
+            }
+        } else {
+            int64_t db_id = drop_partition_log.db_id();
+            int64_t table_id = drop_partition_log.table_id();
+            DropPartitionLogPB drop_partition_log_copy = drop_partition_log;
+
+            for (auto partition_id : drop_partition_log.partition_ids()) {
+                for (int64_t index_id : drop_partition_log.index_ids()) {
+                    std::string key = index_partition_key(index_id, 
partition_id);
+                    if (calculated_partitions_.contains(key)) {
+                        continue;
+                    }
+                    ++num_total;
+                    int64_t partition_data_size = 0;
+                    if (get_index_partition_data_size(db_id, table_id, 
index_id, partition_id,
+                                                      &partition_data_size) != 
0) {
+                        return -1;
+                    }
+                    ++num_calculated;
+                    data_size += partition_data_size;
+                    calculated_partitions_.insert(key);
+                    
drop_partition_log_copy.mutable_index_partition_to_data_size()->emplace(
+                            key, partition_data_size);
+                }
+            }
+
+            if 
(!drop_partition_log_copy.index_partition_to_data_size().empty()) {
+                operation_log.clear_drop_partition();
+                
operation_log.mutable_drop_partition()->Swap(&drop_partition_log_copy);
+                if (save_operation_log(log_key, operation_log) != 0) {
+                    return -1;
+                }
+            }
+        }
+
+        VLOG_DEBUG << "drop partition log data size: " << data_size << ", key" 
<< hex(log_key)
+                   << ", partition_num_total=" << num_total
+                   << ", partition_num_calculated=" << num_calculated;
+    } else if (operation_log.has_drop_index()) {
+        auto& drop_index_log = operation_log.drop_index();
+        int64_t num_total = 0;
+        int64_t num_calculated = 0;
+
+        if (!drop_index_log.index_partition_to_data_size().empty()) {
+            for (auto& [key, size] : 
drop_index_log.index_partition_to_data_size()) {
+                ++num_total;
+                data_size += size;
+                calculated_partitions_.insert(key);
+            }
+        } else {
+            int64_t db_id = drop_index_log.db_id();
+            int64_t table_id = drop_index_log.table_id();
+            DropIndexLogPB drop_index_log_copy = drop_index_log;
+
+            for (auto index_id : drop_index_log.index_ids()) {
+                std::vector<int64_t> partition_ids;
+                if (get_all_index_partitions(db_id, table_id, index_id, 
&partition_ids) != 0) {
+                    return -1;
+                }
+                for (int64_t partition_id : partition_ids) {
+                    ++num_total;
+                    std::string key = index_partition_key(index_id, 
partition_id);
+                    if (calculated_partitions_.contains(key)) {
+                        continue;
+                    }
+                    int64_t partition_data_size = 0;
+                    if (get_index_partition_data_size(db_id, table_id, 
index_id, partition_id,
+                                                      &partition_data_size) != 
0) {
+                        return -1;
+                    }
+                    ++num_calculated;
+                    data_size += partition_data_size;
+                    calculated_partitions_.insert(key);
+                    
drop_index_log_copy.mutable_index_partition_to_data_size()->emplace(
+                            key, partition_data_size);
+                }
+            }
+
+            if (!drop_index_log_copy.index_partition_to_data_size().empty()) {
+                operation_log.clear_drop_index();
+                operation_log.mutable_drop_index()->Swap(&drop_index_log_copy);
+                if (save_operation_log(log_key, operation_log) != 0) {
+                    return -1;
+                }
+            }
+        }
+
+        VLOG_DEBUG << "drop index log data size: " << data_size << ", key" << 
hex(log_key)
+                   << ", partition_num_total=" << num_total
+                   << ", partition_num_calculated=" << num_calculated;
+    }
+    if (reference_info.referenced_by_snapshot) {
+        retained_data_size_[reference_info.referenced_snapshot_timestamp] += 
data_size;
+    } else {
+        instance_retained_data_size_ += data_size;
+    }
+    return 0;
+}
+
+int SnapshotDataSizeCalculator::save_snapshot_data_size_with_retry() {
+    for (int retry = 0; retry < MAX_RETRY_TIMES; retry++) {
+        if (retry > 0) {
+            
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_INTERVAL_MS));
+        }
+
+        int res = save_snapshot_data_size();
+        if (res == -2) {
+            continue; // TXN_CONFLICT
+        }
+        return res;
+    }
+    return -1;
+}
+
+int SnapshotDataSizeCalculator::save_snapshot_data_size() {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for saving snapshot data 
size")
+                .tag("error_code", err);
+        return -1;
+    }
+    bool need_commit = false;
+    int64_t left_data_size = instance_retained_data_size_;
+
+    // save data size to SnapshotPB
+    for (auto& [snapshot_versionstamp, retained_data_size] : 
retained_data_size_) {
+        std::string snapshot_full_key = 
versioned::snapshot_full_key({instance_id_});
+        std::string snapshot_key = encode_versioned_key(snapshot_full_key, 
snapshot_versionstamp);
+        std::string snapshot_val;
+        err = txn->get(snapshot_key, &snapshot_val);
+
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            // impossible
+            LOG_WARNING("snapshot key not found").tag("key", 
hex(snapshot_key));
+            continue;
+        }
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get snapshot key").tag("key", 
hex(snapshot_key)).tag("err", err);
+            return -1;
+        }
+        SnapshotPB snapshot_pb;
+        if (!snapshot_pb.ParseFromString(snapshot_val)) {
+            LOG_WARNING("failed to parse SnapshotPB")
+                    .tag("instance_id", instance_id_)
+                    .tag("snapshot_id", snapshot_versionstamp.to_string());
+            return -1;
+        }
+        int64_t billable_data_size = snapshot_pb.snapshot_meta_image_size();
+        if (retained_data_size >= snapshot_pb.snapshot_logical_data_size()) {
+            billable_data_size += snapshot_pb.snapshot_logical_data_size();
+            left_data_size += retained_data_size - 
snapshot_pb.snapshot_logical_data_size();
+        } else if (retained_data_size + left_data_size <=
+                   snapshot_pb.snapshot_logical_data_size()) {
+            billable_data_size += retained_data_size + left_data_size;
+            left_data_size = 0;
+        } else {
+            billable_data_size += snapshot_pb.snapshot_logical_data_size();
+            left_data_size =
+                    retained_data_size + left_data_size - 
snapshot_pb.snapshot_logical_data_size();
+        }
+
+        if (!snapshot_pb.has_snapshot_retained_data_size() ||
+            snapshot_pb.snapshot_retained_data_size() != retained_data_size ||
+            !snapshot_pb.has_snapshot_billable_data_size() ||
+            snapshot_pb.snapshot_billable_data_size() != billable_data_size) {
+            LOG_INFO("update snapshot data size")
+                    .tag("instance_id", instance_id_)
+                    .tag("snapshot_id", snapshot_versionstamp.to_string())
+                    .tag("old_retained_data_size", 
snapshot_pb.snapshot_retained_data_size())
+                    .tag("new_retained_data_size", retained_data_size)
+                    .tag("old_billable_data_size", 
snapshot_pb.snapshot_billable_data_size())
+                    .tag("new_billable_data_size", billable_data_size);
+            snapshot_pb.set_snapshot_retained_data_size(retained_data_size);
+            snapshot_pb.set_snapshot_billable_data_size(billable_data_size);
+
+            std::string updated_snapshot_val;
+            if (!snapshot_pb.SerializeToString(&updated_snapshot_val)) {
+                LOG_WARNING("failed to serialize updated SnapshotPB");
+                return -1;
+            }
+            txn->put(snapshot_key, updated_snapshot_val);
+            need_commit = true;
+        }
+    }
+
+    // save instance data size
+    std::string key = instance_key(instance_id_);
+    std::string instance_value;
+    err = txn->get(key, &instance_value);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to get instance info")
+                .tag("instance_id", instance_id_)
+                .tag("error", err);
+        return -1;
+    }
+    InstanceInfoPB instance_info;
+    if (!instance_info.ParseFromString(instance_value)) {
+        LOG_WARNING("failed to parse instance info").tag("instance_id", 
instance_id_);
+        return -1;
+    }
+    if (!instance_info.has_snapshot_retained_data_size() ||
+        instance_info.snapshot_retained_data_size() != 
instance_retained_data_size_ ||
+        !instance_info.has_snapshot_billable_data_size() ||
+        instance_info.snapshot_billable_data_size() != left_data_size) {
+        
instance_info.set_snapshot_retained_data_size(instance_retained_data_size_);
+        instance_info.set_snapshot_billable_data_size(left_data_size);
+        txn->put(key, instance_info.SerializeAsString());
+        need_commit = true;
+        LOG_INFO("update instance snapshot data size")
+                .tag("instance_id", instance_id_)
+                .tag("old_retained_data_size", 
instance_info.snapshot_retained_data_size())
+                .tag("new_retained_data_size", instance_retained_data_size_)
+                .tag("old_billable_data_size", 
instance_info.snapshot_billable_data_size())
+                .tag("new_billable_data_size", left_data_size);
+    }
+
+    if (need_commit) {
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to commit kv txn for saving snapshot data 
size").tag("err", err);
+            if (err == TxnErrorCode::TXN_CONFLICT) {
+                return -2;
+            }
+            return -1;
+        }
+    }
+    return 0;
+}
+
+int SnapshotDataSizeCalculator::get_all_index_partitions(int64_t db_id, 
int64_t table_id,
+                                                         int64_t index_id,
+                                                         std::vector<int64_t>* 
partition_ids) {
+    partition_ids->clear();
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for getting all index 
partitions")
+                .tag("error_code", err);
+        return -1;
+    }
+
+    std::string tablet_key_begin =
+            versioned::tablet_inverted_index_key({instance_id_, db_id, 
table_id, index_id, 0, 0});
+    std::string tablet_key_end = versioned::tablet_inverted_index_key(
+            {instance_id_, db_id, table_id, index_id + 1, 0, 0});
+
+    int64_t last_partition_id = 0;
+    FullRangeGetOptions opts;
+    opts.snapshot = true;
+    opts.prefetch = true;
+    opts.txn_kv = txn_kv_;
+    auto it = txn_kv_->full_range_get(tablet_key_begin, tablet_key_end, opts);
+    for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+        auto&& [k, _] = *kvp;
+
+        int64_t decode_db_id = -1, decode_table_id = -1, decode_index_id = -1, 
partition_id = -1,
+                tablet_id = -1;
+        std::string_view key_view(k);
+        if (!versioned::decode_tablet_inverted_index_key(&key_view, 
&decode_db_id, &decode_table_id,
+                                                         &decode_index_id, 
&partition_id,
+                                                         &tablet_id)) {
+            LOG_WARNING("failed to 
decode_tablet_inverted_index_key").tag("key", hex(k));
+            return -1;
+        }
+        if (partition_id != last_partition_id) {
+            last_partition_id = partition_id;
+            partition_ids->emplace_back(partition_id);
+        }
+    }
+    if (!it->is_valid()) {
+        LOG_ERROR("failed to get all index partitions").tag("error_code", 
it->error_code());
+        return -1;
+    }
+    return 0;
+}
+
+int SnapshotDataSizeCalculator::get_index_partition_data_size(int64_t db_id, 
int64_t table_id,
+                                                              int64_t index_id,
+                                                              int64_t 
partition_id,
+                                                              int64_t* 
data_size) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for getting index partition 
data size")
+                .tag("error_code", err);
+        return -1;
+    }
+
+    // for a clone instance, the key ranges may be empty if snapshot chain is 
not compacted
+    std::string tablet_key_begin = versioned::tablet_inverted_index_key(
+            {instance_id_, db_id, table_id, index_id, partition_id, 0});
+    std::string tablet_key_end = versioned::tablet_inverted_index_key(
+            {instance_id_, db_id, table_id, index_id, partition_id + 1, 0});
+    int64_t tablet_num = 0;
+    MetaReader reader(instance_id_);
+    FullRangeGetOptions opts;
+    opts.snapshot = true;
+    opts.prefetch = true;
+    opts.txn_kv = txn_kv_;
+    auto it = txn_kv_->full_range_get(tablet_key_begin, tablet_key_end, opts);
+    for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+        auto&& [k, _] = *kvp;
+
+        int64_t decode_db_id = -1, decode_table_id = -1, decode_index_id = -1,
+                decode_partition_id = -1, tablet_id = -1;
+        std::string_view key_view(k);
+        if (!versioned::decode_tablet_inverted_index_key(&key_view, 
&decode_db_id, &decode_table_id,
+                                                         &decode_index_id, 
&decode_partition_id,
+                                                         &tablet_id)) {
+            LOG_WARNING("failed to 
decode_tablet_inverted_index_key=").tag("key", hex(k));
+            return -1;
+        }
+
+        TabletStatsPB tablet_stats;
+        err = reader.get_tablet_merged_stats(txn.get(), tablet_id, 
&tablet_stats, nullptr);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get tablet merged stats")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("error_code", err);
+            return -1;
+        }
+
+        *data_size += tablet_stats.data_size();
+        ++tablet_num;
+    }
+    if (!it->is_valid()) {
+        LOG_ERROR("failed to get index partition data size").tag("error_code", 
it->error_code());
+        return -1;
+    }
+    VLOG_DEBUG << "get data size for instance_id=" << instance_id_ << " 
db_id=" << db_id
+               << " table_id=" << table_id << " index_id=" << index_id
+               << " partition_id=" << partition_id << " tablet_num=" << 
tablet_num
+               << " data_size=" << *data_size;
+    return 0;
+}
+
+int SnapshotDataSizeCalculator::save_operation_log(const std::string_view& 
log_key,
+                                                   OperationLogPB& 
operation_log) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for saving operation 
log").tag("error_code", err);
+        return -1;
+    }
+    blob_put(txn.get(), log_key, operation_log, 0);
+    LOG_INFO("put operation log key")
+            .tag("instance_id", instance_id_)
+            .tag("operation_log_key", hex(log_key))
+            .tag("value", operation_log.DebugString());
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to commit transaction for saving operation 
log").tag("error_code", err);
+        return -1;
+    }
+    return 0;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 75e7b1b4fea..0762231c3ab 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -29,6 +29,7 @@ set_target_properties(txn_kv_test PROPERTIES COMPILE_FLAGS 
"-fno-access-control"
 add_executable(recycler_test
     recycler_test.cpp
     recycler_operation_log_test.cpp
+    snapshot_data_size_calculator_test.cpp
     recycle_versioned_keys_test.cpp)
 
 add_executable(mem_txn_kv_test mem_txn_kv_test.cpp)
diff --git a/cloud/test/recycler_operation_log_test.cpp 
b/cloud/test/recycler_operation_log_test.cpp
index 4718e58975a..8eb1cb6f4b2 100644
--- a/cloud/test/recycler_operation_log_test.cpp
+++ b/cloud/test/recycler_operation_log_test.cpp
@@ -2547,13 +2547,14 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) 
{
     insert_empty_value();
 
     // Test initialization without snapshots
+    OperationLogReferenceInfo reference_info;
     {
         InstanceInfoPB instance_info;
         OperationLogRecycleChecker checker(test_instance_id, txn_kv.get(), 
instance_info);
         ASSERT_EQ(checker.init(), 0);
 
         // All logs should be recyclable when no snapshots exist
-        ASSERT_TRUE(checker.can_recycle(old_version, 1)) << 
old_version.version();
+        ASSERT_TRUE(checker.can_recycle(old_version, 1, &reference_info)) << 
old_version.version();
     }
 
     {
@@ -2563,7 +2564,7 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) {
         ASSERT_EQ(checker.init(), 0);
 
         OperationLogPB op_log;
-        ASSERT_TRUE(checker.can_recycle(old_version, op_log.min_timestamp()));
+        ASSERT_TRUE(checker.can_recycle(old_version, op_log.min_timestamp(), 
&reference_info));
     }
 
     auto write_snapshot = [&]() {
@@ -2591,19 +2592,19 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) 
{
         ASSERT_EQ(checker.init(), 0);
 
         // case 1, old operation log can be recycled.
-        ASSERT_TRUE(checker.can_recycle(old_version, 1));
+        ASSERT_TRUE(checker.can_recycle(old_version, 1, &reference_info));
         // case 2. snapshot exist in the log range, can not be recycled.
-        ASSERT_FALSE(checker.can_recycle(version1, old_version.version()))
+        ASSERT_FALSE(checker.can_recycle(version1, old_version.version(), 
&reference_info))
                 << "version1: " << version1.version() << ", old_version: " << 
old_version.version();
 
         Versionstamp version3 = get_current_versionstamp();
         Versionstamp version4(version3.version(), 1);
 
         // case 3. large operation log can not be recycled.
-        ASSERT_FALSE(checker.can_recycle(version4, version2.version()));
+        ASSERT_FALSE(checker.can_recycle(version4, version2.version(), 
&reference_info));
 
         // case 4: [min_version, operation log version)
-        ASSERT_TRUE(checker.can_recycle(version1, version1.version()));
+        ASSERT_TRUE(checker.can_recycle(version1, version1.version(), 
&reference_info));
     }
 
     {
@@ -2616,7 +2617,7 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) {
 
         Versionstamp version5 = get_current_versionstamp();
 
-        ASSERT_FALSE(checker.can_recycle(version5, version2.version()))
+        ASSERT_FALSE(checker.can_recycle(version5, version2.version(), 
&reference_info))
                 << "version5: " << version5.version() << ", version2: " << 
version2.version();
     }
 }
diff --git a/cloud/test/snapshot_data_size_calculator_test.cpp 
b/cloud/test/snapshot_data_size_calculator_test.cpp
new file mode 100644
index 00000000000..dfe6d760c46
--- /dev/null
+++ b/cloud/test/snapshot_data_size_calculator_test.cpp
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/cloud.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "meta-service/meta_service.h"
+#include "meta-store/keys.h"
+#include "meta-store/mem_txn_kv.h"
+#include "meta-store/txn_kv.h"
+#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
+#include "recycler/recycler.h"
+
+using namespace doris::cloud;
+
+extern std::string instance_id;
+
+// Test helper: store `instance` under instance_key({instance_id}) using a dedicated
+// transaction. On an empty serialization, or if creating/committing the transaction
+// does not return TXN_OK, records a fatal gtest failure and returns early.
+static void update_instance_info(TxnKv* txn_kv, const InstanceInfoPB& instance) {
+    const std::string value = instance.SerializeAsString();
+    ASSERT_FALSE(value.empty()) << "Failed to serialize instance info";
+
+    std::unique_ptr<Transaction> txn;
+    const TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+
+    const std::string instance_info_key = instance_key({instance_id});
+    txn->put(instance_info_key, value);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction";
+}
+
+// Test helper: load the InstanceInfoPB stored under instance_key({instance_id}) into
+// `instance`. Records a fatal gtest failure and returns early if the transaction cannot
+// be created, the get does not return TXN_OK, or the value fails to parse.
+// `static` gives it internal linkage like update_instance_info, so it cannot clash with
+// helpers in the other translation units linked into recycler_test.
+static void get_instance(TxnKv* txn_kv, InstanceInfoPB& instance) {
+    std::string key = instance_key({instance_id});
+    std::string val;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+    ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+    ASSERT_TRUE(instance.ParseFromString(val));
+}
+
+// Test helper: store `snapshot` under versioned::snapshot_full_key({instance_id})
+// encoded with `versionstamp`, in a dedicated transaction. Records a fatal gtest
+// failure and returns early if creating or committing the transaction fails.
+// `static` keeps this test-local helper out of the shared recycler_test link namespace.
+static void update_snapshot(TxnKv* txn_kv, const Versionstamp& versionstamp,
+                            const SnapshotPB& snapshot) {
+    std::string snapshot_full_key = versioned::snapshot_full_key({instance_id});
+    std::string key = encode_versioned_key(snapshot_full_key, versionstamp);
+    std::string value = snapshot.SerializeAsString();
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+    txn->put(key, value);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction";
+}
+
+// Test helper: read the SnapshotPB stored under versioned::snapshot_full_key({instance_id})
+// encoded with `versionstamp` into `snapshot`. Records a fatal gtest failure and returns
+// early if the transaction cannot be created, the get fails, or the value fails to parse.
+// `static` keeps this test-local helper out of the shared recycler_test link namespace.
+static void get_snapshot(TxnKv* txn_kv, const Versionstamp& versionstamp, SnapshotPB& snapshot) {
+    std::string snapshot_full_key = versioned::snapshot_full_key({instance_id});
+    std::string key = encode_versioned_key(snapshot_full_key, versionstamp);
+    std::string val;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+    ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+    ASSERT_TRUE(snapshot.ParseFromString(val));
+}
+
+// Seeds eight snapshots with known meta-image / logical sizes, injects per-snapshot
+// retained sizes into the calculator, then checks that save_snapshot_data_size_with_retry()
+// writes retained and billable sizes into every SnapshotPB and the totals into InstanceInfoPB.
+TEST(SnapshotDataSizeCalculatorTest, SaveSnapshotDataSizeTest) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("0");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_empty");
+    // The helpers fail via ASSERT_*, which only returns from the helper itself;
+    // ASSERT_NO_FATAL_FAILURE propagates that failure so the test stops here.
+    ASSERT_NO_FATAL_FAILURE(update_instance_info(txn_kv.get(), instance));
+
+    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
+
+    // Record a snapshot both in `snapshots` (input for calculator.init) and in the KV store.
+    // Both size parameters are int64_t to match the int64 proto fields they populate.
+    auto create_snapshot = [&](Versionstamp versionstamp, int64_t image_size,
+                               int64_t logical_data_size) {
+        SnapshotPB snapshot;
+        snapshot.set_snapshot_meta_image_size(image_size);
+        snapshot.set_snapshot_logical_data_size(logical_data_size);
+        snapshots.emplace_back(snapshot, versionstamp);
+        ASSERT_NO_FATAL_FAILURE(update_snapshot(txn_kv.get(), versionstamp, snapshot));
+    };
+
+    Versionstamp vs1(1);
+    create_snapshot(vs1, 100, 1000);
+    Versionstamp vs2(2);
+    create_snapshot(vs2, 200, 2000);
+    Versionstamp vs3(3);
+    create_snapshot(vs3, 300, 3000);
+    Versionstamp vs4(4);
+    create_snapshot(vs4, 400, 4000);
+    Versionstamp vs5(5);
+    create_snapshot(vs5, 500, 1000);
+    Versionstamp vs6(6);
+    create_snapshot(vs6, 600, 8000);
+    Versionstamp vs7(7);
+    create_snapshot(vs7, 700, 8000);
+    Versionstamp vs8(8);
+    create_snapshot(vs8, 800, 9000);
+    // A failure inside the lambda only returns from the lambda; stop before the
+    // calculator runs against an incomplete set of snapshots.
+    ASSERT_FALSE(HasFatalFailure());
+
+    SnapshotDataSizeCalculator calculator(instance_id, txn_kv);
+    calculator.init(snapshots);
+    // Inject the calculator's retained sizes directly instead of deriving them from KV data.
+    std::map<Versionstamp, int64_t> retained_data_size;
+    retained_data_size[vs1] = 500;
+    retained_data_size[vs2] = 2000;
+    retained_data_size[vs3] = 4000;
+    retained_data_size[vs4] = 2000;
+    retained_data_size[vs5] = 5000;
+    retained_data_size[vs6] = 7000;
+    retained_data_size[vs7] = 2000;
+    retained_data_size[vs8] = 10000;
+    calculator.instance_retained_data_size_ = 10;
+    calculator.retained_data_size_ = std::move(retained_data_size);
+
+    ASSERT_EQ(calculator.save_snapshot_data_size_with_retry(), 0);
+
+    auto check_snapshot = [&](Versionstamp versionstamp, int64_t expected_retained_data_size,
+                              int64_t expected_billable_data_size) {
+        SnapshotPB snapshot;
+        ASSERT_NO_FATAL_FAILURE(get_snapshot(txn_kv.get(), versionstamp, snapshot));
+        ASSERT_EQ(snapshot.snapshot_retained_data_size(), expected_retained_data_size);
+        ASSERT_EQ(snapshot.snapshot_billable_data_size(), expected_billable_data_size);
+    };
+
+    check_snapshot(vs1, 500, 610);
+    check_snapshot(vs2, 2000, 2200);
+    check_snapshot(vs3, 4000, 3300);
+    check_snapshot(vs4, 2000, 3400);
+    check_snapshot(vs5, 5000, 1500);
+    check_snapshot(vs6, 7000, 8600);
+    check_snapshot(vs7, 2000, 2000 + 700 + 3000);
+    check_snapshot(vs8, 10000, 9800);
+    ASSERT_NO_FATAL_FAILURE(get_instance(txn_kv.get(), instance));
+    ASSERT_EQ(instance.snapshot_retained_data_size(), 10);
+    ASSERT_EQ(instance.snapshot_billable_data_size(), 1000);
+}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index b93df8f52ec..ea5441cdc4a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -137,6 +137,9 @@ message InstanceInfoPB {
 
     repeated KeySetType migrated_key_sets = 119;    // convert single version 
keys to multi version keys
     repeated KeySetType compacted_key_sets = 120;   // compact snapshot chain 
...
+
+    optional int64 snapshot_retained_data_size = 121;
+    optional int64 snapshot_billable_data_size = 122;
 }
 
 message StagePB {
@@ -758,6 +761,8 @@ message DropPartitionLogPB {
     repeated int64 partition_ids = 4;
     optional int64 expired_at_s = 5; // expiration timestamp
     optional bool update_table_version = 6;
+
+    map<string, int64> index_partition_to_data_size = 7;
 }
 
 message CommitIndexLogPB {
@@ -772,6 +777,8 @@ message DropIndexLogPB {
     optional int64 table_id = 2;
     repeated int64 index_ids = 3;
     optional int64 expiration = 4; // expiration timestamp
+
+    map<string, int64> index_partition_to_data_size = 5;
 }
 
 message UpdateTabletLogPB {
@@ -849,8 +856,10 @@ message SnapshotPB {
     optional string upload_file = 15;
     optional string upload_id = 16;
 
-    optional int64 image_file_size = 17;
-    optional int64 snapshot_data_size = 18;
+    optional int64 snapshot_meta_image_size = 17;
+    optional int64 snapshot_logical_data_size = 18;
+    optional int64 snapshot_retained_data_size = 19;
+    optional int64 snapshot_billable_data_size = 20;
 
     // TODO: add the referenced resources
 }
@@ -2043,8 +2052,8 @@ message CommitSnapshotRequest {
     optional string image_url = 3;
     optional int64 last_journal_id = 4;
     optional string request_ip = 5;
-    optional int64 image_file_size = 6;
-    optional int64 snapshot_data_size = 7;
+    optional int64 snapshot_meta_image_size = 6;
+    optional int64 snapshot_logical_data_size = 7;
 }
 
 message CommitSnapshotResponse {
@@ -2079,8 +2088,10 @@ message SnapshotInfoPB {
     optional string reason = 14;
 
     repeated string derived_instance_ids = 15;
-    optional int64 image_file_size = 16;
-    optional int64 snapshot_data_size = 17;
+    optional int64 snapshot_meta_image_size = 16;
+    optional int64 snapshot_logical_data_size = 17;
+    optional int64 snapshot_retained_data_size = 18;
+    optional int64 snapshot_billable_data_size = 19;
 }
 
 message ListSnapshotRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to