This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new f0b8d2cb24e branch-3.1: [Feature](recycler) Add recycler metrics for
instance layer #51856 #51448 (#52429)
f0b8d2cb24e is described below
commit f0b8d2cb24eae8b7549f9347bf96311bc7678a2e
Author: Uniqueyou <[email protected]>
AuthorDate: Sat Jun 28 22:54:40 2025 +0800
branch-3.1: [Feature](recycler) Add recycler metrics for instance layer
#51856 #51448 (#52429)
Cherry-picked #51856 #51448
---
cloud/src/common/bvars.cpp | 40 +-
cloud/src/common/bvars.h | 47 ++-
cloud/src/common/config.h | 4 +
cloud/src/main.cpp | 3 +-
cloud/src/recycler/recycler.cpp | 901 ++++++++++++++++++++++++++++++++++------
cloud/src/recycler/recycler.h | 134 +++++-
cloud/test/recycler_test.cpp | 46 +-
7 files changed, 996 insertions(+), 179 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 9fd777a4b98..d8d6d34127a 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -20,6 +20,7 @@
#include <bvar/multi_dimension.h>
#include <bvar/reducer.h>
#include <bvar/status.h>
+#include <bvar/window.h>
#include <cstdint>
#include <stdexcept>
@@ -103,14 +104,41 @@ BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_rowset_earlest_ts("recycler",
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler",
"recycle_tmp_rowset_earlest_ts");
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler",
"recycle_expired_txn_label_earlest_ts");
bvar::Status<int64_t>
g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0);
-bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;
+// current concurrency of recycle task
+bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
// recycler's mbvars
-mBvarIntAdder
g_bvar_recycler_instance_running("recycler_instance_running",{"instance_id"});
-mBvarLongStatus
g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration_ms",{"instance_id"});
-mBvarLongStatus
g_bvar_recycler_instance_next_time("recycler_instance_next_time_s",{"instance_id"});
-mBvarPairStatus<int64_t>
g_bvar_recycler_instance_recycle_times("recycler_instance_recycle_times",{"instance_id"});
-mBvarLongStatus
g_bvar_recycler_instance_recycle_last_success_times("recycler_instance_recycle_last_success_times",{"instance_id"});
+bvar::Adder<int64_t>
g_bvar_recycler_instance_running_counter("recycler_instance_running_counter");
+// time cost of the last whole recycle process
+mBvarStatus<int64_t>
g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration",{"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"});
+// start and end timestamps of the recycle process
+mBvarStatus<int64_t>
g_bvar_recycler_instance_recycle_st_ts("recycler_instance_recycle_st_ts",{"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_recycle_ed_ts("recycler_instance_recycle_ed_ts",{"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_recycle_last_success_ts("recycler_instance_recycle_last_success_ts",{"instance_id"});
+
+// recycler's mbvars
+// instance_id: unique identifier for the instance
+// resource_type: type of resource that needs to be recycled (index, partition,
rowset, segment, tablet, etc.)
+// resource_id: unique identifier for the repository
+// status: status of the recycle task (normal, abnormal, etc.)
+mBvarIntAdder
g_bvar_recycler_vault_recycle_status("recycler_vault_recycle_status",
{"instance_id", "resource_id", "status"});
+// current concurrency of vault delete task
+mBvarIntAdder
g_bvar_recycler_vault_recycle_task_concurrency("recycler_vault_recycle_task_concurrency",
{"instance_id", "resource_type", "resource_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_recycled_num("recycler_instance_last_round_recycled_num",
{"instance_id", "resource_type"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_to_recycle_num("recycler_instance_last_round_to_recycle_num",
{"instance_id", "resource_type"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_recycled_bytes("recycler_instance_last_round_recycled_bytes",
{"instance_id", "resource_type"});
+mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_to_recycle_bytes("recycler_instance_last_round_to_recycle_bytes",
{"instance_id", "resource_type"});
+mBvarStatus<double>
g_bvar_recycler_instance_last_round_recycle_elpased_ts("recycler_instance_last_round_recycle_elpased_ts",
{"instance_id", "resource_type"});
+// total recycled num and bytes of resources since recycler started
+mBvarIntAdder
g_bvar_recycler_instance_recycle_total_num_since_started("recycler_instance_recycle_total_num_since_started",
{"instance_id", "resource_type"});
+mBvarIntAdder
g_bvar_recycler_instance_recycle_total_bytes_since_started("recycler_instance_recycle_total_bytes_since_started",
{"instance_id", "resource_type"});
+mBvarIntAdder
g_bvar_recycler_instance_recycle_round("recycler_instance_recycle_round",
{"instance_id", "resource_type"});
+// represents the ms required per resource to be recycled
+// value of -1 means no resource recycled
+mBvarStatus<double>
g_bvar_recycler_instance_recycle_time_per_resource("recycler_instance_recycle_time_per_resource",
{"instance_id", "resource_type"});
+// represents the bytes of resources that can be recycled per ms
+mBvarStatus<double>
g_bvar_recycler_instance_recycle_bytes_per_ms("recycler_instance_recycle_bytes_per_ms",
{"instance_id", "resource_type"});
// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 12f3f2c3060..9f4f3d1fccb 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -24,6 +24,7 @@
#include <bvar/reducer.h>
#include <cstdint>
+#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
@@ -139,8 +140,7 @@ public:
BvarType* stats =
counter_.get_stats(std::list<std::string>(dim_values));
if (stats) {
if constexpr (std::is_same_v<BvarType, bvar::Status<double>> ||
- std::is_same_v<BvarType, bvar::Status<long>> ||
- is_pair_status<BvarType>::value) {
+ std::is_same_v<BvarType, bvar::Status<long>>) {
stats->set_value(value);
} else {
*stats << value;
@@ -160,8 +160,6 @@ private:
template <typename T>
struct is_valid_bvar_type : std::false_type {};
template <typename T>
- struct is_pair_status : std::false_type {};
- template <typename T>
struct is_valid_bvar_type<bvar::Adder<T>> : std::true_type {};
template <>
struct is_valid_bvar_type<bvar::IntRecorder> : std::true_type {};
@@ -169,8 +167,6 @@ private:
struct is_valid_bvar_type<bvar::Maxer<T>> : std::true_type {};
template <typename T>
struct is_valid_bvar_type<bvar::Status<T>> : std::true_type {};
- template <typename T>
- struct is_pair_status<bvar::Status<std::pair<T, T>>> : std::true_type {};
template <>
struct is_valid_bvar_type<bvar::LatencyRecorder> : std::true_type {};
@@ -183,18 +179,8 @@ using mBvarIntRecorder = mBvarWrapper<bvar::IntRecorder>;
using mBvarLatencyRecorder = mBvarWrapper<bvar::LatencyRecorder>;
using mBvarIntMaxer = mBvarWrapper<bvar::Maxer<int>>;
using mBvarDoubleMaxer = mBvarWrapper<bvar::Maxer<double>>;
-using mBvarLongStatus = mBvarWrapper<bvar::Status<long>>;
-using mBvarDoubleStatus = mBvarWrapper<bvar::Status<double>>;
-
-namespace std {
-template <typename T1, typename T2>
-inline std::ostream& operator<<(std::ostream& os, const std::pair<T1, T2>& p) {
- return os << "{" << p.first << "," << p.second << "}";
-}
-} // namespace std
-
template <typename T>
-using mBvarPairStatus = mBvarWrapper<bvar::Status<std::pair<T, T>>>;
+using mBvarStatus = mBvarWrapper<bvar::Status<T>>;
// meta-service's bvars
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
@@ -270,13 +256,28 @@ extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts;
+// recycler's mbvars
extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
-extern bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;
-extern mBvarIntAdder g_bvar_recycler_instance_running;
-extern mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration;
-extern mBvarLongStatus g_bvar_recycler_instance_next_time;
-extern mBvarPairStatus<int64_t> g_bvar_recycler_instance_recycle_times;
-extern mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times;
+extern bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
+extern bvar::Adder<int64_t> g_bvar_recycler_instance_running_counter;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_recycle_duration;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_next_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_st_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_ed_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_last_success_ts;
+
+extern mBvarIntAdder g_bvar_recycler_vault_recycle_status;
+extern mBvarIntAdder g_bvar_recycler_vault_recycle_task_concurrency;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycled_num;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_to_recycle_num;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycled_bytes;
+extern mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_to_recycle_bytes;
+extern mBvarStatus<double>
g_bvar_recycler_instance_last_round_recycle_elpased_ts;
+extern mBvarIntAdder g_bvar_recycler_instance_recycle_total_num_since_started;
+extern mBvarIntAdder
g_bvar_recycler_instance_recycle_total_bytes_since_started;
+extern mBvarIntAdder g_bvar_recycler_instance_recycle_round;
+extern mBvarStatus<double> g_bvar_recycler_instance_recycle_time_per_resource;
+extern mBvarStatus<double> g_bvar_recycler_instance_recycle_bytes_per_ms;
// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index cc1094ee103..883af8f64da 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -47,6 +47,8 @@ CONF_Int64(fdb_txn_timeout_ms, "10000");
CONF_Int64(brpc_max_body_size, "3147483648");
CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
+CONF_String(bvar_max_dump_multi_dimension_metric_num, "5000");
+
// logging
CONF_String(log_dir, "./log/");
CONF_String(log_level, "info"); // info warn error fatal
@@ -104,6 +106,8 @@
CONF_mInt64(delete_bitmap_storage_optimize_v2_check_skip_seconds, "300"); // 5mi
CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
// interval for check object
CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours
+// enable recycler metrics statistics
+CONF_Bool(enable_recycler_metrics, "false");
CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
diff --git a/cloud/src/main.cpp b/cloud/src/main.cpp
index 18cf98720e9..0aad97aab4d 100644
--- a/cloud/src/main.cpp
+++ b/cloud/src/main.cpp
@@ -236,7 +236,8 @@ int main(int argc, char** argv) {
std::cout << "try to start meta_service, recycler" << std::endl;
}
-
google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number",
"2000");
+ google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number",
+
config::bvar_max_dump_multi_dimension_metric_num.c_str());
brpc::Server server;
brpc::FLAGS_max_body_size = config::brpc_max_body_size;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 9c81b0364fd..b7238efbdc6 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -20,10 +20,12 @@
#include <brpc/builtin_service.pb.h>
#include <brpc/server.h>
#include <butil/endpoint.h>
+#include <butil/strings/string_split.h>
#include <bvar/status.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
+#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
@@ -65,6 +67,9 @@ namespace doris::cloud {
using namespace std::chrono;
+static RecyclerMetricsContext tablet_metrics_context_("global_recycler",
"recycle_tablet");
+static RecyclerMetricsContext segment_metrics_context_("global_recycler",
"recycle_segment");
+
// return 0 for success get a key, 1 for key not found, negative for error
[[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key,
std::string& val) {
std::unique_ptr<Transaction> txn;
@@ -280,12 +285,16 @@ void Recycler::recycle_callback() {
if (stopped()) return;
LOG_INFO("begin to recycle instance").tag("instance_id", instance_id);
auto ctime_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- g_bvar_recycler_task_concurrency << 1;
- g_bvar_recycler_instance_running.put({instance_id}, 1);
- g_bvar_recycler_instance_recycle_times.put({instance_id},
std::make_pair(ctime_ms, -1));
+ g_bvar_recycler_instance_recycle_task_concurrency << 1;
+ g_bvar_recycler_instance_running_counter << 1;
+ g_bvar_recycler_instance_recycle_st_ts.put({instance_id}, ctime_ms);
+ tablet_metrics_context_.reset();
+ segment_metrics_context_.reset();
ret = instance_recycler->do_recycle();
- g_bvar_recycler_task_concurrency << -1;
- g_bvar_recycler_instance_running.put({instance_id}, -1);
+ tablet_metrics_context_.finish_report();
+ segment_metrics_context_.finish_report();
+ g_bvar_recycler_instance_recycle_task_concurrency << -1;
+ g_bvar_recycler_instance_running_counter << -1;
// If instance recycler has been aborted, don't finish this job
if (!instance_recycler->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), recycle_job_key,
instance_id, ip_port_,
@@ -297,15 +306,15 @@ void Recycler::recycle_callback() {
}
auto now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
auto elpased_ms = now - ctime_ms;
- g_bvar_recycler_instance_recycle_times.put({instance_id},
std::make_pair(ctime_ms, now));
+ g_bvar_recycler_instance_recycle_ed_ts.put({instance_id}, now);
g_bvar_recycler_instance_last_recycle_duration.put({instance_id},
elpased_ms);
- g_bvar_recycler_instance_next_time.put({instance_id},
- now +
config::recycle_interval_seconds * 1000);
+ g_bvar_recycler_instance_next_ts.put({instance_id},
+ now +
config::recycle_interval_seconds * 1000);
LOG(INFO) << "recycle instance done, "
<< "instance_id=" << instance_id << " ret=" << ret << "
ctime_ms: " << ctime_ms
<< " now: " << now;
- g_bvar_recycler_instance_recycle_last_success_times.put({instance_id},
now);
+ g_bvar_recycler_instance_recycle_last_success_ts.put({instance_id},
now);
LOG_INFO("finish recycle instance")
.tag("instance_id", instance_id)
@@ -806,6 +815,7 @@ int InstanceRecycler::recycle_indexes() {
int64_t num_scanned = 0;
int64_t num_expired = 0;
int64_t num_recycled = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
RecycleIndexKeyInfo index_key_info0 {instance_id_, 0};
RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX};
@@ -823,7 +833,8 @@ int InstanceRecycler::recycle_indexes() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle indexes finished, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle indexes finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
@@ -906,19 +917,61 @@ int InstanceRecycler::recycle_indexes() {
return -1;
}
}
- if (recycle_tablets(index_pb.table_id(), index_id) != 0) {
+ if (recycle_tablets(index_pb.table_id(), index_id, metrics_context) !=
0) {
LOG_WARNING("failed to recycle tablets under index")
.tag("table_id", index_pb.table_id())
.tag("instance_id", instance_id_)
.tag("index_id", index_id);
return -1;
}
- ++num_recycled;
+ metrics_context.total_recycled_num = ++num_recycled;
+ metrics_context.report();
check_recycle_task(instance_id_, task_name, num_scanned, num_recycled,
start_time);
index_keys.push_back(k);
return 0;
};
+ // for calculating the total num or bytes of recycled objects
+ auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
+ RecycleIndexPB index_pb;
+ if (!index_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(index_pb)) {
+ return 0;
+ }
+ // decode index_id
+ auto k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ decode_key(&k1, &out);
+ // 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
+ auto index_id = std::get<int64_t>(std::get<0>(out[3]));
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::string val;
+ err = txn->get(k, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return 0;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ index_pb.Clear();
+ if (!index_pb.ParseFromString(val)) {
+ return 0;
+ }
+ if (scan_tablets_and_statistics(index_pb.table_id(), index_id,
metrics_context) != 0) {
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ };
+
auto loop_done = [&index_keys, this]() -> int {
if (index_keys.empty()) return 0;
DORIS_CLOUD_DEFER {
@@ -931,7 +984,9 @@ int InstanceRecycler::recycle_indexes() {
return 0;
};
- return scan_and_recycle(index_key0, index_key1, std::move(recycle_func),
std::move(loop_done));
+ return scan_for_recycle_and_statistics(index_key0, index_key1, "indexes",
metrics_context,
+ std::move(scan_and_statistics),
std::move(recycle_func),
+ std::move(loop_done));
}
bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string
instance_id,
@@ -1024,6 +1079,7 @@ int InstanceRecycler::recycle_partitions() {
int64_t num_scanned = 0;
int64_t num_expired = 0;
int64_t num_recycled = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
RecyclePartKeyInfo part_key_info0 {instance_id_, 0};
RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX};
@@ -1041,7 +1097,8 @@ int InstanceRecycler::recycle_partitions() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle partitions finished, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle partitions finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
@@ -1131,7 +1188,8 @@ int InstanceRecycler::recycle_partitions() {
int ret = 0;
for (int64_t index_id : part_pb.index_id()) {
- if (recycle_tablets(part_pb.table_id(), index_id, partition_id,
is_empty_tablet) != 0) {
+ if (recycle_tablets(part_pb.table_id(), index_id, metrics_context,
partition_id,
+ is_empty_tablet) != 0) {
LOG_WARNING("failed to recycle tablets under partition")
.tag("table_id", part_pb.table_id())
.tag("instance_id", instance_id_)
@@ -1148,7 +1206,56 @@ int InstanceRecycler::recycle_partitions() {
partition_version_keys.push_back(partition_version_key(
{instance_id_, part_pb.db_id(), part_pb.table_id(),
partition_id}));
}
+ metrics_context.report();
+ }
+ return ret;
+ };
+
+ // for calculating the total num or bytes of recycled objects
+ auto scan_and_statistics = [&, this](std::string_view k, std::string_view
v) -> int {
+ RecyclePartitionPB part_pb;
+ if (!part_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(part_pb)) {
+ return 0;
+ }
+ // decode partition_id
+ auto k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ decode_key(&k1, &out);
+ // 0x01 "recycle" ${instance_id} "partition" ${partition_id} ->
RecyclePartitionPB
+ auto partition_id = std::get<int64_t>(std::get<0>(out[3]));
+ // Change state to RECYCLING
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::string val;
+ err = txn->get(k, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return 0;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ part_pb.Clear();
+ if (!part_pb.ParseFromString(val)) {
+ return 0;
+ }
+ // Partitions with PREPARED state MUST have no data
+ bool is_empty_tablet = part_pb.state() == RecyclePartitionPB::PREPARED;
+ int ret = 0;
+ for (int64_t index_id : part_pb.index_id()) {
+ if (scan_tablets_and_statistics(part_pb.table_id(), index_id,
metrics_context,
+ partition_id, is_empty_tablet) !=
0) {
+ ret = 0;
+ }
}
+ metrics_context.total_need_recycle_num++;
return ret;
};
@@ -1179,12 +1286,15 @@ int InstanceRecycler::recycle_partitions() {
return 0;
};
- return scan_and_recycle(part_key0, part_key1, std::move(recycle_func),
std::move(loop_done));
+ return scan_for_recycle_and_statistics(part_key0, part_key1, "partitions",
metrics_context,
+ std::move(scan_and_statistics),
std::move(recycle_func),
+ std::move(loop_done));
}
int InstanceRecycler::recycle_versions() {
int64_t num_scanned = 0;
int64_t num_recycled = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_versions");
LOG_INFO("begin to recycle table and partition
versions").tag("instance_id", instance_id_);
@@ -1192,7 +1302,8 @@ int InstanceRecycler::recycle_versions() {
DORIS_CLOUD_DEFER {
auto cost = duration<float>(steady_clock::now() - start_time).count();
- LOG_INFO("recycle table and partition versions finished, cost={}s",
cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle table and partition versions finished, cost={}s",
cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_recycled", num_recycled);
@@ -1202,8 +1313,8 @@ int InstanceRecycler::recycle_versions() {
auto version_key_end = partition_version_key({instance_id_, INT64_MAX, 0,
0});
int64_t last_scanned_table_id = 0;
bool is_recycled = false; // Is last scanned kv recycled
- auto recycle_func = [&num_scanned, &num_recycled, &last_scanned_table_id,
&is_recycled, this](
- std::string_view k, std::string_view) {
+ auto recycle_func = [&num_scanned, &num_recycled, &last_scanned_table_id,
&is_recycled,
+ &metrics_context, this](std::string_view k,
std::string_view) {
++num_scanned;
auto k1 = k;
k1.remove_prefix(1);
@@ -1264,15 +1375,58 @@ int InstanceRecycler::recycle_versions() {
if (err != TxnErrorCode::TXN_OK) {
return -1;
}
- ++num_recycled;
+ metrics_context.total_recycled_num = ++num_recycled;
+ metrics_context.report();
is_recycled = true;
return 0;
};
- return scan_and_recycle(version_key_begin, version_key_end,
std::move(recycle_func));
+ int64_t last_scanned_table_id_t = 0;
+ bool is_recycled_t = false; // Is last scanned kv recycled
+ // for calculating the total num or bytes of recycled objects
+ auto scan_and_statistics = [&metrics_context, &last_scanned_table_id_t,
&is_recycled_t, this](
+ std::string_view k, std::string_view) {
+ auto k1 = k;
+ k1.remove_prefix(1);
+ // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id}
${partition_id}
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ decode_key(&k1, &out);
+ DCHECK_EQ(out.size(), 6) << k;
+ auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+ if (table_id == last_scanned_table_id_t) { // Already handle kvs of
this table
+ metrics_context.total_need_recycle_num +=
+ is_recycled_t; // Version kv of this table has been
recycled
+ return 0;
+ }
+ last_scanned_table_id_t = table_id;
+ is_recycled_t = false;
+ auto tablet_key_begin = stats_tablet_key({instance_id_, table_id, 0,
0, 0});
+ auto tablet_key_end = stats_tablet_key({instance_id_, table_id,
INT64_MAX, 0, 0});
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::unique_ptr<RangeGetIterator> iter;
+ err = txn->get(tablet_key_begin, tablet_key_end, &iter, false, 1);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ if (iter->has_next()) { // Table is useful, should not recycle table
and partition versions
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ is_recycled_t = true;
+ return 0;
+ };
+
+ return scan_for_recycle_and_statistics(version_key_begin, version_key_end,
"versions",
+ metrics_context,
std::move(scan_and_statistics),
+ std::move(recycle_func));
}
-int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
int64_t partition_id,
+int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
+ RecyclerMetricsContext& metrics_context,
int64_t partition_id,
bool is_empty_tablet) {
int64_t num_scanned = 0;
std::atomic_long num_recycled = 0;
@@ -1351,8 +1505,8 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id_,
tablet_id}));
if (!is_empty_tablet) {
sync_executor.add([this, &num_recycled, tid = tablet_id,
range_move = use_range_remove,
- k]() mutable -> TabletKeyPair {
- if (recycle_tablet(tid) != 0) {
+ &metrics_context, k]() mutable -> TabletKeyPair
{
+ if (recycle_tablet(tid, metrics_context) != 0) {
LOG_WARNING("failed to recycle tablet")
.tag("instance_id", instance_id_)
.tag("tablet_id", tid);
@@ -1536,18 +1690,20 @@ int InstanceRecycler::delete_rowset_data(const
doris::RowsetMetaCloudPB& rs_meta
return accessor->delete_files(file_paths);
}
-int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaCloudPB>& rowsets,
- RowsetRecyclingState type) {
+int InstanceRecycler::delete_rowset_data(
+ const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets,
RowsetRecyclingState type,
+ RecyclerMetricsContext& metrics_context) {
int ret = 0;
// resource_id -> file_paths
std::map<std::string, std::vector<std::string>> resource_file_paths;
// (resource_id, tablet_id, rowset_id)
std::vector<std::tuple<std::string, int64_t, std::string>>
rowsets_delete_by_prefix;
+ bool is_formal_rowset = (type == RowsetRecyclingState::FORMAL_ROWSET);
- for (const auto& rs : rowsets) {
+ for (const auto& [_, rs] : rowsets) {
// we have to treat tmp rowset as "orphans" that may not related to
any existing tablets
// due to aborted schema change.
- if (type == RowsetRecyclingState::FORMAL_ROWSET) {
+ if (is_formal_rowset) {
std::lock_guard lock(recycled_tablets_mtx_);
if (recycled_tablets_.count(rs.tablet_id())) {
continue; // Rowset data has already been deleted
@@ -1672,7 +1828,42 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
return -1;
}
auto& accessor = accessor_map_[*rid];
- return accessor->delete_files(*paths);
+ int ret = accessor->delete_files(*paths);
+ if (!ret) {
+ // deduplication of different files with the same rowset id
+ // 020000000000007fd045a62bc87a6587dd7ac274aa36e5a9_0.dat
+ //020000000000007fd045a62bc87a6587dd7ac274aa36e5a9_0.idx
+ std::set<std::string> deleted_rowset_id;
+
+ std::for_each(
+ paths->begin(), paths->end(),
+ [&metrics_context, &rowsets, &deleted_rowset_id](const
std::string& path) {
+ std::vector<std::string> str;
+ butil::SplitString(path, '/', &str);
+ std::string rowset_id;
+ if (auto pos = str.back().find('_'); pos !=
std::string::npos) {
+ rowset_id = str.back().substr(0, pos);
+ } else {
+ LOG(WARNING) << "failed to parse rowset_id,
path=" << path;
+ return;
+ }
+ auto rs_meta = rowsets.find(rowset_id);
+ if (rs_meta != rowsets.end() &&
+ !deleted_rowset_id.contains(rowset_id)) {
+ deleted_rowset_id.emplace(rowset_id);
+ metrics_context.total_recycled_data_size +=
+ rs_meta->second.total_disk_size();
+ segment_metrics_context_.total_recycled_num +=
+ rs_meta->second.num_segments();
+
segment_metrics_context_.total_recycled_data_size +=
+ rs_meta->second.total_disk_size();
+ metrics_context.total_recycled_num++;
+ }
+ });
+ segment_metrics_context_.report();
+ metrics_context.report();
+ }
+ return ret;
});
}
for (const auto& [resource_id, tablet_id, rowset_id] :
rowsets_delete_by_prefix) {
@@ -1680,9 +1871,21 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
"delete rowset {} by prefix because it's in
BEGIN_PARTIAL_UPDATE state, "
"resource_id={}, tablet_id={}, instance_id={}",
rowset_id, resource_id, tablet_id, instance_id_);
- concurrent_delete_executor.add(
- [&]() -> int { return delete_rowset_data(resource_id,
tablet_id, rowset_id); });
+ concurrent_delete_executor.add([&]() -> int {
+ int ret = delete_rowset_data(resource_id, tablet_id, rowset_id);
+ if (!ret) {
+ auto rs = rowsets.at(rowset_id);
+ metrics_context.total_recycled_data_size +=
rs.total_disk_size();
+ metrics_context.total_recycled_num++;
+ segment_metrics_context_.total_recycled_data_size +=
rs.total_disk_size();
+ segment_metrics_context_.total_recycled_num +=
rs.num_segments();
+ metrics_context.report();
+ segment_metrics_context_.report();
+ }
+ return ret;
+ });
}
+
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
@@ -1710,7 +1913,122 @@ int InstanceRecycler::delete_rowset_data(const
std::string& resource_id, int64_t
return accessor->delete_prefix(rowset_path_prefix(tablet_id, rowset_id));
}
-int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
+int InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t
index_id,
+ RecyclerMetricsContext&
metrics_context,
+ int64_t partition_id, bool
is_empty_tablet) {
+ std::string tablet_key_begin, tablet_key_end;
+
+ if (partition_id > 0) {
+ meta_tablet_key({instance_id_, table_id, index_id, partition_id, 0},
&tablet_key_begin);
+ meta_tablet_key({instance_id_, table_id, index_id, partition_id + 1,
0}, &tablet_key_end);
+ } else {
+ meta_tablet_key({instance_id_, table_id, index_id, 0, 0},
&tablet_key_begin);
+ meta_tablet_key({instance_id_, table_id, index_id + 1, 0, 0},
&tablet_key_end);
+ }
+ // for calculating the total num or bytes of recycled objects
+ auto scan_and_statistics = [&, is_empty_tablet, this](std::string_view k,
+ std::string_view v)
-> int {
+ doris::TabletMetaCloudPB tablet_meta_pb;
+ if (!tablet_meta_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t tablet_id = tablet_meta_pb.tablet_id();
+
+ if (!check_lazy_txn_finished(txn_kv_, instance_id_,
tablet_meta_pb.tablet_id())) {
+ return 0;
+ }
+
+ if (!is_empty_tablet) {
+ if (scan_tablet_and_statistics(tablet_id, metrics_context) != 0) {
+ return 0;
+ }
+ tablet_metrics_context_.total_need_recycle_num++;
+ }
+ return 0;
+ };
+ return scan_and_recycle(tablet_key_begin, tablet_key_end,
std::move(scan_and_statistics));
+}
+
+int InstanceRecycler::scan_tablet_and_statistics(int64_t tablet_id,
+ RecyclerMetricsContext&
metrics_context) {
+ int ret = 0;
+ std::map<std::string, RowsetMetaCloudPB> rowset_meta_map;
+ std::unique_ptr<Transaction> txn;
+ if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle tablet ")
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ ret = -1;
+ }
+ GetRowsetResponse resp;
+ std::string msg;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // get rowsets in tablet
+ internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1,
instance_id_,
+ tablet_id, code, msg, &resp);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("failed to get rowsets of tablet when recycle tablet")
+ .tag("tablet id", tablet_id)
+ .tag("msg", msg)
+ .tag("code", code)
+ .tag("instance id", instance_id_);
+ ret = -1;
+ }
+ for (const auto& rs_meta : resp.rowset_meta()) {
+ /*
+ * For compatibility, we skip the loop for [0-1] here.
+ * The purpose of this loop is to delete object files,
+ * and since [0-1] only has meta and doesn't have object files,
+ * skipping it doesn't affect system correctness.
+ *
+ * If not skipped, the check "if (!rs_meta.has_resource_id())" below
+ * would return error -1 directly, causing the recycle operation to
fail.
+ *
+ * [0-1] doesn't have resource id is a bug.
+ * In the future, we will fix this problem, after that,
+ * we can remove this if statement.
+ *
+ * TODO(Yukang-Lian): remove this if statement when [0-1] has resource
id in the future.
+ */
+
+ if (rs_meta.end_version() == 1) {
+ // Assert that [0-1] has no resource_id to make sure
+ // this if statement will not be forgotten to be removed
+ // when the resource id bug is fixed
+ DCHECK(!rs_meta.has_resource_id()) << "rs_meta" <<
rs_meta.ShortDebugString();
+ continue;
+ }
+ if (!rs_meta.has_resource_id()) {
+ LOG_WARNING("rowset meta does not have a resource id, impossible!")
+ .tag("rs_meta", rs_meta.ShortDebugString())
+ .tag("instance_id", instance_id_)
+ .tag("tablet_id", tablet_id);
+ continue;
+ }
+ DCHECK(rs_meta.has_resource_id()) << "rs_meta" <<
rs_meta.ShortDebugString();
+ auto it = accessor_map_.find(rs_meta.resource_id());
+ // possible if the accessor is not initialized correctly
+ if (it == accessor_map_.end()) [[unlikely]] {
+ LOG_WARNING(
+ "failed to find resource id when recycle tablet, skip this
vault accessor "
+ "recycle process")
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("resource_id", rs_meta.resource_id())
+ .tag("rowset meta pb", rs_meta.ShortDebugString());
+ continue;
+ }
+
+ metrics_context.total_need_recycle_data_size +=
rs_meta.total_disk_size();
+ tablet_metrics_context_.total_need_recycle_data_size +=
rs_meta.total_disk_size();
+ segment_metrics_context_.total_need_recycle_data_size +=
rs_meta.total_disk_size();
+ segment_metrics_context_.total_need_recycle_num +=
rs_meta.num_segments();
+ }
+ return ret;
+}
+
+int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext& metrics_context) {
LOG_INFO("begin to recycle rowsets in a dropped tablet")
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
@@ -1726,7 +2044,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id,
""});
std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id +
1, ""});
- std::set<std::string> resource_ids;
+ std::vector<std::string> rowset_meta;
int64_t recycle_rowsets_number = 0;
int64_t recycle_segments_number = 0;
int64_t recycle_rowsets_data_size = 0;
@@ -1832,16 +2150,17 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id)
{
max_rowset_creation_time = std::max(max_rowset_creation_time,
rs_meta.creation_time());
min_rowset_expiration_time = std::min(min_rowset_expiration_time,
rs_meta.txn_expiration());
max_rowset_expiration_time = std::max(max_rowset_expiration_time,
rs_meta.txn_expiration());
- resource_ids.emplace(rs_meta.resource_id());
+ rowset_meta.emplace_back(rs_meta.resource_id());
+ LOG(INFO) << "rs_meta.resource_id()=" << rs_meta.resource_id();
}
LOG_INFO("recycle tablet start to delete object")
.tag("instance id", instance_id_)
.tag("tablet id", tablet_id)
.tag("recycle tablet resource ids are",
- std::accumulate(resource_ids.begin(), resource_ids.end(),
std::string(),
- [](const std::string& a, const std::string&
b) {
- return a.empty() ? b : a + "," + b;
+ std::accumulate(rowset_meta.begin(), rowset_meta.end(),
std::string(),
+ [](std::string acc, const auto& it) {
+ return acc.empty() ? it : acc + ", " + it;
}));
SyncExecutor<int> concurrent_delete_executor(
@@ -1853,13 +2172,24 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id)
{
// ATTN: there may be a data leak if not all accessors are initialized successfully
// partial data deleted if the tablet is stored cross-storage vault
// vault id is not attached to TabletMeta...
- for (const auto& resource_id : resource_ids) {
- concurrent_delete_executor.add([&, accessor_ptr =
accessor_map_[resource_id]]() {
- if (accessor_ptr->delete_directory(tablet_path_prefix(tablet_id))
!= 0) {
+ for (const auto& resource_id : rowset_meta) {
+ concurrent_delete_executor.add([&, rs_id = resource_id,
+ accessor_ptr =
accessor_map_[resource_id]]() {
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ g_bvar_recycler_vault_recycle_task_concurrency.put(
+ {instance_id_, metrics_context.operation_type, rs_id},
-1);
+ metrics_context.report();
+ });
+ g_bvar_recycler_vault_recycle_task_concurrency.put(
+ {instance_id_, metrics_context.operation_type, rs_id}, 1);
+ int res =
accessor_ptr->delete_directory(tablet_path_prefix(tablet_id));
+ if (res != 0) {
LOG(WARNING) << "failed to delete rowset data of tablet " <<
tablet_id
<< " path=" << accessor_ptr->uri();
+ g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id,
"abnormal"}, 1);
return -1;
}
+ g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id,
"normal"}, 1);
return 0;
});
}
@@ -1883,6 +2213,15 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
return ret;
}
+ tablet_metrics_context_.total_recycled_data_size +=
+ recycle_rowsets_data_size + recycle_rowsets_index_size;
+ tablet_metrics_context_.total_recycled_num += 1;
+ segment_metrics_context_.total_recycled_num += recycle_segments_number;
+ segment_metrics_context_.total_recycled_data_size +=
+ recycle_rowsets_data_size + recycle_rowsets_index_size;
+ tablet_metrics_context_.report();
+ segment_metrics_context_.report();
+
txn.reset();
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to recycle tablet ")
@@ -1928,6 +2267,7 @@ int InstanceRecycler::recycle_rowsets() {
size_t total_rowset_value_size = 0;
size_t expired_rowset_size = 0;
std::atomic_long num_recycled = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""};
RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""};
@@ -1945,7 +2285,8 @@ int InstanceRecycler::recycle_rowsets() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle rowsets finished, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle rowsets finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
@@ -1959,7 +2300,9 @@ int InstanceRecycler::recycle_rowsets() {
};
std::vector<std::string> rowset_keys;
- std::vector<doris::RowsetMetaCloudPB> rowsets;
+ // rowset_id -> rowset_meta
+ // store rowset id and meta for rowset size statistics when deleting
+ std::map<std::string, doris::RowsetMetaCloudPB> rowsets;
// Store keys of rowset recycled by background workers
std::mutex async_recycled_rowset_keys_mutex;
@@ -2026,6 +2369,43 @@ int InstanceRecycler::recycle_rowsets() {
return final_expiration;
};
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
+ RecycleRowsetPB rowset;
+ if (!rowset.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(rowset)) { // not expired
+ return 0;
+ }
+ if (!rowset.has_type()) {
+ if (!rowset.has_resource_id()) [[unlikely]] {
+ return 0;
+ }
+ if (rowset.resource_id().empty()) [[unlikely]] {
+ return 0;
+ }
+ return 0;
+ }
+ auto* rowset_meta = rowset.mutable_rowset_meta();
+ if (!rowset_meta->has_resource_id()) [[unlikely]] {
+ if (rowset.type() == RecycleRowsetPB::PREPARE ||
rowset_meta->num_segments() != 0) {
+ return 0;
+ }
+ }
+ if (rowset.type() != RecycleRowsetPB::PREPARE) {
+ if (rowset_meta->num_segments() > 0) {
+ metrics_context.total_need_recycle_num++;
+ segment_metrics_context_.total_need_recycle_num +=
rowset_meta->num_segments();
+ segment_metrics_context_.total_need_recycle_data_size +=
+ rowset_meta->total_disk_size();
+ metrics_context.total_need_recycle_data_size +=
rowset_meta->total_disk_size();
+ }
+ }
+ return 0;
+ };
+
auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int
{
++num_scanned;
total_rowset_key_size += k.size();
@@ -2104,7 +2484,7 @@ int InstanceRecycler::recycle_rowsets() {
num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
rowset_keys.emplace_back(k);
if (rowset_meta->num_segments() > 0) { // Skip empty rowset
- rowsets.push_back(std::move(*rowset_meta));
+ rowsets.emplace(rowset_meta->rowset_id_v2(),
std::move(*rowset_meta));
} else {
++num_empty_rowset;
}
@@ -2114,12 +2494,15 @@ int InstanceRecycler::recycle_rowsets() {
auto loop_done = [&]() -> int {
std::vector<std::string> rowset_keys_to_delete;
- std::vector<doris::RowsetMetaCloudPB> rowsets_to_delete;
+ // rowset_id -> rowset_meta
+ // store rowset id and meta for rowset size statistics when deleting
+ std::map<std::string, doris::RowsetMetaCloudPB> rowsets_to_delete;
rowset_keys_to_delete.swap(rowset_keys);
rowsets_to_delete.swap(rowsets);
worker_pool->submit([&, rowset_keys_to_delete =
std::move(rowset_keys_to_delete),
rowsets_to_delete =
std::move(rowsets_to_delete)]() {
- if (delete_rowset_data(rowsets_to_delete,
RowsetRecyclingState::FORMAL_ROWSET) != 0) {
+ if (delete_rowset_data(rowsets_to_delete,
RowsetRecyclingState::FORMAL_ROWSET,
+ metrics_context) != 0) {
LOG(WARNING) << "failed to delete rowset data, instance_id="
<< instance_id_;
return;
}
@@ -2132,8 +2515,9 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};
- int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1,
std::move(handle_rowset_kv),
- std::move(loop_done));
+ int ret = scan_for_recycle_and_statistics(recyc_rs_key0, recyc_rs_key1,
"rowsets",
+ metrics_context,
std::move(scan_and_statistics),
+ std::move(handle_rowset_kv),
std::move(loop_done));
worker_pool->stop();
if (!async_recycled_rowset_keys.empty()) {
@@ -2239,6 +2623,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
size_t expired_rowset_size = 0;
size_t total_rowset_key_size = 0;
size_t total_rowset_value_size = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0};
MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0};
@@ -2256,7 +2641,8 @@ int InstanceRecycler::recycle_tmp_rowsets() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle tmp rowsets finished, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle tmp rowsets finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
@@ -2268,7 +2654,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
// Elements in `tmp_rowset_keys` has the same lifetime as `it`
std::vector<std::string_view> tmp_rowset_keys;
- std::vector<doris::RowsetMetaCloudPB> tmp_rowsets;
+ // rowset_id -> rowset_meta
+ // store tmp_rowset id and meta for rowset size statistics when deleting
+ std::map<std::string, doris::RowsetMetaCloudPB> tmp_rowsets;
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB&
rowset) {
@@ -2342,17 +2730,54 @@ int InstanceRecycler::recycle_tmp_rowsets() {
tmp_rowset_keys.push_back(k);
if (rowset.num_segments() > 0) { // Skip empty rowset
- tmp_rowsets.push_back(std::move(rowset));
+ tmp_rowsets.emplace(rowset.rowset_id_v2(), std::move(rowset));
}
return 0;
};
- auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled, this]()
-> int {
- DORIS_CLOUD_DEFER {
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
+ doris::RowsetMetaCloudPB rowset;
+ if (!rowset.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t expiration = calc_expiration(rowset);
+ int64_t current_time = ::time(nullptr);
+ if (current_time < expiration) {
+ return 0;
+ }
+
+ DCHECK_GT(rowset.txn_id(), 0)
+ << "txn_id=" << rowset.txn_id() << " rowset=" <<
rowset.ShortDebugString();
+ if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
+ return 0;
+ }
+
+ if (!rowset.has_resource_id()) {
+ if (rowset.num_segments() > 0) [[unlikely]] { // impossible
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ }
+
+ metrics_context.total_need_recycle_num++;
+ if (rowset.num_segments() > 0) {
+ metrics_context.total_need_recycle_data_size +=
rowset.total_disk_size();
+ segment_metrics_context_.total_need_recycle_data_size +=
rowset.total_disk_size();
+ segment_metrics_context_.total_need_recycle_num +=
rowset.num_segments();
+ }
+ return 0;
+ };
+
+ auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled,
&metrics_context,
+ this]() -> int {
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
tmp_rowset_keys.clear();
tmp_rowsets.clear();
- };
- if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET)
!= 0) {
+ });
+ if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET,
metrics_context) !=
+ 0) {
LOG(WARNING) << "failed to delete tmp rowset data, instance_id="
<< instance_id_;
return -1;
}
@@ -2364,8 +2789,30 @@ int InstanceRecycler::recycle_tmp_rowsets() {
return 0;
};
- return scan_and_recycle(tmp_rs_key0, tmp_rs_key1,
std::move(handle_rowset_kv),
- std::move(loop_done));
+ return scan_for_recycle_and_statistics(tmp_rs_key0, tmp_rs_key1,
"tmp_rowsets", metrics_context,
+ std::move(scan_and_statistics),
+ std::move(handle_rowset_kv),
std::move(loop_done));
+}
+
+int InstanceRecycler::scan_for_recycle_and_statistics(
+ std::string begin, std::string_view end, std::string task_name,
+ RecyclerMetricsContext& metrics_context,
+ std::function<int(std::string_view k, std::string_view v)>
statistics_func,
+ std::function<int(std::string_view k, std::string_view v)>
recycle_func,
+ std::function<int()> loop_done) {
+ if (config::enable_recycler_metrics) {
+ scan_and_recycle(begin, end, std::move(statistics_func));
+
+ // report to bvar
+ metrics_context.report(true);
+ tablet_metrics_context_.report(true);
+ segment_metrics_context_.report(true);
+
+ int ret = scan_and_recycle(begin, end, std::move(recycle_func),
std::move(loop_done));
+ return ret;
+ } else {
+ return scan_and_recycle(begin, end, std::move(recycle_func),
std::move(loop_done));
+ }
}
int InstanceRecycler::scan_and_recycle(
@@ -2433,6 +2880,7 @@ int InstanceRecycler::abort_timeout_txn() {
int64_t num_timeout = 0;
int64_t num_abort = 0;
int64_t num_advance = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
TxnRunningKeyInfo txn_running_key_info0 {instance_id_, 0, 0};
TxnRunningKeyInfo txn_running_key_info1 {instance_id_, INT64_MAX,
INT64_MAX};
@@ -2450,7 +2898,8 @@ int InstanceRecycler::abort_timeout_txn() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("end to abort timeout txn, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("end to abort timeout txn, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_timeout", num_timeout)
@@ -2462,7 +2911,7 @@ int InstanceRecycler::abort_timeout_txn() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
auto handle_txn_running_kv = [&num_scanned, &num_timeout, &num_abort,
&num_advance,
- ¤t_time,
+ ¤t_time, &metrics_context,
this](std::string_view k, std::string_view
v) -> int {
++num_scanned;
@@ -2557,14 +3006,57 @@ int InstanceRecycler::abort_timeout_txn() {
.tag("txn_id", txn_id);
return -1;
}
- ++num_abort;
+ metrics_context.total_recycled_num = ++num_abort;
+ metrics_context.report();
}
return 0;
};
- return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
- std::move(handle_txn_running_kv));
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&metrics_context, ¤t_time,
this](std::string_view k,
+
std::string_view v) -> int {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::string_view k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ if (decode_key(&k1, &out) != 0) {
+ return 0;
+ }
+ int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+ int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+ // Update txn_info
+ std::string txn_inf_key, txn_inf_val;
+ txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
+ err = txn->get(txn_inf_key, &txn_inf_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(txn_inf_val)) {
+ return 0;
+ }
+
+ if (TxnStatusPB::TXN_STATUS_COMMITTED != txn_info.status()) {
+ TxnRunningPB txn_running_pb;
+ if (!txn_running_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ if (!config::force_immediate_recycle &&
txn_running_pb.timeout_time() > current_time) {
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ }
+ return 0;
+ };
+
+ return scan_for_recycle_and_statistics(
+ begin_txn_running_key, end_txn_running_key, "abort_timeout_txns",
metrics_context,
+ std::move(scan_and_statistics), std::move(handle_txn_running_kv));
}
int InstanceRecycler::recycle_expired_txn_label() {
@@ -2572,6 +3064,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
int64_t num_scanned = 0;
int64_t num_expired = 0;
int64_t num_recycled = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
int ret = 0;
RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
@@ -2590,7 +3083,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("end to recycle expired txn, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("end to recycle expired txn, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
@@ -2633,6 +3127,20 @@ int InstanceRecycler::recycle_expired_txn_label() {
return 0;
};
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
+ RecycleTxnPB recycle_txn_pb;
+ if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ if ((config::force_immediate_recycle) ||
+ (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
+ (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
+ metrics_context.total_need_recycle_num++;
+ }
+ return 0;
+ };
+
auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
std::string_view k1 = k;
//RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id
@@ -2712,7 +3220,9 @@ int InstanceRecycler::recycle_expired_txn_label() {
LOG(WARNING) << "failed to delete expired txn, err=" << err << "
key=" << hex(k);
return -1;
}
- ++num_recycled;
+ metrics_context.total_recycled_num = ++num_recycled;
+ metrics_context.report();
+
LOG(INFO) << "recycle expired txn, key=" << hex(k);
return 0;
};
@@ -2757,8 +3267,9 @@ int InstanceRecycler::recycle_expired_txn_label() {
return ret;
};
- return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
- std::move(handle_recycle_txn_kv),
std::move(loop_done));
+ return scan_for_recycle_and_statistics(
+ begin_recycle_txn_key, end_recycle_txn_key, "expired_txn_labels",
metrics_context,
+ std::move(scan_and_statistics), std::move(handle_recycle_txn_kv),
std::move(loop_done));
}
struct CopyJobIdTuple {
@@ -2880,6 +3391,7 @@ int InstanceRecycler::recycle_copy_jobs() {
// Used for INTERNAL stage's copy jobs to tag each batch for log trace
uint64_t batch_count = 0;
const std::string task_name = "recycle_copy_jobs";
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
LOG_INFO("begin to recycle copy jobs").tag("instance_id", instance_id_);
@@ -2890,7 +3402,8 @@ int InstanceRecycler::recycle_copy_jobs() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle copy jobs finished, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle copy jobs finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_finished", num_finished)
@@ -2906,7 +3419,7 @@ int InstanceRecycler::recycle_copy_jobs() {
copy_job_key(key_info1, &key1);
std::unordered_map<std::string, std::shared_ptr<BatchObjStoreAccessor>>
stage_accessor_map;
auto recycle_func = [&start_time, &num_scanned, &num_finished,
&num_expired, &num_recycled,
- &batch_count, &stage_accessor_map, &task_name,
+ &batch_count, &stage_accessor_map, &task_name,
&metrics_context,
this](std::string_view k, std::string_view v) -> int {
++num_scanned;
CopyJobPB copy_job;
@@ -3013,12 +3526,51 @@ int InstanceRecycler::recycle_copy_jobs() {
return -1;
}
- ++num_recycled;
+ metrics_context.total_recycled_num = ++num_recycled;
+ metrics_context.report();
check_recycle_task(instance_id_, task_name, num_scanned, num_recycled,
start_time);
return 0;
};
- return scan_and_recycle(key0, key1, std::move(recycle_func));
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&metrics_context](std::string_view k,
std::string_view v) -> int {
+ CopyJobPB copy_job;
+ if (!copy_job.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed copy job").tag("key", hex(k));
+ return 0;
+ }
+
+ if (copy_job.job_status() == CopyJobPB::FINISH) {
+ if (copy_job.stage_type() == StagePB::EXTERNAL) {
+ int64_t current_time =
+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ if (copy_job.finish_time_ms() > 0) {
+ if (!config::force_immediate_recycle &&
+ current_time < copy_job.finish_time_ms() +
+
config::copy_job_max_retention_second * 1000) {
+ return 0;
+ }
+ } else {
+ if (!config::force_immediate_recycle &&
+ current_time < copy_job.start_time_ms() +
+
config::copy_job_max_retention_second * 1000) {
+ return 0;
+ }
+ }
+ }
+ } else if (copy_job.job_status() == CopyJobPB::LOADING) {
+ int64_t current_time =
+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ if (!config::force_immediate_recycle && current_time <=
copy_job.timeout_time_ms()) {
+ return 0;
+ }
+ }
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ };
+
+ return scan_for_recycle_and_statistics(key0, key1, "copy_jobs",
metrics_context,
+ std::move(scan_and_statistics),
std::move(recycle_func));
}
int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id,
@@ -3120,6 +3672,7 @@ int InstanceRecycler::init_copy_job_accessor(const
std::string& stage_id,
int InstanceRecycler::recycle_stage() {
int64_t num_scanned = 0;
int64_t num_recycled = 0;
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_stage");
const std::string task_name = "recycle_stage";
LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_);
@@ -3131,7 +3684,8 @@ int InstanceRecycler::recycle_stage() {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle stage, cost={}s", cost)
+ metrics_context.finish_report();
+ LOG_WARNING("recycle stage, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_recycled", num_recycled);
@@ -3143,8 +3697,8 @@ int InstanceRecycler::recycle_stage() {
std::string key1 = recycle_stage_key(key_info1);
std::vector<std::string_view> stage_keys;
- auto recycle_func = [&start_time, &num_scanned, &num_recycled,
&stage_keys, this](
- std::string_view k, std::string_view v) -> int
{
+ auto recycle_func = [&start_time, &num_scanned, &num_recycled,
&stage_keys, &metrics_context,
+ this](std::string_view k, std::string_view v) -> int {
++num_scanned;
RecycleStagePB recycle_stage;
if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
@@ -3199,12 +3753,58 @@ int InstanceRecycler::recycle_stage() {
<< ", ret=" << ret;
return -1;
}
- ++num_recycled;
+ metrics_context.total_recycled_num = ++num_recycled;
+ metrics_context.report();
check_recycle_task(instance_id_, "recycle_stage", num_scanned,
num_recycled, start_time);
stage_keys.push_back(k);
return 0;
};
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&metrics_context, this](std::string_view k,
+ std::string_view v) ->
int {
+ RecycleStagePB recycle_stage;
+ if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle stage").tag("key", hex(k));
+ return 0;
+ }
+
+ int idx = stoi(recycle_stage.stage().obj_info().id());
+ if (idx > instance_info_.obj_info().size() || idx < 1) {
+ LOG(WARNING) << "invalid idx: " << idx;
+ return 0;
+ }
+
+ std::shared_ptr<StorageVaultAccessor> accessor;
+ int ret = SYNC_POINT_HOOK_RETURN_VALUE(
+ [&] {
+ auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ return 0;
+ }
+
+ s3_conf->prefix =
recycle_stage.stage().obj_info().prefix();
+ std::shared_ptr<S3Accessor> s3_accessor;
+ int ret = S3Accessor::create(std::move(s3_conf.value()),
&s3_accessor);
+ if (ret != 0) {
+ return 0;
+ }
+
+ accessor = std::move(s3_accessor);
+ return 0;
+ }(),
+ "recycle_stage:get_accessor", &accessor);
+
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init accessor ret=" << ret;
+ return 0;
+ }
+
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ };
+
auto loop_done = [&stage_keys, this]() -> int {
if (stage_keys.empty()) return 0;
DORIS_CLOUD_DEFER {
@@ -3216,75 +3816,128 @@ int InstanceRecycler::recycle_stage() {
}
return 0;
};
-
- return scan_and_recycle(key0, key1, std::move(recycle_func),
std::move(loop_done));
+ return scan_for_recycle_and_statistics(key0, key1, "stages",
metrics_context,
+ std::move(scan_and_statistics),
std::move(recycle_func),
+ std::move(loop_done));
}
int InstanceRecycler::recycle_expired_stage_objects() {
LOG_INFO("begin to recycle expired stage objects").tag("instance_id",
instance_id_);
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ RecyclerMetricsContext metrics_context(instance_id_,
"recycle_expired_stage_objects");
DORIS_CLOUD_DEFER {
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
- LOG_INFO("recycle expired stage objects, cost={}s",
cost).tag("instance_id", instance_id_);
+ metrics_context.finish_report();
+ LOG_WARNING("recycle expired stage objects, cost={}s", cost)
+ .tag("instance_id", instance_id_);
};
+
int ret = 0;
- for (const auto& stage : instance_info_.stages()) {
- std::stringstream ss;
- ss << "instance_id=" << instance_id_ << ", stage_id=" <<
stage.stage_id() << ", user_name="
- << (stage.mysql_user_name().empty() ? "null" :
stage.mysql_user_name().at(0))
- << ", user_id=" << (stage.mysql_user_id().empty() ? "null" :
stage.mysql_user_id().at(0))
- << ", prefix=" << stage.obj_info().prefix();
-
- if (stopped()) break;
- if (stage.type() == StagePB::EXTERNAL) {
- continue;
- }
- int idx = stoi(stage.obj_info().id());
- if (idx > instance_info_.obj_info().size() || idx < 1) {
- LOG(WARNING) << "invalid idx: " << idx << ", id: " <<
stage.obj_info().id();
- continue;
+ // for calculating the total number or bytes of objects to be recycled
+ auto scan_and_statistics = [&metrics_context, this]() {
+ for (const auto& stage : instance_info_.stages()) {
+ if (stopped()) {
+ break;
+ }
+ if (stage.type() == StagePB::EXTERNAL) {
+ continue;
+ }
+ int idx = stoi(stage.obj_info().id());
+ if (idx > instance_info_.obj_info().size() || idx < 1) {
+ continue;
+ }
+ const auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ continue;
+ }
+ s3_conf->prefix = stage.obj_info().prefix();
+ std::shared_ptr<S3Accessor> accessor;
+ int ret1 = S3Accessor::create(*s3_conf, &accessor);
+ if (ret1 != 0) {
+ continue;
+ }
+ if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+ continue;
+ }
+ metrics_context.total_need_recycle_num++;
}
+ };
- const auto& old_obj = instance_info_.obj_info()[idx - 1];
- auto s3_conf = S3Conf::from_obj_store_info(old_obj);
- if (!s3_conf) {
- LOG(WARNING) << "failed to init s3_conf with obj_info=" <<
old_obj.ShortDebugString();
- continue;
- }
+ auto handle_recycle_func = [&, this]() {
+ for (const auto& stage : instance_info_.stages()) {
+ std::stringstream ss;
+ ss << "instance_id=" << instance_id_ << ", stage_id=" <<
stage.stage_id()
+ << ", user_name="
+ << (stage.mysql_user_name().empty() ? "null" :
stage.mysql_user_name().at(0))
+ << ", user_id="
+ << (stage.mysql_user_id().empty() ? "null" :
stage.mysql_user_id().at(0))
+ << ", prefix=" << stage.obj_info().prefix();
- s3_conf->prefix = stage.obj_info().prefix();
- std::shared_ptr<S3Accessor> accessor;
- int ret1 = S3Accessor::create(*s3_conf, &accessor);
- if (ret1 != 0) {
- LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " "
<< ss.str();
- ret = -1;
- continue;
- }
+ if (stopped()) {
+ break;
+ }
+ if (stage.type() == StagePB::EXTERNAL) {
+ continue;
+ }
+ int idx = stoi(stage.obj_info().id());
+ if (idx > instance_info_.obj_info().size() || idx < 1) {
+ LOG(WARNING) << "invalid idx: " << idx << ", id: " <<
stage.obj_info().id();
+ continue;
+ }
- if (s3_conf->prefix.find("/stage/") == std::string::npos) {
- LOG(WARNING) << "try to delete illegal prefix, which is
catastrophic, " << ss.str();
- ret = -1;
- continue;
- }
+ const auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init s3_conf with obj_info="
+ << old_obj.ShortDebugString();
+ continue;
+ }
- LOG(INFO) << "recycle expired stage objects, " << ss.str();
- int64_t expiration_time =
-
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
- config::internal_stage_objects_expire_time_second;
- if (config::force_immediate_recycle) {
- expiration_time = INT64_MAX;
- }
- ret1 = accessor->delete_all(expiration_time);
- if (ret1 != 0) {
- LOG(WARNING) << "failed to recycle expired stage objects, ret=" <<
ret1 << " "
- << ss.str();
- ret = -1;
- continue;
+ s3_conf->prefix = stage.obj_info().prefix();
+ std::shared_ptr<S3Accessor> accessor;
+ int ret1 = S3Accessor::create(*s3_conf, &accessor);
+ if (ret1 != 0) {
+ LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << "
" << ss.str();
+ ret = -1;
+ continue;
+ }
+
+ if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+ LOG(WARNING) << "try to delete illegal prefix, which is
catastrophic, " << ss.str();
+ ret = -1;
+ continue;
+ }
+
+ LOG(INFO) << "recycle expired stage objects, " << ss.str();
+ int64_t expiration_time =
+
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
+ config::internal_stage_objects_expire_time_second;
+ if (config::force_immediate_recycle) {
+ expiration_time = INT64_MAX;
+ }
+ ret1 = accessor->delete_all(expiration_time);
+ if (ret1 != 0) {
+ LOG(WARNING) << "failed to recycle expired stage objects,
ret=" << ret1 << " "
+ << ss.str();
+ ret = -1;
+ continue;
+ }
+ metrics_context.total_recycled_num++;
+ metrics_context.report();
}
- }
+ };
+
+ // for calculating the total number or bytes of objects to be recycled
+ scan_and_statistics();
+
+ // report to bvar
+ metrics_context.report(true);
+
+ handle_recycle_func();
return ret;
}
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 84e4075e61b..d1ae8a056c8 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -21,13 +21,16 @@
#include <atomic>
#include <condition_variable>
+#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
+#include <utility>
+#include "common/bvars.h"
#include "meta-service/txn_lazy_committer.h"
#include "recycler/storage_vault_accessor.h"
#include "recycler/white_black_list.h"
@@ -42,6 +45,7 @@ class InstanceRecycler;
class StorageVaultAccessor;
class Checker;
class SimpleThreadPool;
+class RecyclerMetricsContext;
struct RecyclerThreadPoolGroup {
RecyclerThreadPoolGroup() = default;
RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
@@ -165,15 +169,15 @@ public:
* @param is_empty_tablet indicates whether the tablet has object files,
can skip delete objects if tablet is empty
* @return 0 for success otherwise error
*/
- int recycle_tablets(int64_t table_id, int64_t index_id, int64_t
partition_id = -1,
- bool is_empty_tablet = false);
+ int recycle_tablets(int64_t table_id, int64_t index_id,
RecyclerMetricsContext& ctx,
+ int64_t partition_id = -1, bool is_empty_tablet =
false);
/**
* recycle all rowsets belonging to the tablet specified by `tablet_id`
*
* @return 0 for success otherwise error
*/
- int recycle_tablet(int64_t tablet_id);
+ int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext&
metrics_context);
// scan and recycle useless partition version kv
int recycle_versions();
@@ -218,6 +222,13 @@ private:
std::function<int(std::string_view k,
std::string_view v)> recycle_func,
std::function<int()> loop_done = nullptr);
+ int scan_for_recycle_and_statistics(
+ std::string begin, std::string_view end, std::string task_name,
+ RecyclerMetricsContext& metrics_context,
+ std::function<int(std::string_view k, std::string_view v)>
statistics_func,
+ std::function<int(std::string_view k, std::string_view v)>
recycle_func,
+ std::function<int()> loop_done = nullptr);
+
// return 0 for success otherwise error
int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
@@ -227,8 +238,8 @@ private:
const std::string& rowset_id);
// return 0 for success otherwise error
- int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>&
rowsets,
- RowsetRecyclingState type);
+ int delete_rowset_data(const std::map<std::string,
doris::RowsetMetaCloudPB>& rowsets,
+ RowsetRecyclingState type, RecyclerMetricsContext&
metrics_context);
/**
* Get stage storage info from instance and init StorageVaultAccessor
@@ -241,6 +252,14 @@ private:
void unregister_recycle_task(const std::string& task_name);
+ // for scan all tablets and statistics metrics
+ int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
+ RecyclerMetricsContext& metrics_context,
+ int64_t partition_id = -1, bool
is_empty_tablet = false);
+
+ // for scan all rs of tablet and statistics metrics
+ int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext&
metrics_context);
+
private:
std::atomic_bool stopped_ {false};
std::shared_ptr<TxnKv> txn_kv_;
@@ -268,4 +287,109 @@ private:
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
};
+class RecyclerMetricsContext {
+public:
+ RecyclerMetricsContext() = default;
+
+ RecyclerMetricsContext(std::string instance_id, std::string operation_type)
+ : operation_type(std::move(operation_type)),
instance_id(std::move(instance_id)) {
+ start();
+ }
+
+ ~RecyclerMetricsContext() = default;
+
+ int64_t total_need_recycle_data_size = 0;
+ int64_t total_need_recycle_num = 0;
+
+ std::atomic_long total_recycled_data_size {0};
+ std::atomic_long total_recycled_num {0};
+
+ std::string operation_type {};
+ std::string instance_id {};
+
+ double start_time = 0;
+
+ void start() {
+ start_time = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ }
+
+ double duration() const {
+ return duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count() -
+ start_time;
+ }
+
+ void reset() {
+ total_need_recycle_data_size = 0;
+ total_need_recycle_num = 0;
+ total_recycled_data_size.store(0);
+ total_recycled_num.store(0);
+ start_time = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ }
+
+ void finish_report() {
+ if (!operation_type.empty()) {
+ double cost = duration();
+ g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
+ {instance_id, operation_type}, cost);
+ g_bvar_recycler_instance_recycle_round.put({instance_id,
operation_type}, 1);
+ LOG(INFO) << "recycle instance: " << instance_id
+ << ", operation type: " << operation_type << ", cost: "
<< cost
+ << " ms, total recycled num: " <<
total_recycled_num.load()
+ << ", total recycled data size: " <<
total_recycled_data_size.load()
+ << " bytes";
+ if (total_recycled_num.load()) {
+ g_bvar_recycler_instance_recycle_time_per_resource.put(
+ {instance_id, operation_type}, cost /
total_recycled_num.load());
+ } else {
+ g_bvar_recycler_instance_recycle_time_per_resource.put(
+ {instance_id, operation_type}, -1);
+ }
+ if (total_recycled_data_size.load()) {
+ g_bvar_recycler_instance_recycle_bytes_per_ms.put(
+ {instance_id, operation_type},
total_recycled_data_size.load() / cost);
+ } else {
+
g_bvar_recycler_instance_recycle_bytes_per_ms.put({instance_id, operation_type},
+ -1);
+ }
+ }
+ }
+
+ // `is_begin` is used to initialize total num of items need to be recycled
+ void report(bool is_begin = false) {
+ if (!operation_type.empty()) {
+ if (total_need_recycle_data_size > 0) {
+ // is init
+ if (is_begin) {
+ g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
+ {instance_id, operation_type},
total_need_recycle_data_size);
+ } else {
+ g_bvar_recycler_instance_last_round_recycled_bytes.put(
+ {instance_id, operation_type},
total_recycled_data_size.load());
+
g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
+ {instance_id, operation_type},
total_recycled_data_size.load());
+ }
+ }
+
+ if (total_need_recycle_num > 0) {
+ // is init
+ if (is_begin) {
+ g_bvar_recycler_instance_last_round_to_recycle_num.put(
+ {instance_id, operation_type},
total_need_recycle_num);
+ } else {
+ g_bvar_recycler_instance_last_round_recycled_num.put(
+ {instance_id, operation_type},
total_recycled_num.load());
+
g_bvar_recycler_instance_recycle_total_num_since_started.put(
+ {instance_id, operation_type},
total_recycled_num.load());
+ }
+ }
+ }
+ }
+};
+
} // namespace doris::cloud
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index b7ed057b6b5..ff1b33bf9f1 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -54,6 +54,7 @@ using namespace doris;
static const std::string instance_id = "instance_id_recycle_test";
static int64_t current_time = 0;
static constexpr int64_t db_id = 1000;
+static RecyclerMetricsContext ctx;
static doris::cloud::RecyclerThreadPoolGroup thread_group;
@@ -1127,7 +1128,7 @@ TEST(RecyclerTest, recycle_tablet) {
ASSERT_EQ(create_partition_version_kv(txn_kv.get(), table_id,
partition_id), 0);
- ASSERT_EQ(0, recycler.recycle_tablets(table_id, index_id));
+ ASSERT_EQ(0, recycler.recycle_tablets(table_id, index_id, ctx));
// check rowset does not exist on s3
std::unique_ptr<ListIterator> list_iter;
@@ -3663,16 +3664,17 @@ TEST(RecyclerTest, delete_rowset_data) {
auto accessor = recycler.accessor_map_.begin()->second;
// Delete multiple rowset files using one series of RowsetPB
constexpr int index_id = 10001, tablet_id = 10002;
- std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
for (int i = 0; i < 10; ++i) {
auto rowset = create_rowset(resource_id, tablet_id, index_id, 5,
schemas[i % 5]);
create_recycle_rowset(
txn_kv.get(), accessor.get(), rowset,
static_cast<RecycleRowsetPB::Type>(i %
(RecycleRowsetPB::Type_MAX + 1)), true);
- rowset_pbs.emplace_back(std::move(rowset));
+ rowset_pbs.emplace(rowset.rowset_id_v2(), std::move(rowset));
}
- ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::FORMAL_ROWSET));
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::FORMAL_ROWSET,
+ ctx));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
@@ -3768,16 +3770,17 @@ TEST(RecyclerTest,
delete_rowset_data_without_inverted_index_storage_format) {
auto accessor = recycler.accessor_map_.begin()->second;
// Delete multiple rowset files using one series of RowsetPB
constexpr int index_id = 10001, tablet_id = 10002;
- std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
for (int i = 0; i < 10; ++i) {
auto rowset = create_rowset(resource_id, tablet_id, index_id, 5,
schemas[i % 5]);
create_recycle_rowset(
txn_kv.get(), accessor.get(), rowset,
static_cast<RecycleRowsetPB::Type>(i %
(RecycleRowsetPB::Type_MAX + 1)), true);
- rowset_pbs.emplace_back(std::move(rowset));
+ rowset_pbs.emplace(rowset.rowset_id_v2(), std::move(rowset));
}
- ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::FORMAL_ROWSET));
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::FORMAL_ROWSET,
+ ctx));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
@@ -3934,7 +3937,7 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
// recycle tablet will fail because unuseful obj accessor can not
connectted
- EXPECT_EQ(recycler.recycle_tablet(0), -1);
+ EXPECT_EQ(recycler.recycle_tablet(0, ctx), -1);
// however, useful mock accessor can recycle tablet
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
1);
}
@@ -4016,7 +4019,7 @@ TEST(RecyclerTest, recycle_tablet_without_resource_id) {
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
// recycle tablet will fail because unuseful obj accessor can not
connectted
- EXPECT_EQ(recycler.recycle_tablet(0), -1);
+ EXPECT_EQ(recycler.recycle_tablet(0, ctx), -1);
// no resource id, cannot recycle
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
}
@@ -4098,7 +4101,7 @@ TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id)
{
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
// recycle tablet will fail because unuseful obj accessor can not
connectted
- EXPECT_EQ(recycler.recycle_tablet(0), -1);
+ EXPECT_EQ(recycler.recycle_tablet(0, ctx), -1);
// no resource id, cannot recycle
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
}
@@ -4321,7 +4324,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
std::make_shared<TxnLazyCommitter>(txn_kv));
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
- std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // useless but required
rowset.set_rowset_id_v2("1");
@@ -4333,7 +4336,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
rowset.mutable_tablet_schema()->CopyFrom(schema);
create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1);
rowset.clear_tablet_schema();
- rowset_pbs.emplace_back(rowset);
+ rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
std::unordered_set<std::string> list_files;
std::unique_ptr<ListIterator> iter;
@@ -4348,7 +4351,8 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data/10000/1_0_1.idx"));
- ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET));
+ ASSERT_EQ(0,
+ recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET, ctx));
list_files.clear();
iter.reset();
EXPECT_EQ(accessor->list_all(&iter), 0);
@@ -4400,7 +4404,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
std::make_shared<TxnLazyCommitter>(txn_kv));
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
- std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // useless but required
rowset.set_rowset_id_v2("1");
@@ -4412,7 +4416,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
rowset.mutable_tablet_schema()->CopyFrom(schema);
create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
rowset.clear_tablet_schema();
- rowset_pbs.emplace_back(rowset);
+ rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
std::unordered_set<std::string> list_files;
std::unique_ptr<ListIterator> iter;
@@ -4427,7 +4431,8 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
EXPECT_TRUE(list_files.contains("data/10000/1_0.idx"));
- ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET));
+ ASSERT_EQ(0,
+ recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET, ctx));
list_files.clear();
iter.reset();
EXPECT_EQ(accessor->list_all(&iter), 0);
@@ -4484,7 +4489,7 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id)
{
std::make_shared<TxnLazyCommitter>(txn_kv));
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
- std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // useless but required
rowset.set_rowset_id_v2("1");
@@ -4496,7 +4501,7 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id)
{
rowset.mutable_tablet_schema()->CopyFrom(schema);
create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
rowset.clear_tablet_schema();
- rowset_pbs.emplace_back(rowset);
+ rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
rowset.set_rowset_id(0); // useless but required
rowset.set_rowset_id_v2("2");
@@ -4508,7 +4513,7 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id)
{
rowset.mutable_tablet_schema()->CopyFrom(schema);
create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
rowset.clear_tablet_schema();
- rowset_pbs.emplace_back(rowset);
+ rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
std::unordered_set<std::string> list_files;
std::unique_ptr<ListIterator> iter;
@@ -4525,7 +4530,8 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id)
{
EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
- EXPECT_EQ(-1, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET));
+ EXPECT_EQ(-1,
+ recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET, ctx));
list_files.clear();
iter.reset();
EXPECT_EQ(accessor->list_all(&iter), 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]