This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 2e09935ce16 branch-3.1: [chore](recycler) Improve recycler metrics
#55455 (#55479)
2e09935ce16 is described below
commit 2e09935ce165554e797222c68f86d70d47fac45b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 4 09:59:59 2025 +0800
branch-3.1: [chore](recycler) Improve recycler metrics #55455 (#55479)
Cherry-picked from #55455
Co-authored-by: Uniqueyou <[email protected]>
---
cloud/src/common/bvars.cpp | 12 +-
cloud/src/common/bvars.h | 6 +-
cloud/src/recycler/recycler.cpp | 228 +++++++++++++++-----------------
cloud/src/recycler/recycler.h | 216 +++++++++++++++---------------
cloud/src/recycler/recycler_service.cpp | 65 +++++----
5 files changed, 247 insertions(+), 280 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index eb8618d4048..e495b5ea95b 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -107,11 +107,9 @@ BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycl
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler",
"recycle_expired_txn_label_earlest_ts");
BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_restore_job_earlest_ts("recycler",
"recycle_restore_job_earlest_ts");
bvar::Status<int64_t>
g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0);
-// current concurrency of recycle task
-bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
-
+// current status of recycle task (submitted, completed, error)
+mBvarIntAdder
g_bvar_recycler_instance_recycle_task_status("recycler_instance_recycle_task_status",
{ "status"});
// recycler's mbvars
-bvar::Adder<int64_t>
g_bvar_recycler_instance_running_counter("recycler_instance_running_counter");
// cost time of the last whole recycle process
mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_recycle_duration("recycler_instance_last_round_recycle_duration",{"instance_id"});
mBvarStatus<int64_t>
g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"});
@@ -122,12 +120,10 @@ mBvarStatus<int64_t>
g_bvar_recycler_instance_recycle_last_success_ts("recycler_
// recycler's mbvars
// instance_id: unique identifier for the instance
-// resource_type: type of resource need to be recycled (index, partition,
rowset, segment, tablet, etc.)
// resource_id: unique identifier for the repository
-// status: status of the recycle task (normal, abnormal, etc.)
-mBvarIntAdder
g_bvar_recycler_vault_recycle_status("recycler_vault_recycle_status",
{"instance_id", "resource_id", "status"});
+// status: status of the recycle task (submitted, completed, error)
+mBvarIntAdder
g_bvar_recycler_vault_recycle_task_status("recycler_vault_recycle_task_status",
{"instance_id", "resource_id", "status"});
// current concurrency of vault delete task
-mBvarIntAdder
g_bvar_recycler_vault_recycle_task_concurrency("recycler_vault_recycle_task_concurrency",
{"instance_id", "resource_type", "resource_id"});
mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_recycled_num("recycler_instance_last_round_recycled_num",
{"instance_id", "resource_type"});
mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_to_recycle_num("recycler_instance_last_round_to_recycle_num",
{"instance_id", "resource_type"});
mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_recycled_bytes("recycler_instance_last_round_recycled_bytes",
{"instance_id", "resource_type"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 9d7dfa232f0..740e322b900 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -267,16 +267,14 @@ extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_restore_job_earlest_ts
// recycler's mbvars
extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
-extern bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
-extern bvar::Adder<int64_t> g_bvar_recycler_instance_running_counter;
+extern mBvarIntAdder g_bvar_recycler_instance_recycle_task_status;
extern mBvarStatus<int64_t>
g_bvar_recycler_instance_last_round_recycle_duration;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_next_ts;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_start_ts;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_end_ts;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_last_success_ts;
-extern mBvarIntAdder g_bvar_recycler_vault_recycle_status;
-extern mBvarIntAdder g_bvar_recycler_vault_recycle_task_concurrency;
+extern mBvarIntAdder g_bvar_recycler_vault_recycle_task_status;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycled_num;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_to_recycle_num;
extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycled_bytes;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index fab7de75c2a..8e19c9ccd19 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -69,9 +69,6 @@ namespace doris::cloud {
using namespace std::chrono;
-RecyclerMetricsContext tablet_metrics_context_("global_recycler",
"recycle_tablet");
-RecyclerMetricsContext segment_metrics_context_("global_recycler",
"recycle_segment");
-
// return 0 for success get a key, 1 for key not found, negative for error
[[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key,
std::string& val) {
std::unique_ptr<Transaction> txn;
@@ -292,21 +289,18 @@ void Recycler::recycle_callback() {
if (stopped()) return;
LOG_INFO("begin to recycle instance").tag("instance_id", instance_id);
auto ctime_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- g_bvar_recycler_instance_recycle_task_concurrency << 1;
- g_bvar_recycler_instance_running_counter << 1;
g_bvar_recycler_instance_recycle_start_ts.put({instance_id}, ctime_ms);
- tablet_metrics_context_.reset();
- segment_metrics_context_.reset();
+ g_bvar_recycler_instance_recycle_task_status.put({"submitted"}, 1);
ret = instance_recycler->do_recycle();
- tablet_metrics_context_.finish_report();
- segment_metrics_context_.finish_report();
- g_bvar_recycler_instance_recycle_task_concurrency << -1;
- g_bvar_recycler_instance_running_counter << -1;
// If instance recycler has been aborted, don't finish this job
+
if (!instance_recycler->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), recycle_job_key,
instance_id, ip_port_,
ret == 0, ctime_ms);
}
+ if (instance_recycler->stopped() || ret != 0) {
+ g_bvar_recycler_instance_recycle_task_status.put({"error"}, 1);
+ }
{
std::lock_guard lock(mtx_);
recycling_instance_map_.erase(instance_id);
@@ -317,6 +311,7 @@ void Recycler::recycle_callback() {
g_bvar_recycler_instance_last_round_recycle_duration.put({instance_id},
elpased_ms);
g_bvar_recycler_instance_next_ts.put({instance_id},
now +
config::recycle_interval_seconds * 1000);
+ g_bvar_recycler_instance_recycle_task_status.put({"completed"}, 1);
LOG(INFO) << "recycle instance done, "
<< "instance_id=" << instance_id << " ret=" << ret << "
ctime_ms: " << ctime_ms
<< " now: " << now;
@@ -673,6 +668,12 @@ auto task_wrapper(Func... funcs) -> std::function<int()> {
int InstanceRecycler::do_recycle() {
TEST_SYNC_POINT("InstanceRecycler.do_recycle");
+ tablet_metrics_context_.reset();
+ segment_metrics_context_.reset();
+ DORIS_CLOUD_DEFER {
+ tablet_metrics_context_.finish_report();
+ segment_metrics_context_.finish_report();
+ };
if (instance_info_.status() == InstanceInfoPB::DELETED) {
return recycle_deleted_instance();
} else if (instance_info_.status() == InstanceInfoPB::NORMAL) {
@@ -1889,31 +1890,31 @@ int InstanceRecycler::delete_rowset_data(
//020000000000007fd045a62bc87a6587dd7ac274aa36e5a9_0.idx
std::set<std::string> deleted_rowset_id;
- std::for_each(
- paths->begin(), paths->end(),
- [&metrics_context, &rowsets, &deleted_rowset_id](const
std::string& path) {
- std::vector<std::string> str;
- butil::SplitString(path, '/', &str);
- std::string rowset_id;
- if (auto pos = str.back().find('_'); pos !=
std::string::npos) {
- rowset_id = str.back().substr(0, pos);
- } else {
- LOG(WARNING) << "failed to parse rowset_id,
path=" << path;
- return;
- }
- auto rs_meta = rowsets.find(rowset_id);
- if (rs_meta != rowsets.end() &&
- !deleted_rowset_id.contains(rowset_id)) {
- deleted_rowset_id.emplace(rowset_id);
- metrics_context.total_recycled_data_size +=
- rs_meta->second.total_disk_size();
- segment_metrics_context_.total_recycled_num +=
- rs_meta->second.num_segments();
-
segment_metrics_context_.total_recycled_data_size +=
- rs_meta->second.total_disk_size();
- metrics_context.total_recycled_num++;
- }
- });
+ std::for_each(paths->begin(), paths->end(),
+ [&metrics_context, &rowsets, &deleted_rowset_id,
+ this](const std::string& path) {
+ std::vector<std::string> str;
+ butil::SplitString(path, '/', &str);
+ std::string rowset_id;
+ if (auto pos = str.back().find('_'); pos !=
std::string::npos) {
+ rowset_id = str.back().substr(0, pos);
+ } else {
+ LOG(WARNING) << "failed to parse
rowset_id, path=" << path;
+ return;
+ }
+ auto rs_meta = rowsets.find(rowset_id);
+ if (rs_meta != rowsets.end() &&
+ !deleted_rowset_id.contains(rowset_id)) {
+ deleted_rowset_id.emplace(rowset_id);
+ metrics_context.total_recycled_data_size
+=
+
rs_meta->second.total_disk_size();
+
segment_metrics_context_.total_recycled_num +=
+ rs_meta->second.num_segments();
+
segment_metrics_context_.total_recycled_data_size +=
+
rs_meta->second.total_disk_size();
+ metrics_context.total_recycled_num++;
+ }
+ });
segment_metrics_context_.report();
metrics_context.report();
}
@@ -2000,12 +2001,10 @@ int
InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t inde
}
return 0;
};
- return scan_and_recycle(tablet_key_begin, tablet_key_end,
std::move(scan_and_statistics),
- [&metrics_context]() -> int {
- metrics_context.report();
- tablet_metrics_context_.report();
- return 0;
- });
+ int ret = scan_and_recycle(tablet_key_begin, tablet_key_end,
std::move(scan_and_statistics));
+ metrics_context.report(true);
+ tablet_metrics_context_.report(true);
+ return ret;
}
int InstanceRecycler::scan_tablet_and_statistics(int64_t tablet_id,
@@ -2268,50 +2267,46 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext&
.tag("instance id", instance_id_)
.tag("tablet id", tablet_id)
.tag("recycle tablet resource ids are",
- std::accumulate(resource_ids.begin(), resource_ids.begin(),
std::string(),
+ std::accumulate(resource_ids.begin(), resource_ids.end(),
std::string(),
[](std::string rs_id, const auto& it) {
return rs_id.empty() ? it : rs_id + ", "
+ it;
}));
- SyncExecutor<int> concurrent_delete_executor(
+ SyncExecutor<std::pair<int, std::string>> concurrent_delete_executor(
_thread_pool_group.s3_producer_pool,
fmt::format("delete tablet {} s3 rowset", tablet_id),
- [](const int& ret) { return ret != 0; });
+ [](const std::pair<int, std::string>& ret) { return ret.first !=
0; });
// delete all rowset data in this tablet
// ATTN: there may be data leak if not all accessor initilized successfully
// partial data deleted if the tablet is stored cross-storage vault
// vault id is not attached to TabletMeta...
for (const auto& resource_id : resource_ids) {
- concurrent_delete_executor.add([&, rs_id = resource_id,
- accessor_ptr =
accessor_map_[resource_id]]() {
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
- g_bvar_recycler_vault_recycle_task_concurrency.put(
- {instance_id_, metrics_context.operation_type, rs_id},
-1);
- metrics_context.report();
- });
- g_bvar_recycler_vault_recycle_task_concurrency.put(
- {instance_id_, metrics_context.operation_type, rs_id}, 1);
- int res =
accessor_ptr->delete_directory(tablet_path_prefix(tablet_id));
- if (res != 0) {
- LOG(WARNING) << "failed to delete rowset data of tablet " <<
tablet_id
- << " path=" << accessor_ptr->uri();
- g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id,
"abnormal"}, 1);
- return -1;
- }
- g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id,
"normal"}, 1);
- return 0;
- });
+ g_bvar_recycler_vault_recycle_task_status.put({instance_id_,
resource_id, "submitted"}, 1);
+ concurrent_delete_executor.add(
+ [&, rs_id = resource_id,
+ accessor_ptr = accessor_map_[resource_id]]() ->
decltype(auto) {
+ std::unique_ptr<int, std::function<void(int*)>> defer(
+ (int*)0x01, [&](int*) { metrics_context.report();
});
+ int res =
accessor_ptr->delete_directory(tablet_path_prefix(tablet_id));
+ if (res != 0) {
+ LOG(WARNING) << "failed to delete rowset data of
tablet " << tablet_id
+ << " path=" << accessor_ptr->uri();
+ return std::make_pair(-1, rs_id);
+ }
+ return std::make_pair(0, rs_id);
+ });
}
bool finished = true;
- std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
- for (int r : rets) {
- if (r != 0) {
+ std::vector<std::pair<int, std::string>> rets =
concurrent_delete_executor.when_all(&finished);
+ for (auto& r : rets) {
+ if (r.first != 0) {
+ g_bvar_recycler_vault_recycle_task_status.put({instance_id_,
r.second, "error"}, 1);
ret = -1;
}
+ g_bvar_recycler_vault_recycle_task_status.put({instance_id_, r.second,
"completed"}, 1);
}
-
ret = finished ? ret : -1;
if (ret != 0) { // failed recycle tablet data
@@ -3985,13 +3980,11 @@ int InstanceRecycler::scan_and_statistics_indexes() {
return 0;
};
- return scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- segment_metrics_context_.report(true);
- tablet_metrics_context_.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(index_key0, index_key1,
std::move(handle_index_kv));
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ tablet_metrics_context_.report(true);
+ return ret;
}
// Scan and statistics partitions that need to be recycled
@@ -4053,13 +4046,12 @@ int InstanceRecycler::scan_and_statistics_partitions() {
metrics_context.total_need_recycle_num++;
return ret;
};
- return scan_and_recycle(part_key0, part_key1,
std::move(handle_partition_kv),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- segment_metrics_context_.report(true);
- tablet_metrics_context_.report(true);
- return 0;
- });
+
+ int ret = scan_and_recycle(part_key0, part_key1,
std::move(handle_partition_kv));
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ tablet_metrics_context_.report(true);
+ return ret;
}
// Scan and statistics rowsets that need to be recycled
@@ -4101,15 +4093,13 @@ int InstanceRecycler::scan_and_statistics_rowsets() {
metrics_context.total_need_recycle_num++;
metrics_context.total_need_recycle_data_size +=
rowset_meta->total_disk_size();
segment_metrics_context_.total_need_recycle_num +=
rowset_meta->num_segments();
- segment_metrics_context_.total_need_recycle_data_size +=
rowset_meta->total_disk_size();
+ segment_metrics_context_.total_need_recycle_data_size +=
rowset_meta->total_disk_size();
return 0;
};
- return scan_and_recycle(recyc_rs_key0, recyc_rs_key1,
std::move(handle_rowset_kv),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- segment_metrics_context_.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1,
std::move(handle_rowset_kv));
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ return ret;
}
// Scan and statistics tmp_rowsets that need to be recycled
@@ -4154,12 +4144,10 @@ int InstanceRecycler::scan_and_statistics_tmp_rowsets()
{
segment_metrics_context_.total_need_recycle_num +=
rowset.num_segments();
return 0;
};
- return scan_and_recycle(tmp_rs_key0, tmp_rs_key1,
std::move(handle_tmp_rowsets_kv),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- segment_metrics_context_.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(tmp_rs_key0, tmp_rs_key1,
std::move(handle_tmp_rowsets_kv));
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ return ret;
}
// Scan and statistics abort_timeout_txn that need to be recycled
@@ -4215,11 +4203,10 @@ int
InstanceRecycler::scan_and_statistics_abort_timeout_txn() {
}
return 0;
};
- return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
- std::move(handle_abort_timeout_txn_kv),
[&metrics_context]() -> int {
- metrics_context.report(true);
- return 0;
- });
+
+ int ret = scan_and_recycle(begin_txn_running_key, end_txn_running_key,
std::move(handle_abort_timeout_txn_kv));
+ metrics_context.report(true);
+ return ret;
}
// Scan and statistics expired_txn_label that need to be recycled
@@ -4250,11 +4237,10 @@ int
InstanceRecycler::scan_and_statistics_expired_txn_label() {
}
return 0;
};
- return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
- std::move(handle_expired_txn_label_kv),
[&metrics_context]() -> int {
- metrics_context.report(true);
- return 0;
- });
+
+ int ret = scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
std::move(handle_expired_txn_label_kv));
+ metrics_context.report(true);
+ return ret;
}
// Scan and statistics copy_jobs that need to be recycled
@@ -4304,11 +4290,9 @@ int InstanceRecycler::scan_and_statistics_copy_jobs() {
return 0;
};
- return scan_and_recycle(key0, key1, std::move(scan_and_statistics),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(key0, key1, std::move(scan_and_statistics));
+ metrics_context.report(true);
+ return ret;
}
// Scan and statistics stage that need to be recycled
@@ -4364,11 +4348,9 @@ int InstanceRecycler::scan_and_statistics_stage() {
return 0;
};
- return scan_and_recycle(key0, key1, std::move(scan_and_statistics),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(key0, key1, std::move(scan_and_statistics));
+ metrics_context.report(true);
+ return ret;
}
// Scan and statistics expired_stage_objects that need to be recycled
@@ -4456,11 +4438,9 @@ int InstanceRecycler::scan_and_statistics_versions() {
return 0;
};
- return scan_and_recycle(version_key_begin, version_key_end,
std::move(scan_and_statistics),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(version_key_begin, version_key_end,
std::move(scan_and_statistics));
+ metrics_context.report(true);
+ return ret;
}
// Scan and statistics restore jobs that need to be recycled
@@ -4492,11 +4472,9 @@ int InstanceRecycler::scan_and_statistics_restore_jobs()
{
return 0;
};
- return scan_and_recycle(restore_job_key0, restore_job_key1,
std::move(scan_and_statistics),
- [&metrics_context]() -> int {
- metrics_context.report(true);
- return 0;
- });
+ int ret = scan_and_recycle(restore_job_key0, restore_job_key1,
std::move(scan_and_statistics));
+ metrics_context.report(true);
+ return ret;
}
} // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 54c13432d3d..f204f016684 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -18,6 +18,7 @@
#pragma once
#include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
#include <atomic>
#include <condition_variable>
@@ -46,6 +47,8 @@ class StorageVaultAccessor;
class Checker;
class SimpleThreadPool;
class RecyclerMetricsContext;
+class TabletRecyclerMetricsContext;
+class SegmentRecyclerMetricsContext;
struct RecyclerThreadPoolGroup {
RecyclerThreadPoolGroup() = default;
RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
@@ -119,6 +122,109 @@ enum class RowsetRecyclingState {
TMP_ROWSET,
};
+class RecyclerMetricsContext {
+public:
+ RecyclerMetricsContext() = default;
+
+ RecyclerMetricsContext(std::string instance_id, std::string operation_type)
+ : operation_type(std::move(operation_type)),
instance_id(std::move(instance_id)) {
+ start();
+ }
+
+ ~RecyclerMetricsContext() = default;
+
+ std::atomic_ullong total_need_recycle_data_size = 0;
+ std::atomic_ullong total_need_recycle_num = 0;
+
+ std::atomic_ullong total_recycled_data_size = 0;
+ std::atomic_ullong total_recycled_num = 0;
+
+ std::string operation_type;
+ std::string instance_id;
+
+ double start_time = 0;
+
+ void start() {
+ start_time = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ }
+
+ double duration() const {
+ return duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count() -
+ start_time;
+ }
+
+ void reset() {
+ total_need_recycle_data_size = 0;
+ total_need_recycle_num = 0;
+ total_recycled_data_size = 0;
+ total_recycled_num = 0;
+ start_time = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ }
+
+ void finish_report() {
+ if (!operation_type.empty()) {
+ double cost = duration();
+ g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
+ {instance_id, operation_type}, cost);
+ g_bvar_recycler_instance_recycle_round.put({instance_id,
operation_type}, 1);
+ LOG(INFO) << "recycle instance: " << instance_id
+ << ", operation type: " << operation_type << ", cost: "
<< cost
+ << " ms, total recycled num: " <<
total_recycled_num.load()
+ << ", total recycled data size: " <<
total_recycled_data_size.load()
+ << " bytes";
+ if (cost != 0) {
+ if (total_recycled_num.load() != 0) {
+ g_bvar_recycler_instance_recycle_time_per_resource.put(
+ {instance_id, operation_type}, cost /
total_recycled_num.load());
+ }
+ g_bvar_recycler_instance_recycle_bytes_per_ms.put(
+ {instance_id, operation_type},
total_recycled_data_size.load() / cost);
+ }
+ }
+ }
+
+ // `is_begin` is used to initialize total num of items need to be recycled
+ void report(bool is_begin = false) {
+ if (!operation_type.empty()) {
+ // is init
+ if (is_begin) {
+ auto value = total_need_recycle_num.load();
+
+ g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
+ {instance_id, operation_type},
total_need_recycle_data_size.load());
+ g_bvar_recycler_instance_last_round_to_recycle_num.put(
+ {instance_id, operation_type}, value);
+ } else {
+ g_bvar_recycler_instance_last_round_recycled_bytes.put(
+ {instance_id, operation_type},
total_recycled_data_size.load());
+ g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
+ {instance_id, operation_type},
total_recycled_data_size.load());
+
g_bvar_recycler_instance_last_round_recycled_num.put({instance_id,
operation_type},
+
total_recycled_num.load());
+ g_bvar_recycler_instance_recycle_total_num_since_started.put(
+ {instance_id, operation_type},
total_recycled_num.load());
+ }
+ }
+ }
+};
+
+class TabletRecyclerMetricsContext : public RecyclerMetricsContext {
+public:
+ TabletRecyclerMetricsContext() : RecyclerMetricsContext("global_recycler",
"recycle_tablet") {}
+};
+
+class SegmentRecyclerMetricsContext : public RecyclerMetricsContext {
+public:
+ SegmentRecyclerMetricsContext()
+ : RecyclerMetricsContext("global_recycler", "recycle_segment") {}
+};
+
class InstanceRecycler {
public:
explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const
InstanceInfoPB& instance,
@@ -304,115 +410,9 @@ private:
RecyclerThreadPoolGroup _thread_pool_group;
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
-};
-class RecyclerMetricsContext {
-public:
- RecyclerMetricsContext() = default;
-
- RecyclerMetricsContext(std::string instance_id, std::string operation_type)
- : operation_type(std::move(operation_type)),
instance_id(std::move(instance_id)) {
- start();
- }
-
- ~RecyclerMetricsContext() = default;
-
- int64_t total_need_recycle_data_size = 0;
- int64_t total_need_recycle_num = 0;
-
- std::atomic_long total_recycled_data_size {0};
- std::atomic_long total_recycled_num {0};
-
- std::string operation_type {};
- std::string instance_id {};
-
- double start_time = 0;
-
- void start() {
- start_time = duration_cast<std::chrono::milliseconds>(
-
std::chrono::system_clock::now().time_since_epoch())
- .count();
- }
-
- double duration() const {
- return duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count() -
- start_time;
- }
-
- void reset() {
- total_need_recycle_data_size = 0;
- total_need_recycle_num = 0;
- total_recycled_data_size.store(0);
- total_recycled_num.store(0);
- start_time = duration_cast<std::chrono::milliseconds>(
-
std::chrono::system_clock::now().time_since_epoch())
- .count();
- }
-
- void finish_report() {
- if (!operation_type.empty()) {
- double cost = duration();
- g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
- {instance_id, operation_type}, cost);
- g_bvar_recycler_instance_recycle_round.put({instance_id,
operation_type}, 1);
- LOG(INFO) << "recycle instance: " << instance_id
- << ", operation type: " << operation_type << ", cost: "
<< cost
- << " ms, total recycled num: " <<
total_recycled_num.load()
- << ", total recycled data size: " <<
total_recycled_data_size.load()
- << " bytes";
- if (total_recycled_num.load()) {
- g_bvar_recycler_instance_recycle_time_per_resource.put(
- {instance_id, operation_type}, cost /
total_recycled_num.load());
- } else {
- g_bvar_recycler_instance_recycle_time_per_resource.put(
- {instance_id, operation_type}, -1);
- }
- if (total_recycled_data_size.load()) {
- g_bvar_recycler_instance_recycle_bytes_per_ms.put(
- {instance_id, operation_type},
total_recycled_data_size.load() / cost);
- } else {
-
g_bvar_recycler_instance_recycle_bytes_per_ms.put({instance_id, operation_type},
- -1);
- }
- }
- }
-
- // `is_begin` is used to initialize total num of items need to be recycled
- void report(bool is_begin = false) {
- if (!operation_type.empty()) {
- // is init
- if (is_begin) {
- if (total_need_recycle_data_size) {
- g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
- {instance_id, operation_type},
total_need_recycle_data_size);
- }
- } else {
- if (total_recycled_data_size.load()) {
- g_bvar_recycler_instance_last_round_recycled_bytes.put(
- {instance_id, operation_type},
total_recycled_data_size.load());
- }
- g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
- {instance_id, operation_type},
total_recycled_data_size.load());
- }
-
- // is init
- if (is_begin) {
- if (total_need_recycle_num) {
- g_bvar_recycler_instance_last_round_to_recycle_num.put(
- {instance_id, operation_type},
total_need_recycle_num);
- }
- } else {
- if (total_recycled_num.load()) {
- g_bvar_recycler_instance_last_round_recycled_num.put(
- {instance_id, operation_type},
total_recycled_num.load());
- }
- g_bvar_recycler_instance_recycle_total_num_since_started.put(
- {instance_id, operation_type},
total_recycled_num.load());
- }
- }
- }
+ TabletRecyclerMetricsContext tablet_metrics_context_;
+ SegmentRecyclerMetricsContext segment_metrics_context_;
};
} // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler_service.cpp
b/cloud/src/recycler/recycler_service.cpp
index 48b2fe370ef..b2cd9712a4f 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -107,27 +107,15 @@ void
RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, Meta
[](InstanceRecycler& instance_recycler) {
instance_recycler.scan_and_statistics_stage();
}},
- {"recycle_expired_stage_objects",
- [](InstanceRecycler& instance_recycler) {
+ {"recycle_expired_stage_objects", [](InstanceRecycler&
instance_recycler) {
instance_recycler.scan_and_statistics_expired_stage_objects();
- }},
- {"recycle_tablet",
- [](InstanceRecycler& instance_recycler) {
- instance_recycler.scan_and_statistics_partitions();
- instance_recycler.scan_and_statistics_indexes();
- }},
- {"recycle_segment", [](InstanceRecycler& instance_recycler) {
- instance_recycler.scan_and_statistics_partitions();
- instance_recycler.scan_and_statistics_indexes();
- instance_recycler.scan_and_statistics_rowsets();
- instance_recycler.scan_and_statistics_tmp_rowsets();
}}};
std::set<std::string> resource_types;
for (const auto& resource_type : req.resource_type()) {
if (resource_type == "*") {
- std::for_each(resource_handlers.begin(), resource_handlers.end(),
- [&](const auto& it) {
resource_types.emplace(it.first); });
+ std::ranges::for_each(resource_handlers,
+ [&](const auto& it) {
resource_types.emplace(it.first); });
break;
} else {
if (!resource_handlers.contains(resource_type)) {
@@ -152,15 +140,14 @@ void
RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, Meta
for (const auto& instance_id : req.instance_ids()) {
if (instance_id == "*") {
- std::for_each(instances.begin(), instances.end(), [&](const
InstanceInfoPB& instance) {
+ std::ranges::for_each(instances, [&](const InstanceInfoPB&
instance) {
instance_ids.emplace(instance.instance_id());
});
break;
} else {
- if (std::find_if(instances.begin(), instances.end(),
- [&](const InstanceInfoPB& instance) {
- return instance.instance_id() == instance_id;
- }) == instances.end()) {
+ if (std::ranges::find_if(instances, [&](const InstanceInfoPB&
instance) {
+ return instance.instance_id() == instance_id;
+ }) == instances.end()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid instance id: {}", instance_id);
LOG_WARNING(msg);
@@ -225,24 +212,32 @@ void
RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, Meta
worker_pool->stop();
std::stringstream ss;
- for_each(instance_ids.begin(), instance_ids.end(), [&](const std::string&
id) {
+ std::ranges::for_each(instance_ids, [&](const std::string& id) {
ss << "Instance ID: " << id << "\n";
ss << "----------------------------------------\n";
- for_each(resource_types.begin(), resource_types.end(), [&](const auto&
resource_type) {
- int64_t to_recycle_num = 0;
- int64_t to_recycle_bytes = 0;
- if (resource_type == "recycle_segment" || resource_type ==
"recycle_tablet") {
- to_recycle_num =
g_bvar_recycler_instance_last_round_to_recycle_num.get(
- {"global_recycler", resource_type});
- to_recycle_bytes =
g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
- {"global_recycler", resource_type});
- } else {
- to_recycle_num =
-
g_bvar_recycler_instance_last_round_to_recycle_num.get({id, resource_type});
- to_recycle_bytes =
g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
- {id, resource_type});
- }
+        // tablet and segment statistics
+        // These are reported under the shared "global_recycler" id (see
+        // TabletRecyclerMetricsContext / SegmentRecyclerMetricsContext), not per instance.
+        // The *_bytes values must come from the to_recycle_bytes bvar, not the num bvar.
+        int64_t tablet_num = g_bvar_recycler_instance_last_round_to_recycle_num.get(
+                {"global_recycler", "recycle_tablet"});
+        int64_t tablet_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
+                {"global_recycler", "recycle_tablet"});
+        int64_t segment_num = g_bvar_recycler_instance_last_round_to_recycle_num.get(
+                {"global_recycler", "recycle_segment"});
+        int64_t segment_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
+                {"global_recycler", "recycle_segment"});
+ // clang-format off
+ ss << "Global recycler: " << "tablet and segment" << "\n";
+ ss << " • Need to recycle tablet count: " << tablet_num << " items\n";
+ ss << " • Need to recycle tablet size: " << tablet_bytes << "
bytes\n";
+ ss << " • Need to recycle segment count: " << segment_num << "
items\n";
+ ss << " • Need to recycle segment size: " << segment_bytes << "
bytes\n";
+ // clang-format on
+
+ std::ranges::for_each(resource_types, [&](const auto& resource_type) {
+ int64_t to_recycle_num =
+
g_bvar_recycler_instance_last_round_to_recycle_num.get({id, resource_type});
+            // Plain initialization: the original `x = x = ...` form assigned to the
+            // variable inside its own initializer.
+            int64_t to_recycle_bytes =
+                    g_bvar_recycler_instance_last_round_to_recycle_bytes.get({id, resource_type});
ss << "Task Type: " << resource_type << "\n";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]