This is an automated email from the ASF dual-hosted git repository.
meiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e95d2666ccc [feat](snapshot) calculate snapshot data size (#59168)
e95d2666ccc is described below
commit e95d2666ccc608eed2671c97c576a77ec4c160f9
Author: meiyi <[email protected]>
AuthorDate: Mon Dec 22 12:00:14 2025 +0800
[feat](snapshot) calculate snapshot data size (#59168)
---
cloud/src/meta-store/keys.cpp | 34 ++
cloud/src/meta-store/keys.h | 5 +
cloud/src/recycler/recycler.h | 42 +-
cloud/src/recycler/recycler_operation_log.cpp | 38 +-
.../src/recycler/snapshot_data_size_calculator.cpp | 453 +++++++++++++++++++++
cloud/test/CMakeLists.txt | 1 +
cloud/test/recycler_operation_log_test.cpp | 15 +-
cloud/test/snapshot_data_size_calculator_test.cpp | 162 ++++++++
gensrc/proto/cloud.proto | 23 +-
9 files changed, 753 insertions(+), 20 deletions(-)
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 0ec08782ed2..88073ae3a00 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -1113,6 +1113,40 @@ bool decode_meta_tablet_key(std::string_view* in,
int64_t* tablet_id, Versionsta
return true;
}
+// Decodes a versioned tablet inverted index key.
+//
+// Expected layout:
+//   0x03 "index" ${instance_id} "tablet_inverted" ${db_id} ${table_id} ${index_id} ${partition} ${tablet}
+//
+// Returns true and fills the five id outputs when the key matches this layout. Returns false
+// when the key space byte, the number of decoded fields, the prefix/infix strings or the type
+// of any field does not match. `in` loses its key space byte before the remaining fields are
+// decoded and is not restored on failure; outputs may be partially written when a later id
+// field has an unexpected type.
+bool decode_tablet_inverted_index_key(std::string_view* in, int64_t* db_id, int64_t* table_id,
+                                      int64_t* index_id, int64_t* partition_id,
+                                      int64_t* tablet_id) {
+    const bool in_versioned_space =
+            !in->empty() && static_cast<uint8_t>(in->front()) == CLOUD_VERSIONED_KEY_SPACE03;
+    if (!in_versioned_space) {
+        return false;
+    }
+    in->remove_prefix(1);
+
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+    if (decode_key(in, &fields) != 0) {
+        return false;
+    }
+    if (fields.size() != 8) {
+        return false;
+    }
+
+    // Both accessors throw std::bad_variant_access when the field holds the other alternative.
+    const auto text_at = [&fields](size_t pos) -> const std::string& {
+        return std::get<std::string>(std::get<0>(fields[pos]));
+    };
+    const auto id_at = [&fields](size_t pos) -> int64_t {
+        return std::get<int64_t>(std::get<0>(fields[pos]));
+    };
+
+    try {
+        if (text_at(0) != INDEX_KEY_PREFIX) {
+            return false;
+        }
+        if (text_at(2) != TABLET_INVERTED_INDEX_KEY_INFIX) {
+            return false;
+        }
+        *db_id = id_at(3);
+        *table_id = id_at(4);
+        *index_id = id_at(5);
+        *partition_id = id_at(6);
+        *tablet_id = id_at(7);
+    } catch (const std::bad_variant_access&) {
+        // A field had the wrong type, so this is not a tablet inverted index key.
+        return false;
+    }
+
+    return true;
+}
+
bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
Versionstamp* timestamp, std::string*
ref_instance_id) {
// Key format: 0x03 + encode_bytes("snapshot") + encode_bytes(instance_id)
+
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index c1b51086f80..53a24939cde 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -597,6 +597,11 @@ bool decode_meta_schema_key(std::string_view* in, int64_t*
index_id, int64_t* sc
// Return true if decode successfully, otherwise false
bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id,
Versionstamp* timestamp);
+// Decode tablet inverted index key
+// Return true if decode successfully, otherwise false
+bool decode_tablet_inverted_index_key(std::string_view* in, int64_t* db_id,
int64_t* table_id,
+ int64_t* index_id, int64_t*
partition_id, int64_t* tablet_id);
+
// Decode snapshot reference key
// Return true if decode successfully, otherwise false
bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index d5d64d42020..b69e5fa16dd 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -552,6 +552,12 @@ private:
SegmentRecyclerMetricsContext segment_metrics_context_;
};
+// Out-parameter of OperationLogRecycleChecker::can_recycle() describing what keeps an
+// operation log from being recycled.
+struct OperationLogReferenceInfo {
+    // Set when the log's min read timestamp is below the checker's source snapshot
+    // versionstamp.
+    bool referenced_by_instance {false};
+    // Set when a snapshot falls inside [log min read timestamp, log versionstamp).
+    bool referenced_by_snapshot {false};
+    // Versionstamp of that snapshot; only assigned together with referenced_by_snapshot.
+    Versionstamp referenced_snapshot_timestamp;
+};
+
// Helper class to check if operation logs can be recycled based on snapshots
and versionstamps
class OperationLogRecycleChecker {
public:
@@ -563,10 +569,15 @@ public:
int init();
// Check if an operation log can be recycled
- bool can_recycle(const Versionstamp& log_versionstamp, int64_t
log_min_timestamp) const;
+ bool can_recycle(const Versionstamp& log_versionstamp, int64_t
log_min_timestamp,
+ OperationLogReferenceInfo* reference_info) const;
Versionstamp max_versionstamp() const { return max_versionstamp_; }
+ const std::vector<std::pair<SnapshotPB, Versionstamp>>& get_snapshots()
const {
+ return snapshots_;
+ }
+
private:
std::string_view instance_id_;
TxnKv* txn_kv_;
@@ -577,4 +588,33 @@ private:
std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
};
+// Accumulates the data size retained by operation logs that cannot be recycled, grouped by
+// the snapshot (or the instance itself) that references them, and writes the totals back to
+// the SnapshotPB / InstanceInfoPB records.
+class SnapshotDataSizeCalculator {
+public:
+    // `instance_id` is kept as a view, so the referenced string must outlive this object.
+    SnapshotDataSizeCalculator(std::string_view instance_id, std::shared_ptr<TxnKv> txn_kv)
+            : instance_id_ {instance_id}, txn_kv_ {std::move(txn_kv)} {}
+
+    // Registers every snapshot versionstamp with a retained data size of zero.
+    void init(const std::vector<std::pair<SnapshotPB, Versionstamp>>& snapshots);
+
+    // Adds the data size held by `operation_log` to the snapshot or instance named in
+    // `reference_info`. May rewrite the log under `log_key` to cache computed sizes.
+    // Returns 0 on success and a non-zero value on failure.
+    int calculate_operation_log_data_size(const std::string_view& log_key,
+                                          OperationLogPB& operation_log,
+                                          OperationLogReferenceInfo& reference_info);
+
+    // Persists the accumulated sizes, retrying when the commit hits a transaction conflict.
+    // Returns 0 on success and a non-zero value on failure.
+    int save_snapshot_data_size_with_retry();
+
+private:
+    // Lists the partition ids that have tablets under the given index.
+    int get_all_index_partitions(int64_t db_id, int64_t table_id, int64_t index_id,
+                                 std::vector<int64_t>* partition_ids);
+    // Adds the merged data size of every tablet of the index partition to `*data_size`.
+    int get_index_partition_data_size(int64_t db_id, int64_t table_id, int64_t index_id,
+                                      int64_t partition_id, int64_t* data_size);
+    // Writes `operation_log` back under `log_key` in its own transaction.
+    int save_operation_log(const std::string_view& log_key, OperationLogPB& operation_log);
+    // Single attempt at persisting the sizes; returns -2 on a commit conflict.
+    int save_snapshot_data_size();
+
+    std::string_view instance_id_;
+    std::shared_ptr<TxnKv> txn_kv_;
+
+    // Size retained by logs that are referenced by the instance rather than by a snapshot.
+    int64_t instance_retained_data_size_ {0};
+    // Size retained per snapshot, keyed by the snapshot versionstamp.
+    std::map<Versionstamp, int64_t> retained_data_size_;
+    // "<index_id>-<partition_id>" keys whose size has already been accounted for.
+    std::set<std::string> calculated_partitions_;
+};
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler_operation_log.cpp
b/cloud/src/recycler/recycler_operation_log.cpp
index 5a9bbb305ae..71eda034d5f 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -82,7 +82,8 @@ int OperationLogRecycleChecker::init() {
snapshots_.clear();
snapshot_indexes_.clear();
MetaReader reader(instance_id_);
- err = reader.get_snapshots(txn.get(), &snapshots_);
+ std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
+ err = reader.get_snapshots(txn.get(), &snapshots);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to get snapshots").tag("err", err);
return -1;
@@ -96,16 +97,22 @@ int OperationLogRecycleChecker::init() {
}
max_versionstamp_ = Versionstamp(read_version, 0);
- for (size_t i = 0; i < snapshots_.size(); ++i) {
- auto&& [snapshot, versionstamp] = snapshots_[i];
- snapshot_indexes_.insert(std::make_pair(versionstamp, i));
+ for (size_t i = 0; i < snapshots.size(); ++i) {
+ auto&& [snapshot, versionstamp] = snapshots[i];
+ if (snapshot.status() == SnapshotStatus::SNAPSHOT_ABORTED ||
+ snapshot.status() == SnapshotStatus::SNAPSHOT_RECYCLED) {
+ continue;
+ }
+ snapshot_indexes_.insert(std::make_pair(versionstamp,
snapshots_.size()));
+ snapshots_.push_back(std::make_pair(std::move(snapshot),
versionstamp));
}
return 0;
}
bool OperationLogRecycleChecker::can_recycle(const Versionstamp&
log_versionstamp,
- int64_t log_min_timestamp) const {
+ int64_t log_min_timestamp,
+ OperationLogReferenceInfo*
reference_info) const {
Versionstamp log_min_read_timestamp(log_min_timestamp, 0);
if (log_versionstamp > max_versionstamp_) {
// Not recycleable.
@@ -114,12 +121,15 @@ bool OperationLogRecycleChecker::can_recycle(const
Versionstamp& log_versionstam
// Do not recycle operation logs referenced by active snapshots.
if (log_min_read_timestamp < source_snapshot_versionstamp_) {
+ reference_info->referenced_by_instance = true;
return false;
}
auto it = snapshot_indexes_.lower_bound(log_min_read_timestamp);
if (it != snapshot_indexes_.end() && snapshots_[it->second].second <
log_versionstamp) {
// in [log_min_read_timestmap, log_versionstamp)
+ reference_info->referenced_by_snapshot = true;
+ reference_info->referenced_snapshot_timestamp =
snapshots_[it->second].second;
return false;
}
@@ -677,6 +687,8 @@ int InstanceRecycler::recycle_operation_logs() {
LOG_WARNING("failed to initialize recycle checker").tag("error_code",
init_res);
return init_res;
}
+ SnapshotDataSizeCalculator calculator(instance_id_, txn_kv_);
+ calculator.init(recycle_checker.get_snapshots());
auto scan_and_recycle_operation_log = [&](const std::string_view& key,
const std::vector<std::string>&
raw_keys,
@@ -690,7 +702,9 @@ int InstanceRecycler::recycle_operation_logs() {
}
size_t value_size = operation_log.ByteSizeLong();
- if (recycle_checker.can_recycle(log_versionstamp,
operation_log.min_timestamp())) {
+ OperationLogReferenceInfo reference_info;
+ if (recycle_checker.can_recycle(log_versionstamp,
operation_log.min_timestamp(),
+ &reference_info)) {
AnnotateTag tag("log_key", hex(key));
int res = recycle_operation_log(log_versionstamp, raw_keys,
std::move(operation_log));
if (res != 0) {
@@ -700,6 +714,13 @@ int InstanceRecycler::recycle_operation_logs() {
recycled_operation_logs++;
recycled_operation_log_data_size += value_size;
+ } else {
+ int res = calculator.calculate_operation_log_data_size(key,
operation_log,
+
reference_info);
+ if (res != 0) {
+ LOG_WARNING("failed to calculate operation log data
size").tag("error_code", res);
+ return res;
+ }
}
total_operation_logs++;
@@ -769,6 +790,11 @@ int InstanceRecycler::recycle_operation_logs() {
.tag("error_code", iter->error_code());
return -1;
}
+ int res = calculator.save_snapshot_data_size_with_retry();
+ if (res != 0) {
+ LOG_WARNING("failed to save snapshot data size").tag("error_code",
res);
+ return res;
+ }
return 0;
}
diff --git a/cloud/src/recycler/snapshot_data_size_calculator.cpp
b/cloud/src/recycler/snapshot_data_size_calculator.cpp
new file mode 100644
index 00000000000..09222849990
--- /dev/null
+++ b/cloud/src/recycler/snapshot_data_size_calculator.cpp
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/builtin_service.pb.h>
+#include <brpc/server.h>
+#include <butil/endpoint.h>
+#include <bvar/status.h>
+#include <gen_cpp/cloud.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "common/logging.h"
+#include "common/stopwatch.h"
+#include "common/util.h"
+#include "meta-service/meta_service.h"
+#include "meta-store/blob_message.h"
+#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
+#include "meta-store/txn_kv.h"
+#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
+#include "meta-store/versionstamp.h"
+#include "recycler/recycler.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+using namespace std::chrono;
+
+// Retry configuration for save data size
+static constexpr int MAX_RETRY_TIMES = 5;
+static constexpr int RETRY_INTERVAL_MS = 100;
+
+// Registers every snapshot versionstamp in `snapshots` with a retained data size of zero.
+// Versionstamps that are already registered keep their current value.
+void SnapshotDataSizeCalculator::init(
+        const std::vector<std::pair<SnapshotPB, Versionstamp>>& snapshots) {
+    // Bind by const reference: a by-value binding would deep-copy each SnapshotPB only to
+    // read its versionstamp.
+    for (const auto& [_, versionstamp] : snapshots) {
+        retained_data_size_.try_emplace(versionstamp, 0);
+    }
+}
+
+// Builds the "<index_id>-<partition_id>" string that identifies one index partition in
+// calculated_partitions_ and in the index_partition_to_data_size maps of the operation logs.
+inline std::string index_partition_key(int64_t index_id, int64_t partition_id) {
+    std::string key = fmt::format("{}-{}", index_id, partition_id);
+    return key;
+}
+
+// Accounts the data size kept alive by one operation log that could not be recycled.
+//
+// The size is derived from the log type:
+//   - compaction / schema change: sum of total_disk_size of the rowsets to recycle;
+//   - drop partition / drop index: sum of the data sizes of the affected index partitions.
+//     Sizes already stored in the log's index_partition_to_data_size map are reused;
+//     otherwise they are computed from tablet stats and the log is written back under
+//     `log_key` so that they are available from the log afterwards.
+// An index partition is computed at most once per calculator (see calculated_partitions_).
+//
+// The result is added to the snapshot named in `reference_info` when the log is referenced
+// by a snapshot, otherwise to the instance total. Logs that are referenced by neither are
+// ignored.
+//
+// Returns 0 on success and -1 when a size cannot be read or the log cannot be saved.
+int SnapshotDataSizeCalculator::calculate_operation_log_data_size(
+        const std::string_view& log_key, OperationLogPB& operation_log,
+        OperationLogReferenceInfo& reference_info) {
+    if (!reference_info.referenced_by_instance && !reference_info.referenced_by_snapshot) {
+        return 0;
+    }
+
+    int64_t data_size = 0;
+    if (operation_log.has_compaction()) {
+        for (const auto& recycle_rowset : operation_log.compaction().recycle_rowsets()) {
+            data_size += recycle_rowset.rowset_meta().total_disk_size();
+        }
+        VLOG_DEBUG << "compaction log data size: " << data_size << ", key=" << hex(log_key);
+    } else if (operation_log.has_schema_change()) {
+        for (const auto& recycle_rowset : operation_log.schema_change().recycle_rowsets()) {
+            data_size += recycle_rowset.rowset_meta().total_disk_size();
+        }
+        VLOG_DEBUG << "schema change log data size: " << data_size << ", key=" << hex(log_key);
+    } else if (operation_log.has_drop_partition()) {
+        const auto& drop_partition_log = operation_log.drop_partition();
+        int64_t num_total = 0;
+        int64_t num_calculated = 0;
+
+        if (!drop_partition_log.index_partition_to_data_size().empty()) {
+            // Sizes were stored in the log earlier; reuse them.
+            for (const auto& [key, size] : drop_partition_log.index_partition_to_data_size()) {
+                ++num_total;
+                data_size += size;
+                calculated_partitions_.insert(key);
+            }
+        } else {
+            int64_t db_id = drop_partition_log.db_id();
+            int64_t table_id = drop_partition_log.table_id();
+            DropPartitionLogPB drop_partition_log_copy = drop_partition_log;
+
+            for (auto partition_id : drop_partition_log.partition_ids()) {
+                for (int64_t index_id : drop_partition_log.index_ids()) {
+                    // Count every index partition named by the log, including the ones that
+                    // were already accounted for, so that total and calculated can differ.
+                    ++num_total;
+                    std::string key = index_partition_key(index_id, partition_id);
+                    if (calculated_partitions_.contains(key)) {
+                        continue;
+                    }
+                    int64_t partition_data_size = 0;
+                    if (get_index_partition_data_size(db_id, table_id, index_id, partition_id,
+                                                      &partition_data_size) != 0) {
+                        return -1;
+                    }
+                    ++num_calculated;
+                    data_size += partition_data_size;
+                    calculated_partitions_.insert(key);
+                    drop_partition_log_copy.mutable_index_partition_to_data_size()->emplace(
+                            key, partition_data_size);
+                }
+            }
+
+            if (!drop_partition_log_copy.index_partition_to_data_size().empty()) {
+                // `drop_partition_log` refers to the message replaced here and must not be
+                // used past this point.
+                operation_log.clear_drop_partition();
+                operation_log.mutable_drop_partition()->Swap(&drop_partition_log_copy);
+                if (save_operation_log(log_key, operation_log) != 0) {
+                    return -1;
+                }
+            }
+        }
+
+        VLOG_DEBUG << "drop partition log data size: " << data_size << ", key=" << hex(log_key)
+                   << ", partition_num_total=" << num_total
+                   << ", partition_num_calculated=" << num_calculated;
+    } else if (operation_log.has_drop_index()) {
+        const auto& drop_index_log = operation_log.drop_index();
+        int64_t num_total = 0;
+        int64_t num_calculated = 0;
+
+        if (!drop_index_log.index_partition_to_data_size().empty()) {
+            // Sizes were stored in the log earlier; reuse them.
+            for (const auto& [key, size] : drop_index_log.index_partition_to_data_size()) {
+                ++num_total;
+                data_size += size;
+                calculated_partitions_.insert(key);
+            }
+        } else {
+            int64_t db_id = drop_index_log.db_id();
+            int64_t table_id = drop_index_log.table_id();
+            DropIndexLogPB drop_index_log_copy = drop_index_log;
+
+            for (auto index_id : drop_index_log.index_ids()) {
+                // The log only names the indexes; their partitions come from the tablet
+                // inverted index keys.
+                std::vector<int64_t> partition_ids;
+                if (get_all_index_partitions(db_id, table_id, index_id, &partition_ids) != 0) {
+                    return -1;
+                }
+                for (int64_t partition_id : partition_ids) {
+                    ++num_total;
+                    std::string key = index_partition_key(index_id, partition_id);
+                    if (calculated_partitions_.contains(key)) {
+                        continue;
+                    }
+                    int64_t partition_data_size = 0;
+                    if (get_index_partition_data_size(db_id, table_id, index_id, partition_id,
+                                                      &partition_data_size) != 0) {
+                        return -1;
+                    }
+                    ++num_calculated;
+                    data_size += partition_data_size;
+                    calculated_partitions_.insert(key);
+                    drop_index_log_copy.mutable_index_partition_to_data_size()->emplace(
+                            key, partition_data_size);
+                }
+            }
+
+            if (!drop_index_log_copy.index_partition_to_data_size().empty()) {
+                // `drop_index_log` refers to the message replaced here and must not be used
+                // past this point.
+                operation_log.clear_drop_index();
+                operation_log.mutable_drop_index()->Swap(&drop_index_log_copy);
+                if (save_operation_log(log_key, operation_log) != 0) {
+                    return -1;
+                }
+            }
+        }
+
+        VLOG_DEBUG << "drop index log data size: " << data_size << ", key=" << hex(log_key)
+                   << ", partition_num_total=" << num_total
+                   << ", partition_num_calculated=" << num_calculated;
+    }
+    if (reference_info.referenced_by_snapshot) {
+        retained_data_size_[reference_info.referenced_snapshot_timestamp] += data_size;
+    } else {
+        instance_retained_data_size_ += data_size;
+    }
+    return 0;
+}
+
+// Calls save_snapshot_data_size() up to MAX_RETRY_TIMES times, waiting RETRY_INTERVAL_MS
+// before every attempt after the first. Only a result of -2 (commit conflict) triggers
+// another attempt; any other result is returned as is. Returns -1 once all attempts ended
+// in a conflict.
+int SnapshotDataSizeCalculator::save_snapshot_data_size_with_retry() {
+    int attempt = 0;
+    while (attempt < MAX_RETRY_TIMES) {
+        if (attempt != 0) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_INTERVAL_MS));
+        }
+
+        const int res = save_snapshot_data_size();
+        if (res != -2) {
+            return res;
+        }
+        // res == -2: TXN_CONFLICT, try again
+        ++attempt;
+    }
+    return -1;
+}
+
+// Writes the accumulated sizes to the SnapshotPB of every registered snapshot and to the
+// InstanceInfoPB, all within one transaction.
+//
+// Snapshots are visited in the iteration order of retained_data_size_ (ascending
+// versionstamp). Each snapshot is billed its meta image size plus its retained size,
+// capped at its logical data size; whatever exceeds the cap is carried over to the next
+// snapshot. The carry starts with the size retained by the instance itself, and what is
+// left after the last snapshot becomes the instance's billable size.
+//
+// Records are only rewritten, and the transaction only committed, when a stored value
+// differs from the computed one.
+//
+// Returns 0 on success, -2 when the commit fails with TXN_CONFLICT (the caller may retry)
+// and -1 on any other failure.
+int SnapshotDataSizeCalculator::save_snapshot_data_size() {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for saving snapshot data size")
+                .tag("error_code", err);
+        return -1;
+    }
+    bool need_commit = false;
+    int64_t left_data_size = instance_retained_data_size_;
+
+    // save data size to SnapshotPB
+    for (const auto& [snapshot_versionstamp, retained_data_size] : retained_data_size_) {
+        std::string snapshot_full_key = versioned::snapshot_full_key({instance_id_});
+        std::string snapshot_key = encode_versioned_key(snapshot_full_key, snapshot_versionstamp);
+        std::string snapshot_val;
+        err = txn->get(snapshot_key, &snapshot_val);
+
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            // impossible
+            LOG_WARNING("snapshot key not found").tag("key", hex(snapshot_key));
+            continue;
+        }
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get snapshot key").tag("key", hex(snapshot_key)).tag("err", err);
+            return -1;
+        }
+        SnapshotPB snapshot_pb;
+        if (!snapshot_pb.ParseFromString(snapshot_val)) {
+            LOG_WARNING("failed to parse SnapshotPB")
+                    .tag("instance_id", instance_id_)
+                    .tag("snapshot_id", snapshot_versionstamp.to_string());
+            return -1;
+        }
+        int64_t billable_data_size = snapshot_pb.snapshot_meta_image_size();
+        if (retained_data_size >= snapshot_pb.snapshot_logical_data_size()) {
+            // The snapshot alone reaches the cap; all of the surplus is carried over.
+            billable_data_size += snapshot_pb.snapshot_logical_data_size();
+            left_data_size += retained_data_size - snapshot_pb.snapshot_logical_data_size();
+        } else if (retained_data_size + left_data_size <=
+                   snapshot_pb.snapshot_logical_data_size()) {
+            // Retained size plus carry fits under the cap; the carry is used up.
+            billable_data_size += retained_data_size + left_data_size;
+            left_data_size = 0;
+        } else {
+            // The carry fills the snapshot up to the cap; the remainder is carried over.
+            billable_data_size += snapshot_pb.snapshot_logical_data_size();
+            left_data_size =
+                    retained_data_size + left_data_size - snapshot_pb.snapshot_logical_data_size();
+        }
+
+        if (!snapshot_pb.has_snapshot_retained_data_size() ||
+            snapshot_pb.snapshot_retained_data_size() != retained_data_size ||
+            !snapshot_pb.has_snapshot_billable_data_size() ||
+            snapshot_pb.snapshot_billable_data_size() != billable_data_size) {
+            // Log before the setters run so that the old values are still readable.
+            LOG_INFO("update snapshot data size")
+                    .tag("instance_id", instance_id_)
+                    .tag("snapshot_id", snapshot_versionstamp.to_string())
+                    .tag("old_retained_data_size", snapshot_pb.snapshot_retained_data_size())
+                    .tag("new_retained_data_size", retained_data_size)
+                    .tag("old_billable_data_size", snapshot_pb.snapshot_billable_data_size())
+                    .tag("new_billable_data_size", billable_data_size);
+            snapshot_pb.set_snapshot_retained_data_size(retained_data_size);
+            snapshot_pb.set_snapshot_billable_data_size(billable_data_size);
+
+            std::string updated_snapshot_val;
+            if (!snapshot_pb.SerializeToString(&updated_snapshot_val)) {
+                LOG_WARNING("failed to serialize updated SnapshotPB");
+                return -1;
+            }
+            txn->put(snapshot_key, updated_snapshot_val);
+            need_commit = true;
+        }
+    }
+
+    // save instance data size
+    std::string key = instance_key(instance_id_);
+    std::string instance_value;
+    err = txn->get(key, &instance_value);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to get instance info")
+                .tag("instance_id", instance_id_)
+                .tag("error", err);
+        return -1;
+    }
+    InstanceInfoPB instance_info;
+    if (!instance_info.ParseFromString(instance_value)) {
+        LOG_WARNING("failed to parse instance info").tag("instance_id", instance_id_);
+        return -1;
+    }
+    if (!instance_info.has_snapshot_retained_data_size() ||
+        instance_info.snapshot_retained_data_size() != instance_retained_data_size_ ||
+        !instance_info.has_snapshot_billable_data_size() ||
+        instance_info.snapshot_billable_data_size() != left_data_size) {
+        // Capture the stored values before overwriting them; reading them after the setters
+        // would report the new values as the old ones.
+        const auto old_retained_data_size = instance_info.snapshot_retained_data_size();
+        const auto old_billable_data_size = instance_info.snapshot_billable_data_size();
+        instance_info.set_snapshot_retained_data_size(instance_retained_data_size_);
+        instance_info.set_snapshot_billable_data_size(left_data_size);
+        txn->put(key, instance_info.SerializeAsString());
+        need_commit = true;
+        LOG_INFO("update instance snapshot data size")
+                .tag("instance_id", instance_id_)
+                .tag("old_retained_data_size", old_retained_data_size)
+                .tag("new_retained_data_size", instance_retained_data_size_)
+                .tag("old_billable_data_size", old_billable_data_size)
+                .tag("new_billable_data_size", left_data_size);
+    }
+
+    if (need_commit) {
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to commit kv txn for saving snapshot data size").tag("err", err);
+            if (err == TxnErrorCode::TXN_CONFLICT) {
+                return -2;
+            }
+            return -1;
+        }
+    }
+    return 0;
+}
+
+// Collects the ids of all partitions that have at least one tablet inverted index key under
+// (db_id, table_id, index_id).
+//
+// `partition_ids` is cleared first and then filled in scan order. Consecutive keys of the
+// same partition are collapsed into one entry; this presumably relies on the range scan
+// returning the keys of one partition next to each other.
+//
+// Returns 0 on success and -1 when a key cannot be decoded or the range scan fails.
+int SnapshotDataSizeCalculator::get_all_index_partitions(int64_t db_id, int64_t table_id,
+                                                         int64_t index_id,
+                                                         std::vector<int64_t>* partition_ids) {
+    partition_ids->clear();
+
+    // Scan every key of this index: [index_id, partition 0, tablet 0) up to the first key
+    // of index_id + 1. The scan goes through txn_kv_, so no transaction is created here.
+    std::string tablet_key_begin =
+            versioned::tablet_inverted_index_key({instance_id_, db_id, table_id, index_id, 0, 0});
+    std::string tablet_key_end = versioned::tablet_inverted_index_key(
+            {instance_id_, db_id, table_id, index_id + 1, 0, 0});
+
+    FullRangeGetOptions opts;
+    opts.snapshot = true;
+    opts.prefetch = true;
+    opts.txn_kv = txn_kv_;
+    auto it = txn_kv_->full_range_get(tablet_key_begin, tablet_key_end, opts);
+    for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+        auto&& [k, _] = *kvp;
+
+        int64_t decode_db_id = -1, decode_table_id = -1, decode_index_id = -1, partition_id = -1,
+                tablet_id = -1;
+        std::string_view key_view(k);
+        if (!versioned::decode_tablet_inverted_index_key(&key_view, &decode_db_id, &decode_table_id,
+                                                         &decode_index_id, &partition_id,
+                                                         &tablet_id)) {
+            LOG_WARNING("failed to decode_tablet_inverted_index_key").tag("key", hex(k));
+            return -1;
+        }
+        // Compare with the last collected id instead of a sentinel value, so that no
+        // partition id (0 included) is mistaken for "nothing seen yet".
+        if (partition_ids->empty() || partition_ids->back() != partition_id) {
+            partition_ids->emplace_back(partition_id);
+        }
+    }
+    if (!it->is_valid()) {
+        LOG_ERROR("failed to get all index partitions").tag("error_code", it->error_code());
+        return -1;
+    }
+    return 0;
+}
+
+// Adds the merged data size of every tablet found under
+// (db_id, table_id, index_id, partition_id) to `*data_size`.
+//
+// `*data_size` is not reset here, so the caller chooses the starting value. When the call
+// fails part-way, the sizes of the tablets read so far have already been added.
+//
+// Returns 0 on success and -1 when the transaction cannot be created, a key cannot be
+// decoded, tablet stats cannot be read or the range scan fails.
+int SnapshotDataSizeCalculator::get_index_partition_data_size(int64_t db_id, int64_t table_id,
+                                                              int64_t index_id,
+                                                              int64_t partition_id,
+                                                              int64_t* data_size) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for getting index partition data size")
+                .tag("error_code", err);
+        return -1;
+    }
+
+    FullRangeGetOptions scan_opts;
+    scan_opts.snapshot = true;
+    scan_opts.prefetch = true;
+    scan_opts.txn_kv = txn_kv_;
+
+    // for a clone instance, the key ranges may be empty if snapshot chain is not compacted
+    std::string range_begin = versioned::tablet_inverted_index_key(
+            {instance_id_, db_id, table_id, index_id, partition_id, 0});
+    std::string range_end = versioned::tablet_inverted_index_key(
+            {instance_id_, db_id, table_id, index_id, partition_id + 1, 0});
+    auto scan = txn_kv_->full_range_get(range_begin, range_end, scan_opts);
+
+    MetaReader reader(instance_id_);
+    int64_t tablet_num = 0;
+    for (auto entry = scan->next(); entry.has_value(); entry = scan->next()) {
+        auto&& [k, _] = *entry;
+
+        // Only the tablet id is needed; the other fields are decoded and discarded.
+        int64_t key_db_id = -1;
+        int64_t key_table_id = -1;
+        int64_t key_index_id = -1;
+        int64_t key_partition_id = -1;
+        int64_t tablet_id = -1;
+        std::string_view key_view(k);
+        const bool decoded = versioned::decode_tablet_inverted_index_key(
+                &key_view, &key_db_id, &key_table_id, &key_index_id, &key_partition_id,
+                &tablet_id);
+        if (!decoded) {
+            LOG_WARNING("failed to decode_tablet_inverted_index_key=").tag("key", hex(k));
+            return -1;
+        }
+
+        TabletStatsPB tablet_stats;
+        err = reader.get_tablet_merged_stats(txn.get(), tablet_id, &tablet_stats, nullptr);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get tablet merged stats")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("error_code", err);
+            return -1;
+        }
+
+        *data_size += tablet_stats.data_size();
+        ++tablet_num;
+    }
+    if (!scan->is_valid()) {
+        LOG_ERROR("failed to get index partition data size").tag("error_code", scan->error_code());
+        return -1;
+    }
+    VLOG_DEBUG << "get data size for instance_id=" << instance_id_ << " db_id=" << db_id
+               << " table_id=" << table_id << " index_id=" << index_id
+               << " partition_id=" << partition_id << " tablet_num=" << tablet_num
+               << " data_size=" << *data_size;
+    return 0;
+}
+
+// Stores `operation_log` under `log_key` with blob_put() in a dedicated transaction and
+// commits it. Returns 0 on success and -1 when the transaction cannot be created or
+// committed.
+int SnapshotDataSizeCalculator::save_operation_log(const std::string_view& log_key,
+                                                   OperationLogPB& operation_log) {
+    std::unique_ptr<Transaction> txn;
+    if (TxnErrorCode err = txn_kv_->create_txn(&txn); err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create transaction for saving operation log").tag("error_code", err);
+        return -1;
+    }
+
+    blob_put(txn.get(), log_key, operation_log, 0);
+    LOG_INFO("put operation log key")
+            .tag("instance_id", instance_id_)
+            .tag("operation_log_key", hex(log_key))
+            .tag("value", operation_log.DebugString());
+
+    if (TxnErrorCode err = txn->commit(); err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to commit transaction for saving operation log").tag("error_code", err);
+        return -1;
+    }
+    return 0;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 75e7b1b4fea..0762231c3ab 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -29,6 +29,7 @@ set_target_properties(txn_kv_test PROPERTIES COMPILE_FLAGS
"-fno-access-control"
add_executable(recycler_test
recycler_test.cpp
recycler_operation_log_test.cpp
+ snapshot_data_size_calculator_test.cpp
recycle_versioned_keys_test.cpp)
add_executable(mem_txn_kv_test mem_txn_kv_test.cpp)
diff --git a/cloud/test/recycler_operation_log_test.cpp
b/cloud/test/recycler_operation_log_test.cpp
index 4718e58975a..8eb1cb6f4b2 100644
--- a/cloud/test/recycler_operation_log_test.cpp
+++ b/cloud/test/recycler_operation_log_test.cpp
@@ -2547,13 +2547,14 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck)
{
insert_empty_value();
// Test initialization without snapshots
+ OperationLogReferenceInfo reference_info;
{
InstanceInfoPB instance_info;
OperationLogRecycleChecker checker(test_instance_id, txn_kv.get(),
instance_info);
ASSERT_EQ(checker.init(), 0);
// All logs should be recyclable when no snapshots exist
- ASSERT_TRUE(checker.can_recycle(old_version, 1)) <<
old_version.version();
+ ASSERT_TRUE(checker.can_recycle(old_version, 1, &reference_info)) <<
old_version.version();
}
{
@@ -2563,7 +2564,7 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) {
ASSERT_EQ(checker.init(), 0);
OperationLogPB op_log;
- ASSERT_TRUE(checker.can_recycle(old_version, op_log.min_timestamp()));
+ ASSERT_TRUE(checker.can_recycle(old_version, op_log.min_timestamp(),
&reference_info));
}
auto write_snapshot = [&]() {
@@ -2591,19 +2592,19 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck)
{
ASSERT_EQ(checker.init(), 0);
// case 1, old operation log can be recycled.
- ASSERT_TRUE(checker.can_recycle(old_version, 1));
+ ASSERT_TRUE(checker.can_recycle(old_version, 1, &reference_info));
// case 2. snapshot exist in the log range, can not be recycled.
- ASSERT_FALSE(checker.can_recycle(version1, old_version.version()))
+ ASSERT_FALSE(checker.can_recycle(version1, old_version.version(),
&reference_info))
<< "version1: " << version1.version() << ", old_version: " <<
old_version.version();
Versionstamp version3 = get_current_versionstamp();
Versionstamp version4(version3.version(), 1);
// case 3. large operation log can not be recycled.
- ASSERT_FALSE(checker.can_recycle(version4, version2.version()));
+ ASSERT_FALSE(checker.can_recycle(version4, version2.version(),
&reference_info));
// case 4: [min_version, operation log version)
- ASSERT_TRUE(checker.can_recycle(version1, version1.version()));
+ ASSERT_TRUE(checker.can_recycle(version1, version1.version(),
&reference_info));
}
{
@@ -2616,7 +2617,7 @@ TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) {
Versionstamp version5 = get_current_versionstamp();
- ASSERT_FALSE(checker.can_recycle(version5, version2.version()))
+ ASSERT_FALSE(checker.can_recycle(version5, version2.version(),
&reference_info))
<< "version5: " << version5.version() << ", version2: " <<
version2.version();
}
}
diff --git a/cloud/test/snapshot_data_size_calculator_test.cpp
b/cloud/test/snapshot_data_size_calculator_test.cpp
new file mode 100644
index 00000000000..dfe6d760c46
--- /dev/null
+++ b/cloud/test/snapshot_data_size_calculator_test.cpp
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/cloud.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "meta-service/meta_service.h"
+#include "meta-store/keys.h"
+#include "meta-store/mem_txn_kv.h"
+#include "meta-store/txn_kv.h"
+#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
+#include "recycler/recycler.h"
+
+using namespace doris::cloud;
+
+extern std::string instance_id;
+
+// Test helper: stores `instance` in `txn_kv` under the key built from the
+// global `instance_id`.
+//
+// The protobuf is serialized, put under instance_key({instance_id}) and the
+// transaction is committed. Every step is guarded by a fatal gtest assertion,
+// so a failure makes this helper return early and marks the test as failed.
+static void update_instance_info(TxnKv* txn_kv, const InstanceInfoPB& instance) {
+    // Serialize first; an empty payload is treated as a serialization failure.
+    std::string value = instance.SerializeAsString();
+    ASSERT_FALSE(value.empty()) << "Failed to serialize instance info";
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+
+    txn->put(instance_key({instance_id}), value);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction";
+}
+
+// Test helper: reads the value stored under instance_key({instance_id}) and
+// parses it into `instance`.
+//
+// Declared `static` (internal linkage), matching update_instance_info(): this
+// file picks up `instance_id` from another translation unit via `extern`, so
+// it is linked together with other test sources and a generically named
+// global helper could collide with a same-named function there.
+//
+// Uses fatal gtest assertions; on failure the helper returns early, so callers
+// that depend on `instance` should wrap the call in ASSERT_NO_FATAL_FAILURE.
+static void get_instance(TxnKv* txn_kv, InstanceInfoPB& instance) {
+    std::string key = instance_key({instance_id});
+    std::string val;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+    ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+    ASSERT_TRUE(instance.ParseFromString(val));
+}
+
+// Test helper: writes `snapshot` under the versioned snapshot-full key of the
+// global `instance_id`, encoded with `versionstamp`, and commits.
+//
+// Declared `static` (internal linkage), matching update_instance_info(): this
+// file is linked with other test sources (see the `extern` declaration of
+// `instance_id`), so a global helper with such a generic name could collide
+// with a same-named function in another translation unit.
+//
+// Uses fatal gtest assertions; on failure the helper returns early.
+static void update_snapshot(TxnKv* txn_kv, const Versionstamp& versionstamp,
+                            const SnapshotPB& snapshot) {
+    std::string snapshot_full_key = versioned::snapshot_full_key({instance_id});
+    std::string key = encode_versioned_key(snapshot_full_key, versionstamp);
+    std::string value = snapshot.SerializeAsString();
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+    txn->put(key, value);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction";
+}
+
+// Test helper: reads the value stored under the versioned snapshot-full key of
+// the global `instance_id` (encoded with `versionstamp`) and parses it into
+// `snapshot`.
+//
+// Declared `static` (internal linkage), matching update_instance_info(): this
+// file is linked with other test sources (see the `extern` declaration of
+// `instance_id`), so a global helper with such a generic name could collide
+// with a same-named function in another translation unit.
+//
+// Uses fatal gtest assertions; on failure the helper returns early, so callers
+// that depend on `snapshot` should wrap the call in ASSERT_NO_FATAL_FAILURE.
+static void get_snapshot(TxnKv* txn_kv, const Versionstamp& versionstamp,
+                         SnapshotPB& snapshot) {
+    std::string snapshot_full_key = versioned::snapshot_full_key({instance_id});
+    std::string key = encode_versioned_key(snapshot_full_key, versionstamp);
+    std::string val;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction";
+    ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+    ASSERT_TRUE(snapshot.ParseFromString(val));
+}
+
+// Checks what save_snapshot_data_size_with_retry() writes back to the KV store.
+//
+// Setup: an instance record plus eight snapshots (versionstamps 1..8), each
+// with a meta image size and a logical data size. The calculator's
+// per-snapshot retained sizes and its instance retained size are injected
+// directly instead of being computed.
+//
+// Verification: every stored SnapshotPB must carry the injected retained size
+// and the expected billable size, and the stored InstanceInfoPB must carry the
+// instance-level retained/billable sizes.
+//
+// NOTE(review): the expected billable numbers are consistent with
+//   billable = min(retained + carry, logical_data_size) + meta_image_size
+// where `carry` starts at instance_retained_data_size_ (10), the excess over
+// logical_data_size is carried to the next snapshot, and the final carry
+// (1000) becomes the instance billable size. This is inferred from the
+// numbers only; confirm against SnapshotDataSizeCalculator.
+//
+// The helpers report errors with fatal ASSERT_* macros, which only return from
+// the helper itself. Each call is therefore wrapped in ASSERT_NO_FATAL_FAILURE
+// so that a failed setup or read stops the test instead of being followed by
+// misleading downstream failures.
+TEST(SnapshotDataSizeCalculatorTest, SaveSnapshotDataSizeTest) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("0");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_empty");
+    ASSERT_NO_FATAL_FAILURE(update_instance_info(txn_kv.get(), instance));
+
+    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
+
+    // Records the snapshot both in `snapshots` (input of calculator.init) and
+    // in the KV store. Sizes are int64_t to match the protobuf int64 fields.
+    auto create_snapshot = [&](Versionstamp versionstamp, int64_t image_size,
+                               int64_t logical_data_size) {
+        SnapshotPB snapshot;
+        snapshot.set_snapshot_meta_image_size(image_size);
+        snapshot.set_snapshot_logical_data_size(logical_data_size);
+        snapshots.emplace_back(snapshot, versionstamp);
+        ASSERT_NO_FATAL_FAILURE(update_snapshot(txn_kv.get(), versionstamp, snapshot));
+    };
+
+    Versionstamp vs1(1);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs1, 100, 1000));
+    Versionstamp vs2(2);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs2, 200, 2000));
+    Versionstamp vs3(3);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs3, 300, 3000));
+    Versionstamp vs4(4);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs4, 400, 4000));
+    Versionstamp vs5(5);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs5, 500, 1000));
+    Versionstamp vs6(6);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs6, 600, 8000));
+    Versionstamp vs7(7);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs7, 700, 8000));
+    Versionstamp vs8(8);
+    ASSERT_NO_FATAL_FAILURE(create_snapshot(vs8, 800, 9000));
+
+    SnapshotDataSizeCalculator calculator(instance_id, txn_kv);
+    calculator.init(snapshots);
+
+    // Inject the retained sizes directly so that only the save step is tested.
+    std::map<Versionstamp, int64_t> retained_data_size;
+    retained_data_size[vs1] = 500;
+    retained_data_size[vs2] = 2000;
+    retained_data_size[vs3] = 4000;
+    retained_data_size[vs4] = 2000;
+    retained_data_size[vs5] = 5000;
+    retained_data_size[vs6] = 7000;
+    retained_data_size[vs7] = 2000;
+    retained_data_size[vs8] = 10000;
+    calculator.instance_retained_data_size_ = 10;
+    calculator.retained_data_size_ = std::move(retained_data_size);
+
+    ASSERT_EQ(calculator.save_snapshot_data_size_with_retry(), 0);
+
+    // Reads the snapshot back from the KV store and compares both sizes.
+    auto check_snapshot = [&](Versionstamp versionstamp, int64_t expected_retained_data_size,
+                              int64_t expected_billable_data_size) {
+        SnapshotPB snapshot;
+        ASSERT_NO_FATAL_FAILURE(get_snapshot(txn_kv.get(), versionstamp, snapshot));
+        ASSERT_EQ(snapshot.snapshot_retained_data_size(), expected_retained_data_size);
+        ASSERT_EQ(snapshot.snapshot_billable_data_size(), expected_billable_data_size);
+    };
+
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs1, 500, 610));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs2, 2000, 2200));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs3, 4000, 3300));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs4, 2000, 3400));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs5, 5000, 1500));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs6, 7000, 8600));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs7, 2000, 2000 + 700 + 3000));
+    ASSERT_NO_FATAL_FAILURE(check_snapshot(vs8, 10000, 9800));
+
+    ASSERT_NO_FATAL_FAILURE(get_instance(txn_kv.get(), instance));
+    ASSERT_EQ(instance.snapshot_retained_data_size(), 10);
+    ASSERT_EQ(instance.snapshot_billable_data_size(), 1000);
+}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index b93df8f52ec..ea5441cdc4a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -137,6 +137,9 @@ message InstanceInfoPB {
repeated KeySetType migrated_key_sets = 119; // convert single version
keys to multi version keys
repeated KeySetType compacted_key_sets = 120; // compact snapshot chain
...
+
+ optional int64 snapshot_retained_data_size = 121;
+ optional int64 snapshot_billable_data_size = 122;
}
message StagePB {
@@ -758,6 +761,8 @@ message DropPartitionLogPB {
repeated int64 partition_ids = 4;
optional int64 expired_at_s = 5; // expiration timestamp
optional bool update_table_version = 6;
+
+ map<string, int64> index_partition_to_data_size = 7;
}
message CommitIndexLogPB {
@@ -772,6 +777,8 @@ message DropIndexLogPB {
optional int64 table_id = 2;
repeated int64 index_ids = 3;
optional int64 expiration = 4; // expiration timestamp
+
+ map<string, int64> index_partition_to_data_size = 5;
}
message UpdateTabletLogPB {
@@ -849,8 +856,10 @@ message SnapshotPB {
optional string upload_file = 15;
optional string upload_id = 16;
- optional int64 image_file_size = 17;
- optional int64 snapshot_data_size = 18;
+ optional int64 snapshot_meta_image_size = 17;
+ optional int64 snapshot_logical_data_size = 18;
+ optional int64 snapshot_retained_data_size = 19;
+ optional int64 snapshot_billable_data_size = 20;
// TODO: add the referenced resources
}
@@ -2043,8 +2052,8 @@ message CommitSnapshotRequest {
optional string image_url = 3;
optional int64 last_journal_id = 4;
optional string request_ip = 5;
- optional int64 image_file_size = 6;
- optional int64 snapshot_data_size = 7;
+ optional int64 snapshot_meta_image_size = 6;
+ optional int64 snapshot_logical_data_size = 7;
}
message CommitSnapshotResponse {
@@ -2079,8 +2088,10 @@ message SnapshotInfoPB {
optional string reason = 14;
repeated string derived_instance_ids = 15;
- optional int64 image_file_size = 16;
- optional int64 snapshot_data_size = 17;
+ optional int64 snapshot_meta_image_size = 16;
+ optional int64 snapshot_logical_data_size = 17;
+ optional int64 snapshot_retained_data_size = 18;
+ optional int64 snapshot_billable_data_size = 19;
}
message ListSnapshotRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]