This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f0b8d2cb24e branch-3.1: [Feature](recycler) Add recycler metrics for 
instance layer #51856 #51448 (#52429)
f0b8d2cb24e is described below

commit f0b8d2cb24eae8b7549f9347bf96311bc7678a2e
Author: Uniqueyou <[email protected]>
AuthorDate: Sat Jun 28 22:54:40 2025 +0800

    branch-3.1: [Feature](recycler) Add recycler metrics for instance layer 
#51856 #51448 (#52429)
    
    Cherry-picked #51856 #51448
---
 cloud/src/common/bvars.cpp      |  40 +-
 cloud/src/common/bvars.h        |  47 ++-
 cloud/src/common/config.h       |   4 +
 cloud/src/main.cpp              |   3 +-
 cloud/src/recycler/recycler.cpp | 901 ++++++++++++++++++++++++++++++++++------
 cloud/src/recycler/recycler.h   | 134 +++++-
 cloud/test/recycler_test.cpp    |  46 +-
 7 files changed, 996 insertions(+), 179 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 9fd777a4b98..d8d6d34127a 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -20,6 +20,7 @@
 #include <bvar/multi_dimension.h>
 #include <bvar/reducer.h>
 #include <bvar/status.h>
+#include <bvar/window.h>
 
 #include <cstdint>
 #include <stdexcept>
@@ -103,14 +104,41 @@ BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_rowset_earlest_ts("recycler",
 BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler", 
"recycle_tmp_rowset_earlest_ts");
 BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler", 
"recycle_expired_txn_label_earlest_ts");
 bvar::Status<int64_t> 
g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0);
-bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;
+// current concurrency of recycle task
+bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
 
 // recycler's mbvars
-mBvarIntAdder 
g_bvar_recycler_instance_running("recycler_instance_running",{"instance_id"});
-mBvarLongStatus 
g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration_ms",{"instance_id"});
-mBvarLongStatus 
g_bvar_recycler_instance_next_time("recycler_instance_next_time_s",{"instance_id"});
-mBvarPairStatus<int64_t> 
g_bvar_recycler_instance_recycle_times("recycler_instance_recycle_times",{"instance_id"});
-mBvarLongStatus 
g_bvar_recycler_instance_recycle_last_success_times("recycler_instance_recycle_last_success_times",{"instance_id"});
+bvar::Adder<int64_t> 
g_bvar_recycler_instance_running_counter("recycler_instance_running_counter");
+// cost time of the last whole recycle process
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration",{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"});
+// start and end timestamps of the recycle process
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_st_ts("recycler_instance_recycle_st_ts",{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_ed_ts("recycler_instance_recycle_ed_ts",{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_last_success_ts("recycler_instance_recycle_last_success_ts",{"instance_id"});
+
+// recycler's mbvars
+// instance_id: unique identifier for the instance
+// resource_type: type of resource that needs to be recycled (index, partition, 
rowset, segment, tablet, etc.)
+// resource_id: unique identifier for the repository
+// status: status of the recycle task (normal, abnormal, etc.)
+mBvarIntAdder 
g_bvar_recycler_vault_recycle_status("recycler_vault_recycle_status", 
{"instance_id", "resource_id", "status"});
+// current concurrency of vault delete task
+mBvarIntAdder 
g_bvar_recycler_vault_recycle_task_concurrency("recycler_vault_recycle_task_concurrency",
 {"instance_id", "resource_type", "resource_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_recycled_num("recycler_instance_last_round_recycled_num",
 {"instance_id", "resource_type"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_to_recycle_num("recycler_instance_last_round_to_recycle_num",
 {"instance_id", "resource_type"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_recycled_bytes("recycler_instance_last_round_recycled_bytes",
 {"instance_id", "resource_type"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_to_recycle_bytes("recycler_instance_last_round_to_recycle_bytes",
 {"instance_id", "resource_type"});
+mBvarStatus<double> 
g_bvar_recycler_instance_last_round_recycle_elpased_ts("recycler_instance_last_round_recycle_elpased_ts",
 {"instance_id", "resource_type"});
+// total recycled num and bytes of resources since recycler started
+mBvarIntAdder 
g_bvar_recycler_instance_recycle_total_num_since_started("recycler_instance_recycle_total_num_since_started",
 {"instance_id", "resource_type"});
+mBvarIntAdder 
g_bvar_recycler_instance_recycle_total_bytes_since_started("recycler_instance_recycle_total_bytes_since_started",
 {"instance_id", "resource_type"});
+mBvarIntAdder 
g_bvar_recycler_instance_recycle_round("recycler_instance_recycle_round", 
{"instance_id", "resource_type"});
+// represents the ms required per resource to be recycled
+// value of -1 means no resource recycled
+mBvarStatus<double> 
g_bvar_recycler_instance_recycle_time_per_resource("recycler_instance_recycle_time_per_resource",
 {"instance_id", "resource_type"});
+// represents the bytes of resources that can be recycled per ms
+mBvarStatus<double> 
g_bvar_recycler_instance_recycle_bytes_per_ms("recycler_instance_recycle_bytes_per_ms",
 {"instance_id", "resource_type"});
 
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 12f3f2c3060..9f4f3d1fccb 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -24,6 +24,7 @@
 #include <bvar/reducer.h>
 
 #include <cstdint>
+#include <initializer_list>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -139,8 +140,7 @@ public:
         BvarType* stats = 
counter_.get_stats(std::list<std::string>(dim_values));
         if (stats) {
             if constexpr (std::is_same_v<BvarType, bvar::Status<double>> ||
-                          std::is_same_v<BvarType, bvar::Status<long>> ||
-                          is_pair_status<BvarType>::value) {
+                          std::is_same_v<BvarType, bvar::Status<long>>) {
                 stats->set_value(value);
             } else {
                 *stats << value;
@@ -160,8 +160,6 @@ private:
     template <typename T>
     struct is_valid_bvar_type : std::false_type {};
     template <typename T>
-    struct is_pair_status : std::false_type {};
-    template <typename T>
     struct is_valid_bvar_type<bvar::Adder<T>> : std::true_type {};
     template <>
     struct is_valid_bvar_type<bvar::IntRecorder> : std::true_type {};
@@ -169,8 +167,6 @@ private:
     struct is_valid_bvar_type<bvar::Maxer<T>> : std::true_type {};
     template <typename T>
     struct is_valid_bvar_type<bvar::Status<T>> : std::true_type {};
-    template <typename T>
-    struct is_pair_status<bvar::Status<std::pair<T, T>>> : std::true_type {};
     template <>
     struct is_valid_bvar_type<bvar::LatencyRecorder> : std::true_type {};
 
@@ -183,18 +179,8 @@ using mBvarIntRecorder = mBvarWrapper<bvar::IntRecorder>;
 using mBvarLatencyRecorder = mBvarWrapper<bvar::LatencyRecorder>;
 using mBvarIntMaxer = mBvarWrapper<bvar::Maxer<int>>;
 using mBvarDoubleMaxer = mBvarWrapper<bvar::Maxer<double>>;
-using mBvarLongStatus = mBvarWrapper<bvar::Status<long>>;
-using mBvarDoubleStatus = mBvarWrapper<bvar::Status<double>>;
-
-namespace std {
-template <typename T1, typename T2>
-inline std::ostream& operator<<(std::ostream& os, const std::pair<T1, T2>& p) {
-    return os << "{" << p.first << "," << p.second << "}";
-}
-} // namespace std
-
 template <typename T>
-using mBvarPairStatus = mBvarWrapper<bvar::Status<std::pair<T, T>>>;
+using mBvarStatus = mBvarWrapper<bvar::Status<T>>;
 
 // meta-service's bvars
 extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
@@ -270,13 +256,28 @@ extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_rowset_earlest_ts;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_expired_txn_label_earlest_ts;
 
+// recycler's mbvars
 extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
-extern bvar::Adder<int64_t> g_bvar_recycler_task_concurrency;
-extern mBvarIntAdder g_bvar_recycler_instance_running;
-extern mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration;
-extern mBvarLongStatus g_bvar_recycler_instance_next_time;
-extern mBvarPairStatus<int64_t> g_bvar_recycler_instance_recycle_times;
-extern mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times;
+extern bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
+extern bvar::Adder<int64_t> g_bvar_recycler_instance_running_counter;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_recycle_duration;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_next_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_st_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_ed_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_last_success_ts;
+
+extern mBvarIntAdder g_bvar_recycler_vault_recycle_status;
+extern mBvarIntAdder g_bvar_recycler_vault_recycle_task_concurrency;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycled_num;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_to_recycle_num;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycled_bytes;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_to_recycle_bytes;
+extern mBvarStatus<double> 
g_bvar_recycler_instance_last_round_recycle_elpased_ts;
+extern mBvarIntAdder g_bvar_recycler_instance_recycle_total_num_since_started;
+extern mBvarIntAdder 
g_bvar_recycler_instance_recycle_total_bytes_since_started;
+extern mBvarIntAdder g_bvar_recycler_instance_recycle_round;
+extern mBvarStatus<double> g_bvar_recycler_instance_recycle_time_per_resource;
+extern mBvarStatus<double> g_bvar_recycler_instance_recycle_bytes_per_ms;
 
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index cc1094ee103..883af8f64da 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -47,6 +47,8 @@ CONF_Int64(fdb_txn_timeout_ms, "10000");
 CONF_Int64(brpc_max_body_size, "3147483648");
 CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
 
+CONF_String(bvar_max_dump_multi_dimension_metric_num, "5000");
+
 // logging
 CONF_String(log_dir, "./log/");
 CONF_String(log_level, "info"); // info warn error fatal
@@ -104,6 +106,8 @@ 
CONF_mInt64(delete_bitmap_storage_optimize_v2_check_skip_seconds, "300"); // 5mi
 CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
 // interval for check object
 CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours
+// enable recycler metrics statistics
+CONF_Bool(enable_recycler_metrics, "false");
 
 CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
 CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
diff --git a/cloud/src/main.cpp b/cloud/src/main.cpp
index 18cf98720e9..0aad97aab4d 100644
--- a/cloud/src/main.cpp
+++ b/cloud/src/main.cpp
@@ -236,7 +236,8 @@ int main(int argc, char** argv) {
         std::cout << "try to start meta_service, recycler" << std::endl;
     }
 
-    
google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number", 
"2000");
+    google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number",
+                                 
config::bvar_max_dump_multi_dimension_metric_num.c_str());
 
     brpc::Server server;
     brpc::FLAGS_max_body_size = config::brpc_max_body_size;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 9c81b0364fd..b7238efbdc6 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -20,10 +20,12 @@
 #include <brpc/builtin_service.pb.h>
 #include <brpc/server.h>
 #include <butil/endpoint.h>
+#include <butil/strings/string_split.h>
 #include <bvar/status.h>
 #include <gen_cpp/cloud.pb.h>
 #include <gen_cpp/olap_file.pb.h>
 
+#include <algorithm>
 #include <atomic>
 #include <chrono>
 #include <cstddef>
@@ -65,6 +67,9 @@ namespace doris::cloud {
 
 using namespace std::chrono;
 
+static RecyclerMetricsContext tablet_metrics_context_("global_recycler", 
"recycle_tablet");
+static RecyclerMetricsContext segment_metrics_context_("global_recycler", 
"recycle_segment");
+
 // return 0 for success get a key, 1 for key not found, negative for error
 [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, 
std::string& val) {
     std::unique_ptr<Transaction> txn;
@@ -280,12 +285,16 @@ void Recycler::recycle_callback() {
         if (stopped()) return;
         LOG_INFO("begin to recycle instance").tag("instance_id", instance_id);
         auto ctime_ms = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-        g_bvar_recycler_task_concurrency << 1;
-        g_bvar_recycler_instance_running.put({instance_id}, 1);
-        g_bvar_recycler_instance_recycle_times.put({instance_id}, 
std::make_pair(ctime_ms, -1));
+        g_bvar_recycler_instance_recycle_task_concurrency << 1;
+        g_bvar_recycler_instance_running_counter << 1;
+        g_bvar_recycler_instance_recycle_st_ts.put({instance_id}, ctime_ms);
+        tablet_metrics_context_.reset();
+        segment_metrics_context_.reset();
         ret = instance_recycler->do_recycle();
-        g_bvar_recycler_task_concurrency << -1;
-        g_bvar_recycler_instance_running.put({instance_id}, -1);
+        tablet_metrics_context_.finish_report();
+        segment_metrics_context_.finish_report();
+        g_bvar_recycler_instance_recycle_task_concurrency << -1;
+        g_bvar_recycler_instance_running_counter << -1;
         // If instance recycler has been aborted, don't finish this job
         if (!instance_recycler->stopped()) {
             finish_instance_recycle_job(txn_kv_.get(), recycle_job_key, 
instance_id, ip_port_,
@@ -297,15 +306,15 @@ void Recycler::recycle_callback() {
         }
         auto now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
         auto elpased_ms = now - ctime_ms;
-        g_bvar_recycler_instance_recycle_times.put({instance_id}, 
std::make_pair(ctime_ms, now));
+        g_bvar_recycler_instance_recycle_ed_ts.put({instance_id}, now);
         g_bvar_recycler_instance_last_recycle_duration.put({instance_id}, 
elpased_ms);
-        g_bvar_recycler_instance_next_time.put({instance_id},
-                                               now + 
config::recycle_interval_seconds * 1000);
+        g_bvar_recycler_instance_next_ts.put({instance_id},
+                                             now + 
config::recycle_interval_seconds * 1000);
         LOG(INFO) << "recycle instance done, "
                   << "instance_id=" << instance_id << " ret=" << ret << " 
ctime_ms: " << ctime_ms
                   << " now: " << now;
 
-        g_bvar_recycler_instance_recycle_last_success_times.put({instance_id}, 
now);
+        g_bvar_recycler_instance_recycle_last_success_ts.put({instance_id}, 
now);
 
         LOG_INFO("finish recycle instance")
                 .tag("instance_id", instance_id)
@@ -806,6 +815,7 @@ int InstanceRecycler::recycle_indexes() {
     int64_t num_scanned = 0;
     int64_t num_expired = 0;
     int64_t num_recycled = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     RecycleIndexKeyInfo index_key_info0 {instance_id_, 0};
     RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX};
@@ -823,7 +833,8 @@ int InstanceRecycler::recycle_indexes() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle indexes finished, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle indexes finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
@@ -906,19 +917,61 @@ int InstanceRecycler::recycle_indexes() {
                 return -1;
             }
         }
-        if (recycle_tablets(index_pb.table_id(), index_id) != 0) {
+        if (recycle_tablets(index_pb.table_id(), index_id, metrics_context) != 
0) {
             LOG_WARNING("failed to recycle tablets under index")
                     .tag("table_id", index_pb.table_id())
                     .tag("instance_id", instance_id_)
                     .tag("index_id", index_id);
             return -1;
         }
-        ++num_recycled;
+        metrics_context.total_recycled_num = ++num_recycled;
+        metrics_context.report();
         check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, 
start_time);
         index_keys.push_back(k);
         return 0;
     };
 
+    // for calculating the total num or bytes of recycled objects
+    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
+        RecycleIndexPB index_pb;
+        if (!index_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time < calc_expiration(index_pb)) {
+            return 0;
+        }
+        // decode index_id
+        auto k1 = k;
+        k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        decode_key(&k1, &out);
+        // 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
+        auto index_id = std::get<int64_t>(std::get<0>(out[3]));
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        std::string val;
+        err = txn->get(k, &val);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            return 0;
+        }
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        index_pb.Clear();
+        if (!index_pb.ParseFromString(val)) {
+            return 0;
+        }
+        if (scan_tablets_and_statistics(index_pb.table_id(), index_id, 
metrics_context) != 0) {
+            return 0;
+        }
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
     auto loop_done = [&index_keys, this]() -> int {
         if (index_keys.empty()) return 0;
         DORIS_CLOUD_DEFER {
@@ -931,7 +984,9 @@ int InstanceRecycler::recycle_indexes() {
         return 0;
     };
 
-    return scan_and_recycle(index_key0, index_key1, std::move(recycle_func), 
std::move(loop_done));
+    return scan_for_recycle_and_statistics(index_key0, index_key1, "indexes", 
metrics_context,
+                                           std::move(scan_and_statistics), 
std::move(recycle_func),
+                                           std::move(loop_done));
 }
 
 bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string 
instance_id,
@@ -1024,6 +1079,7 @@ int InstanceRecycler::recycle_partitions() {
     int64_t num_scanned = 0;
     int64_t num_expired = 0;
     int64_t num_recycled = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     RecyclePartKeyInfo part_key_info0 {instance_id_, 0};
     RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX};
@@ -1041,7 +1097,8 @@ int InstanceRecycler::recycle_partitions() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle partitions finished, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle partitions finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
@@ -1131,7 +1188,8 @@ int InstanceRecycler::recycle_partitions() {
 
         int ret = 0;
         for (int64_t index_id : part_pb.index_id()) {
-            if (recycle_tablets(part_pb.table_id(), index_id, partition_id, 
is_empty_tablet) != 0) {
+            if (recycle_tablets(part_pb.table_id(), index_id, metrics_context, 
partition_id,
+                                is_empty_tablet) != 0) {
                 LOG_WARNING("failed to recycle tablets under partition")
                         .tag("table_id", part_pb.table_id())
                         .tag("instance_id", instance_id_)
@@ -1148,7 +1206,56 @@ int InstanceRecycler::recycle_partitions() {
                 partition_version_keys.push_back(partition_version_key(
                         {instance_id_, part_pb.db_id(), part_pb.table_id(), 
partition_id}));
             }
+            metrics_context.report();
+        }
+        return ret;
+    };
+
+    // for calculating the total num or bytes of recycled objects
+    auto scan_and_statistics = [&, this](std::string_view k, std::string_view 
v) -> int {
+        RecyclePartitionPB part_pb;
+        if (!part_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time < calc_expiration(part_pb)) {
+            return 0;
+        }
+        // decode partition_id
+        auto k1 = k;
+        k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        decode_key(&k1, &out);
+        // 0x01 "recycle" ${instance_id} "partition" ${partition_id} -> 
RecyclePartitionPB
+        auto partition_id = std::get<int64_t>(std::get<0>(out[3]));
+        // Change state to RECYCLING
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        std::string val;
+        err = txn->get(k, &val);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            return 0;
+        }
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        part_pb.Clear();
+        if (!part_pb.ParseFromString(val)) {
+            return 0;
+        }
+        // Partitions with PREPARED state MUST have no data
+        bool is_empty_tablet = part_pb.state() == RecyclePartitionPB::PREPARED;
+        int ret = 0;
+        for (int64_t index_id : part_pb.index_id()) {
+            if (scan_tablets_and_statistics(part_pb.table_id(), index_id, 
metrics_context,
+                                            partition_id, is_empty_tablet) != 
0) {
+                ret = 0;
+            }
         }
+        metrics_context.total_need_recycle_num++;
         return ret;
     };
 
@@ -1179,12 +1286,15 @@ int InstanceRecycler::recycle_partitions() {
         return 0;
     };
 
-    return scan_and_recycle(part_key0, part_key1, std::move(recycle_func), 
std::move(loop_done));
+    return scan_for_recycle_and_statistics(part_key0, part_key1, "partitions", 
metrics_context,
+                                           std::move(scan_and_statistics), 
std::move(recycle_func),
+                                           std::move(loop_done));
 }
 
 int InstanceRecycler::recycle_versions() {
     int64_t num_scanned = 0;
     int64_t num_recycled = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_versions");
 
     LOG_INFO("begin to recycle table and partition 
versions").tag("instance_id", instance_id_);
 
@@ -1192,7 +1302,8 @@ int InstanceRecycler::recycle_versions() {
 
     DORIS_CLOUD_DEFER {
         auto cost = duration<float>(steady_clock::now() - start_time).count();
-        LOG_INFO("recycle table and partition versions finished, cost={}s", 
cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle table and partition versions finished, cost={}s", 
cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_recycled", num_recycled);
@@ -1202,8 +1313,8 @@ int InstanceRecycler::recycle_versions() {
     auto version_key_end = partition_version_key({instance_id_, INT64_MAX, 0, 
0});
     int64_t last_scanned_table_id = 0;
     bool is_recycled = false; // Is last scanned kv recycled
-    auto recycle_func = [&num_scanned, &num_recycled, &last_scanned_table_id, 
&is_recycled, this](
-                                std::string_view k, std::string_view) {
+    auto recycle_func = [&num_scanned, &num_recycled, &last_scanned_table_id, 
&is_recycled,
+                         &metrics_context, this](std::string_view k, 
std::string_view) {
         ++num_scanned;
         auto k1 = k;
         k1.remove_prefix(1);
@@ -1264,15 +1375,58 @@ int InstanceRecycler::recycle_versions() {
         if (err != TxnErrorCode::TXN_OK) {
             return -1;
         }
-        ++num_recycled;
+        metrics_context.total_recycled_num = ++num_recycled;
+        metrics_context.report();
         is_recycled = true;
         return 0;
     };
 
-    return scan_and_recycle(version_key_begin, version_key_end, 
std::move(recycle_func));
+    int64_t last_scanned_table_id_t = 0;
+    bool is_recycled_t = false; // Is last scanned kv recycled
+    // for calculating the total num or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context, &last_scanned_table_id_t, 
&is_recycled_t, this](
+                                       std::string_view k, std::string_view) {
+        auto k1 = k;
+        k1.remove_prefix(1);
+        // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} 
${partition_id}
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        decode_key(&k1, &out);
+        DCHECK_EQ(out.size(), 6) << k;
+        auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+        if (table_id == last_scanned_table_id_t) { // Already handle kvs of 
this table
+            metrics_context.total_need_recycle_num +=
+                    is_recycled_t; // Version kv of this table has been 
recycled
+            return 0;
+        }
+        last_scanned_table_id_t = table_id;
+        is_recycled_t = false;
+        auto tablet_key_begin = stats_tablet_key({instance_id_, table_id, 0, 
0, 0});
+        auto tablet_key_end = stats_tablet_key({instance_id_, table_id, 
INT64_MAX, 0, 0});
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        std::unique_ptr<RangeGetIterator> iter;
+        err = txn->get(tablet_key_begin, tablet_key_end, &iter, false, 1);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        if (iter->has_next()) { // Table is useful, should not recycle table 
and partition versions
+            return 0;
+        }
+        metrics_context.total_need_recycle_num++;
+        is_recycled_t = true;
+        return 0;
+    };
+
+    return scan_for_recycle_and_statistics(version_key_begin, version_key_end, 
"versions",
+                                           metrics_context, 
std::move(scan_and_statistics),
+                                           std::move(recycle_func));
 }
 
-int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, 
int64_t partition_id,
+int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
+                                      RecyclerMetricsContext& metrics_context, 
int64_t partition_id,
                                       bool is_empty_tablet) {
     int64_t num_scanned = 0;
     std::atomic_long num_recycled = 0;
@@ -1351,8 +1505,8 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
         tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id_, 
tablet_id}));
         if (!is_empty_tablet) {
             sync_executor.add([this, &num_recycled, tid = tablet_id, 
range_move = use_range_remove,
-                               k]() mutable -> TabletKeyPair {
-                if (recycle_tablet(tid) != 0) {
+                               &metrics_context, k]() mutable -> TabletKeyPair 
{
+                if (recycle_tablet(tid, metrics_context) != 0) {
                     LOG_WARNING("failed to recycle tablet")
                             .tag("instance_id", instance_id_)
                             .tag("tablet_id", tid);
@@ -1536,18 +1690,20 @@ int InstanceRecycler::delete_rowset_data(const 
doris::RowsetMetaCloudPB& rs_meta
     return accessor->delete_files(file_paths);
 }
 
-int InstanceRecycler::delete_rowset_data(const 
std::vector<doris::RowsetMetaCloudPB>& rowsets,
-                                         RowsetRecyclingState type) {
+int InstanceRecycler::delete_rowset_data(
+        const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets, 
RowsetRecyclingState type,
+        RecyclerMetricsContext& metrics_context) {
     int ret = 0;
     // resource_id -> file_paths
     std::map<std::string, std::vector<std::string>> resource_file_paths;
     // (resource_id, tablet_id, rowset_id)
     std::vector<std::tuple<std::string, int64_t, std::string>> 
rowsets_delete_by_prefix;
+    bool is_formal_rowset = (type == RowsetRecyclingState::FORMAL_ROWSET);
 
-    for (const auto& rs : rowsets) {
+    for (const auto& [_, rs] : rowsets) {
         // we have to treat tmp rowset as "orphans" that may not related to 
any existing tablets
         // due to aborted schema change.
-        if (type == RowsetRecyclingState::FORMAL_ROWSET) {
+        if (is_formal_rowset) {
             std::lock_guard lock(recycled_tablets_mtx_);
             if (recycled_tablets_.count(rs.tablet_id())) {
                 continue; // Rowset data has already been deleted
@@ -1672,7 +1828,42 @@ int InstanceRecycler::delete_rowset_data(const 
std::vector<doris::RowsetMetaClou
                 return -1;
             }
             auto& accessor = accessor_map_[*rid];
-            return accessor->delete_files(*paths);
+            int ret = accessor->delete_files(*paths);
+            if (!ret) {
+                // deduplication of different files with the same rowset id
+                // 020000000000007fd045a62bc87a6587dd7ac274aa36e5a9_0.dat
+                // 020000000000007fd045a62bc87a6587dd7ac274aa36e5a9_0.idx
+                std::set<std::string> deleted_rowset_id;
+
+                std::for_each(
+                        paths->begin(), paths->end(),
+                        [&metrics_context, &rowsets, &deleted_rowset_id](const 
std::string& path) {
+                            std::vector<std::string> str;
+                            butil::SplitString(path, '/', &str);
+                            std::string rowset_id;
+                            if (auto pos = str.back().find('_'); pos != 
std::string::npos) {
+                                rowset_id = str.back().substr(0, pos);
+                            } else {
+                                LOG(WARNING) << "failed to parse rowset_id, 
path=" << path;
+                                return;
+                            }
+                            auto rs_meta = rowsets.find(rowset_id);
+                            if (rs_meta != rowsets.end() &&
+                                !deleted_rowset_id.contains(rowset_id)) {
+                                deleted_rowset_id.emplace(rowset_id);
+                                metrics_context.total_recycled_data_size +=
+                                        rs_meta->second.total_disk_size();
+                                segment_metrics_context_.total_recycled_num +=
+                                        rs_meta->second.num_segments();
+                                
segment_metrics_context_.total_recycled_data_size +=
+                                        rs_meta->second.total_disk_size();
+                                metrics_context.total_recycled_num++;
+                            }
+                        });
+                segment_metrics_context_.report();
+                metrics_context.report();
+            }
+            return ret;
         });
     }
     for (const auto& [resource_id, tablet_id, rowset_id] : 
rowsets_delete_by_prefix) {
@@ -1680,9 +1871,21 @@ int InstanceRecycler::delete_rowset_data(const 
std::vector<doris::RowsetMetaClou
                 "delete rowset {} by prefix because it's in 
BEGIN_PARTIAL_UPDATE state, "
                 "resource_id={}, tablet_id={}, instance_id={}",
                 rowset_id, resource_id, tablet_id, instance_id_);
-        concurrent_delete_executor.add(
-                [&]() -> int { return delete_rowset_data(resource_id, 
tablet_id, rowset_id); });
+        concurrent_delete_executor.add([&]() -> int {
+            int ret = delete_rowset_data(resource_id, tablet_id, rowset_id);
+            if (!ret) {
+                auto rs = rowsets.at(rowset_id);
+                metrics_context.total_recycled_data_size += 
rs.total_disk_size();
+                metrics_context.total_recycled_num++;
+                segment_metrics_context_.total_recycled_data_size += 
rs.total_disk_size();
+                segment_metrics_context_.total_recycled_num += 
rs.num_segments();
+                metrics_context.report();
+                segment_metrics_context_.report();
+            }
+            return ret;
+        });
     }
+
     bool finished = true;
     std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
     for (int r : rets) {
@@ -1710,7 +1913,122 @@ int InstanceRecycler::delete_rowset_data(const 
std::string& resource_id, int64_t
     return accessor->delete_prefix(rowset_path_prefix(tablet_id, rowset_id));
 }
 
-int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
+int InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t 
index_id,
+                                                  RecyclerMetricsContext& 
metrics_context,
+                                                  int64_t partition_id, bool 
is_empty_tablet) {
+    std::string tablet_key_begin, tablet_key_end;
+
+    if (partition_id > 0) {
+        meta_tablet_key({instance_id_, table_id, index_id, partition_id, 0}, 
&tablet_key_begin);
+        meta_tablet_key({instance_id_, table_id, index_id, partition_id + 1, 
0}, &tablet_key_end);
+    } else {
+        meta_tablet_key({instance_id_, table_id, index_id, 0, 0}, 
&tablet_key_begin);
+        meta_tablet_key({instance_id_, table_id, index_id + 1, 0, 0}, 
&tablet_key_end);
+    }
+    // to calculate the total num or bytes of recycled objects
+    auto scan_and_statistics = [&, is_empty_tablet, this](std::string_view k,
+                                                          std::string_view v) 
-> int {
+        doris::TabletMetaCloudPB tablet_meta_pb;
+        if (!tablet_meta_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t tablet_id = tablet_meta_pb.tablet_id();
+
+        if (!check_lazy_txn_finished(txn_kv_, instance_id_, 
tablet_meta_pb.tablet_id())) {
+            return 0;
+        }
+
+        if (!is_empty_tablet) {
+            if (scan_tablet_and_statistics(tablet_id, metrics_context) != 0) {
+                return 0;
+            }
+            tablet_metrics_context_.total_need_recycle_num++;
+        }
+        return 0;
+    };
+    return scan_and_recycle(tablet_key_begin, tablet_key_end, 
std::move(scan_and_statistics));
+}
+
+int InstanceRecycler::scan_tablet_and_statistics(int64_t tablet_id,
+                                                 RecyclerMetricsContext& 
metrics_context) {
+    int ret = 0;
+    std::map<std::string, RowsetMetaCloudPB> rowset_meta_map;
+    std::unique_ptr<Transaction> txn;
+    if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to recycle tablet ")
+                .tag("tablet id", tablet_id)
+                .tag("instance_id", instance_id_)
+                .tag("reason", "failed to create txn");
+        ret = -1;
+    }
+    GetRowsetResponse resp;
+    std::string msg;
+    MetaServiceCode code = MetaServiceCode::OK;
+    // get rowsets in tablet
+    internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, 
instance_id_,
+                        tablet_id, code, msg, &resp);
+    if (code != MetaServiceCode::OK) {
+        LOG_WARNING("failed to get rowsets of tablet when recycle tablet")
+                .tag("tablet id", tablet_id)
+                .tag("msg", msg)
+                .tag("code", code)
+                .tag("instance id", instance_id_);
+        ret = -1;
+    }
+    for (const auto& rs_meta : resp.rowset_meta()) {
+        /*
+        * For compatibility, we skip the loop for [0-1] here. 
+        * The purpose of this loop is to delete object files,
+        * and since [0-1] only has meta and doesn't have object files, 
+        * skipping it doesn't affect system correctness. 
+        *
+        * If not skipped, the check "if (!rs_meta.has_resource_id())" below 
+        * would return error -1 directly, causing the recycle operation to 
fail.
+        *
+        * [0-1] doesn't have resource id is a bug.
+        * In the future, we will fix this problem, after that,
+        * we can remove this if statement.
+        *
+        * TODO(Yukang-Lian): remove this if statement when [0-1] has resource 
id in the future.
+        */
+
+        if (rs_meta.end_version() == 1) {
+            // Assert that [0-1] has no resource_id to make sure
+            // this if statement will not be forgotten to remove
+            // when the resource id bug is fixed
+            DCHECK(!rs_meta.has_resource_id()) << "rs_meta" << 
rs_meta.ShortDebugString();
+            continue;
+        }
+        if (!rs_meta.has_resource_id()) {
+            LOG_WARNING("rowset meta does not have a resource id, impossible!")
+                    .tag("rs_meta", rs_meta.ShortDebugString())
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id);
+            continue;
+        }
+        DCHECK(rs_meta.has_resource_id()) << "rs_meta" << 
rs_meta.ShortDebugString();
+        auto it = accessor_map_.find(rs_meta.resource_id());
+        // possible if the accessor is not initialized correctly
+        if (it == accessor_map_.end()) [[unlikely]] {
+            LOG_WARNING(
+                    "failed to find resource id when recycle tablet, skip this 
vault accessor "
+                    "recycle process")
+                    .tag("tablet id", tablet_id)
+                    .tag("instance_id", instance_id_)
+                    .tag("resource_id", rs_meta.resource_id())
+                    .tag("rowset meta pb", rs_meta.ShortDebugString());
+            continue;
+        }
+
+        metrics_context.total_need_recycle_data_size += 
rs_meta.total_disk_size();
+        tablet_metrics_context_.total_need_recycle_data_size += 
rs_meta.total_disk_size();
+        segment_metrics_context_.total_need_recycle_data_size += 
rs_meta.total_disk_size();
+        segment_metrics_context_.total_need_recycle_num += 
rs_meta.num_segments();
+    }
+    return ret;
+}
+
+int InstanceRecycler::recycle_tablet(int64_t tablet_id, 
RecyclerMetricsContext& metrics_context) {
     LOG_INFO("begin to recycle rowsets in a dropped tablet")
             .tag("instance_id", instance_id_)
             .tag("tablet_id", tablet_id);
@@ -1726,7 +2044,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
     std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, 
""});
     std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 
1, ""});
 
-    std::set<std::string> resource_ids;
+    std::vector<std::string> rowset_meta;
     int64_t recycle_rowsets_number = 0;
     int64_t recycle_segments_number = 0;
     int64_t recycle_rowsets_data_size = 0;
@@ -1832,16 +2150,17 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) 
{
         max_rowset_creation_time = std::max(max_rowset_creation_time, 
rs_meta.creation_time());
         min_rowset_expiration_time = std::min(min_rowset_expiration_time, 
rs_meta.txn_expiration());
         max_rowset_expiration_time = std::max(max_rowset_expiration_time, 
rs_meta.txn_expiration());
-        resource_ids.emplace(rs_meta.resource_id());
+        rowset_meta.emplace_back(rs_meta.resource_id());
+        LOG(INFO) << "rs_meta.resource_id()=" << rs_meta.resource_id();
     }
 
     LOG_INFO("recycle tablet start to delete object")
             .tag("instance id", instance_id_)
             .tag("tablet id", tablet_id)
             .tag("recycle tablet resource ids are",
-                 std::accumulate(resource_ids.begin(), resource_ids.end(), 
std::string(),
-                                 [](const std::string& a, const std::string& 
b) {
-                                     return a.empty() ? b : a + "," + b;
+                 std::accumulate(rowset_meta.begin(), rowset_meta.end(), 
std::string(),
+                                 [](std::string acc, const auto& it) {
+                                     return acc.empty() ? it : acc + ", " + it;
                                  }));
 
     SyncExecutor<int> concurrent_delete_executor(
@@ -1853,13 +2172,24 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) 
{
     // ATTN: there may be data leak if not all accessor initilized successfully
     //       partial data deleted if the tablet is stored cross-storage vault
     //       vault id is not attached to TabletMeta...
-    for (const auto& resource_id : resource_ids) {
-        concurrent_delete_executor.add([&, accessor_ptr = 
accessor_map_[resource_id]]() {
-            if (accessor_ptr->delete_directory(tablet_path_prefix(tablet_id)) 
!= 0) {
+    for (const auto& resource_id : rowset_meta) {
+        concurrent_delete_executor.add([&, rs_id = resource_id,
+                                        accessor_ptr = 
accessor_map_[resource_id]]() {
+            std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&](int*) {
+                g_bvar_recycler_vault_recycle_task_concurrency.put(
+                        {instance_id_, metrics_context.operation_type, rs_id}, 
-1);
+                metrics_context.report();
+            });
+            g_bvar_recycler_vault_recycle_task_concurrency.put(
+                    {instance_id_, metrics_context.operation_type, rs_id}, 1);
+            int res = 
accessor_ptr->delete_directory(tablet_path_prefix(tablet_id));
+            if (res != 0) {
                 LOG(WARNING) << "failed to delete rowset data of tablet " << 
tablet_id
                              << " path=" << accessor_ptr->uri();
+                g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id, 
"abnormal"}, 1);
                 return -1;
             }
+            g_bvar_recycler_vault_recycle_status.put({instance_id_, rs_id, 
"normal"}, 1);
             return 0;
         });
     }
@@ -1883,6 +2213,15 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
         return ret;
     }
 
+    tablet_metrics_context_.total_recycled_data_size +=
+            recycle_rowsets_data_size + recycle_rowsets_index_size;
+    tablet_metrics_context_.total_recycled_num += 1;
+    segment_metrics_context_.total_recycled_num += recycle_segments_number;
+    segment_metrics_context_.total_recycled_data_size +=
+            recycle_rowsets_data_size + recycle_rowsets_index_size;
+    tablet_metrics_context_.report();
+    segment_metrics_context_.report();
+
     txn.reset();
     if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
         LOG_WARNING("failed to recycle tablet ")
@@ -1928,6 +2267,7 @@ int InstanceRecycler::recycle_rowsets() {
     size_t total_rowset_value_size = 0;
     size_t expired_rowset_size = 0;
     std::atomic_long num_recycled = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""};
     RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""};
@@ -1945,7 +2285,8 @@ int InstanceRecycler::recycle_rowsets() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle rowsets finished, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle rowsets finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
@@ -1959,7 +2300,9 @@ int InstanceRecycler::recycle_rowsets() {
     };
 
     std::vector<std::string> rowset_keys;
-    std::vector<doris::RowsetMetaCloudPB> rowsets;
+    // rowset_id -> rowset_meta
+    // store rowset id and meta to account the recycled rowset size on delete
+    std::map<std::string, doris::RowsetMetaCloudPB> rowsets;
 
     // Store keys of rowset recycled by background workers
     std::mutex async_recycled_rowset_keys_mutex;
@@ -2026,6 +2369,43 @@ int InstanceRecycler::recycle_rowsets() {
         return final_expiration;
     };
 
+    // to calculate the total num or bytes of recycled objects
+    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
+        RecycleRowsetPB rowset;
+        if (!rowset.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time < calc_expiration(rowset)) { // not expired
+            return 0;
+        }
+        if (!rowset.has_type()) {
+            if (!rowset.has_resource_id()) [[unlikely]] {
+                return 0;
+            }
+            if (rowset.resource_id().empty()) [[unlikely]] {
+                return 0;
+            }
+            return 0;
+        }
+        auto* rowset_meta = rowset.mutable_rowset_meta();
+        if (!rowset_meta->has_resource_id()) [[unlikely]] {
+            if (rowset.type() == RecycleRowsetPB::PREPARE || 
rowset_meta->num_segments() != 0) {
+                return 0;
+            }
+        }
+        if (rowset.type() != RecycleRowsetPB::PREPARE) {
+            if (rowset_meta->num_segments() > 0) {
+                metrics_context.total_need_recycle_num++;
+                segment_metrics_context_.total_need_recycle_num += 
rowset_meta->num_segments();
+                segment_metrics_context_.total_need_recycle_data_size +=
+                        rowset_meta->total_disk_size();
+                metrics_context.total_need_recycle_data_size += 
rowset_meta->total_disk_size();
+            }
+        }
+        return 0;
+    };
+
     auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int 
{
         ++num_scanned;
         total_rowset_key_size += k.size();
@@ -2104,7 +2484,7 @@ int InstanceRecycler::recycle_rowsets() {
             num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
             rowset_keys.emplace_back(k);
             if (rowset_meta->num_segments() > 0) { // Skip empty rowset
-                rowsets.push_back(std::move(*rowset_meta));
+                rowsets.emplace(rowset_meta->rowset_id_v2(), 
std::move(*rowset_meta));
             } else {
                 ++num_empty_rowset;
             }
@@ -2114,12 +2494,15 @@ int InstanceRecycler::recycle_rowsets() {
 
     auto loop_done = [&]() -> int {
         std::vector<std::string> rowset_keys_to_delete;
-        std::vector<doris::RowsetMetaCloudPB> rowsets_to_delete;
+        // rowset_id -> rowset_meta
+        // store rowset id and meta to account the recycled rowset size on delete
+        std::map<std::string, doris::RowsetMetaCloudPB> rowsets_to_delete;
         rowset_keys_to_delete.swap(rowset_keys);
         rowsets_to_delete.swap(rowsets);
         worker_pool->submit([&, rowset_keys_to_delete = 
std::move(rowset_keys_to_delete),
                              rowsets_to_delete = 
std::move(rowsets_to_delete)]() {
-            if (delete_rowset_data(rowsets_to_delete, 
RowsetRecyclingState::FORMAL_ROWSET) != 0) {
+            if (delete_rowset_data(rowsets_to_delete, 
RowsetRecyclingState::FORMAL_ROWSET,
+                                   metrics_context) != 0) {
                 LOG(WARNING) << "failed to delete rowset data, instance_id=" 
<< instance_id_;
                 return;
             }
@@ -2132,8 +2515,9 @@ int InstanceRecycler::recycle_rowsets() {
         return 0;
     };
 
-    int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, 
std::move(handle_rowset_kv),
-                               std::move(loop_done));
+    int ret = scan_for_recycle_and_statistics(recyc_rs_key0, recyc_rs_key1, 
"rowsets",
+                                              metrics_context, 
std::move(scan_and_statistics),
+                                              std::move(handle_rowset_kv), 
std::move(loop_done));
     worker_pool->stop();
 
     if (!async_recycled_rowset_keys.empty()) {
@@ -2239,6 +2623,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
     size_t expired_rowset_size = 0;
     size_t total_rowset_key_size = 0;
     size_t total_rowset_value_size = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0};
     MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0};
@@ -2256,7 +2641,8 @@ int InstanceRecycler::recycle_tmp_rowsets() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle tmp rowsets finished, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle tmp rowsets finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
@@ -2268,7 +2654,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
 
     // Elements in `tmp_rowset_keys` has the same lifetime as `it`
     std::vector<std::string_view> tmp_rowset_keys;
-    std::vector<doris::RowsetMetaCloudPB> tmp_rowsets;
+    // rowset_id -> rowset_meta
+    // store tmp_rowset id and meta to account the recycled rowset size on delete
+    std::map<std::string, doris::RowsetMetaCloudPB> tmp_rowsets;
 
     int64_t earlest_ts = std::numeric_limits<int64_t>::max();
     auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB& 
rowset) {
@@ -2342,17 +2730,54 @@ int InstanceRecycler::recycle_tmp_rowsets() {
 
         tmp_rowset_keys.push_back(k);
         if (rowset.num_segments() > 0) { // Skip empty rowset
-            tmp_rowsets.push_back(std::move(rowset));
+            tmp_rowsets.emplace(rowset.rowset_id_v2(), std::move(rowset));
         }
         return 0;
     };
 
-    auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled, this]() 
-> int {
-        DORIS_CLOUD_DEFER {
+    // to calculate the total num or bytes of recycled objects
+    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
+        doris::RowsetMetaCloudPB rowset;
+        if (!rowset.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t expiration = calc_expiration(rowset);
+        int64_t current_time = ::time(nullptr);
+        if (current_time < expiration) {
+            return 0;
+        }
+
+        DCHECK_GT(rowset.txn_id(), 0)
+                << "txn_id=" << rowset.txn_id() << " rowset=" << 
rowset.ShortDebugString();
+        if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
+            return 0;
+        }
+
+        if (!rowset.has_resource_id()) {
+            if (rowset.num_segments() > 0) [[unlikely]] { // impossible
+                return 0;
+            }
+            metrics_context.total_need_recycle_num++;
+            return 0;
+        }
+
+        metrics_context.total_need_recycle_num++;
+        if (rowset.num_segments() > 0) {
+            metrics_context.total_need_recycle_data_size += 
rowset.total_disk_size();
+            segment_metrics_context_.total_need_recycle_data_size += 
rowset.total_disk_size();
+            segment_metrics_context_.total_need_recycle_num += 
rowset.num_segments();
+        }
+        return 0;
+    };
+
+    auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled, 
&metrics_context,
+                      this]() -> int {
+        std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&](int*) {
             tmp_rowset_keys.clear();
             tmp_rowsets.clear();
-        };
-        if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET) 
!= 0) {
+        });
+        if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET, 
metrics_context) !=
+            0) {
             LOG(WARNING) << "failed to delete tmp rowset data, instance_id=" 
<< instance_id_;
             return -1;
         }
@@ -2364,8 +2789,30 @@ int InstanceRecycler::recycle_tmp_rowsets() {
         return 0;
     };
 
-    return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, 
std::move(handle_rowset_kv),
-                            std::move(loop_done));
+    return scan_for_recycle_and_statistics(tmp_rs_key0, tmp_rs_key1, 
"tmp_rowsets", metrics_context,
+                                           std::move(scan_and_statistics),
+                                           std::move(handle_rowset_kv), 
std::move(loop_done));
+}
+
+int InstanceRecycler::scan_for_recycle_and_statistics(
+        std::string begin, std::string_view end, std::string task_name,
+        RecyclerMetricsContext& metrics_context,
+        std::function<int(std::string_view k, std::string_view v)> 
statistics_func,
+        std::function<int(std::string_view k, std::string_view v)> 
recycle_func,
+        std::function<int()> loop_done) {
+    if (config::enable_recycler_metrics) {
+        scan_and_recycle(begin, end, std::move(statistics_func));
+
+        // report to bvar
+        metrics_context.report(true);
+        tablet_metrics_context_.report(true);
+        segment_metrics_context_.report(true);
+
+        int ret = scan_and_recycle(begin, end, std::move(recycle_func), 
std::move(loop_done));
+        return ret;
+    } else {
+        return scan_and_recycle(begin, end, std::move(recycle_func), 
std::move(loop_done));
+    }
 }
 
 int InstanceRecycler::scan_and_recycle(
@@ -2433,6 +2880,7 @@ int InstanceRecycler::abort_timeout_txn() {
     int64_t num_timeout = 0;
     int64_t num_abort = 0;
     int64_t num_advance = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     TxnRunningKeyInfo txn_running_key_info0 {instance_id_, 0, 0};
     TxnRunningKeyInfo txn_running_key_info1 {instance_id_, INT64_MAX, 
INT64_MAX};
@@ -2450,7 +2898,8 @@ int InstanceRecycler::abort_timeout_txn() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("end to abort timeout txn, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("end to abort timeout txn, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_timeout", num_timeout)
@@ -2462,7 +2911,7 @@ int InstanceRecycler::abort_timeout_txn() {
             
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
 
     auto handle_txn_running_kv = [&num_scanned, &num_timeout, &num_abort, 
&num_advance,
-                                  &current_time,
+                                  &current_time, &metrics_context,
                                   this](std::string_view k, std::string_view 
v) -> int {
         ++num_scanned;
 
@@ -2557,14 +3006,57 @@ int InstanceRecycler::abort_timeout_txn() {
                         .tag("txn_id", txn_id);
                 return -1;
             }
-            ++num_abort;
+            metrics_context.total_recycled_num = ++num_abort;
+            metrics_context.report();
         }
 
         return 0;
     };
 
-    return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
-                            std::move(handle_txn_running_kv));
+    // to calculate the total num or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context, &current_time, 
this](std::string_view k,
+                                                                       
std::string_view v) -> int {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        std::string_view k1 = k;
+        k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        if (decode_key(&k1, &out) != 0) {
+            return 0;
+        }
+        int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+        int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+        // Update txn_info
+        std::string txn_inf_key, txn_inf_val;
+        txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
+        err = txn->get(txn_inf_key, &txn_inf_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        TxnInfoPB txn_info;
+        if (!txn_info.ParseFromString(txn_inf_val)) {
+            return 0;
+        }
+
+        if (TxnStatusPB::TXN_STATUS_COMMITTED != txn_info.status()) {
+            TxnRunningPB txn_running_pb;
+            if (!txn_running_pb.ParseFromArray(v.data(), v.size())) {
+                return 0;
+            }
+            if (!config::force_immediate_recycle && 
txn_running_pb.timeout_time() > current_time) {
+                return 0;
+            }
+            metrics_context.total_need_recycle_num++;
+        }
+        return 0;
+    };
+
+    return scan_for_recycle_and_statistics(
+            begin_txn_running_key, end_txn_running_key, "abort_timeout_txns", 
metrics_context,
+            std::move(scan_and_statistics), std::move(handle_txn_running_kv));
 }
 
 int InstanceRecycler::recycle_expired_txn_label() {
@@ -2572,6 +3064,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
     int64_t num_scanned = 0;
     int64_t num_expired = 0;
     int64_t num_recycled = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
     int ret = 0;
 
     RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
@@ -2590,7 +3083,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("end to recycle expired txn, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("end to recycle expired txn, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
@@ -2633,6 +3127,20 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return 0;
     };
 
+    // to calculate the total num or bytes of recycled objects
+    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
+        RecycleTxnPB recycle_txn_pb;
+        if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        if ((config::force_immediate_recycle) ||
+            (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
+            (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
+            metrics_context.total_need_recycle_num++;
+        }
+        return 0;
+    };
+
     auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
         std::string_view k1 = k;
         //RecycleTxnKeyInfo 0:instance_id  1:db_id  2:txn_id
@@ -2712,7 +3220,9 @@ int InstanceRecycler::recycle_expired_txn_label() {
             LOG(WARNING) << "failed to delete expired txn, err=" << err << " 
key=" << hex(k);
             return -1;
         }
-        ++num_recycled;
+        metrics_context.total_recycled_num = ++num_recycled;
+        metrics_context.report();
+
         LOG(INFO) << "recycle expired txn, key=" << hex(k);
         return 0;
     };
@@ -2757,8 +3267,9 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return ret;
     };
 
-    return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
-                            std::move(handle_recycle_txn_kv), 
std::move(loop_done));
+    return scan_for_recycle_and_statistics(
+            begin_recycle_txn_key, end_recycle_txn_key, "expired_txn_labels", 
metrics_context,
+            std::move(scan_and_statistics), std::move(handle_recycle_txn_kv), 
std::move(loop_done));
 }
 
 struct CopyJobIdTuple {
@@ -2880,6 +3391,7 @@ int InstanceRecycler::recycle_copy_jobs() {
     // Used for INTERNAL stage's copy jobs to tag each batch for log trace
     uint64_t batch_count = 0;
     const std::string task_name = "recycle_copy_jobs";
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     LOG_INFO("begin to recycle copy jobs").tag("instance_id", instance_id_);
 
@@ -2890,7 +3402,8 @@ int InstanceRecycler::recycle_copy_jobs() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle copy jobs finished, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle copy jobs finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_finished", num_finished)
@@ -2906,7 +3419,7 @@ int InstanceRecycler::recycle_copy_jobs() {
     copy_job_key(key_info1, &key1);
     std::unordered_map<std::string, std::shared_ptr<BatchObjStoreAccessor>> 
stage_accessor_map;
     auto recycle_func = [&start_time, &num_scanned, &num_finished, 
&num_expired, &num_recycled,
-                         &batch_count, &stage_accessor_map, &task_name,
+                         &batch_count, &stage_accessor_map, &task_name, 
&metrics_context,
                          this](std::string_view k, std::string_view v) -> int {
         ++num_scanned;
         CopyJobPB copy_job;
@@ -3013,12 +3526,51 @@ int InstanceRecycler::recycle_copy_jobs() {
             return -1;
         }
 
-        ++num_recycled;
+        metrics_context.total_recycled_num = ++num_recycled;
+        metrics_context.report();
         check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, 
start_time);
         return 0;
     };
 
-    return scan_and_recycle(key0, key1, std::move(recycle_func));
+    // to calculate the total num or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context](std::string_view k, 
std::string_view v) -> int {
+        CopyJobPB copy_job;
+        if (!copy_job.ParseFromArray(v.data(), v.size())) {
+            LOG_WARNING("malformed copy job").tag("key", hex(k));
+            return 0;
+        }
+
+        if (copy_job.job_status() == CopyJobPB::FINISH) {
+            if (copy_job.stage_type() == StagePB::EXTERNAL) {
+                int64_t current_time =
+                        
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+                if (copy_job.finish_time_ms() > 0) {
+                    if (!config::force_immediate_recycle &&
+                        current_time < copy_job.finish_time_ms() +
+                                               
config::copy_job_max_retention_second * 1000) {
+                        return 0;
+                    }
+                } else {
+                    if (!config::force_immediate_recycle &&
+                        current_time < copy_job.start_time_ms() +
+                                               
config::copy_job_max_retention_second * 1000) {
+                        return 0;
+                    }
+                }
+            }
+        } else if (copy_job.job_status() == CopyJobPB::LOADING) {
+            int64_t current_time =
+                    
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+            if (!config::force_immediate_recycle && current_time <= 
copy_job.timeout_time_ms()) {
+                return 0;
+            }
+        }
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
+    return scan_for_recycle_and_statistics(key0, key1, "copy_jobs", 
metrics_context,
+                                           std::move(scan_and_statistics), 
std::move(recycle_func));
 }
 
 int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id,
@@ -3120,6 +3672,7 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
 int InstanceRecycler::recycle_stage() {
     int64_t num_scanned = 0;
     int64_t num_recycled = 0;
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_stage");
     const std::string task_name = "recycle_stage";
 
     LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_);
@@ -3131,7 +3684,8 @@ int InstanceRecycler::recycle_stage() {
         unregister_recycle_task(task_name);
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle stage, cost={}s", cost)
+        metrics_context.finish_report();
+        LOG_WARNING("recycle stage, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
                 .tag("num_recycled", num_recycled);
@@ -3143,8 +3697,8 @@ int InstanceRecycler::recycle_stage() {
     std::string key1 = recycle_stage_key(key_info1);
 
     std::vector<std::string_view> stage_keys;
-    auto recycle_func = [&start_time, &num_scanned, &num_recycled, 
&stage_keys, this](
-                                std::string_view k, std::string_view v) -> int 
{
+    auto recycle_func = [&start_time, &num_scanned, &num_recycled, 
&stage_keys, &metrics_context,
+                         this](std::string_view k, std::string_view v) -> int {
         ++num_scanned;
         RecycleStagePB recycle_stage;
         if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
@@ -3199,12 +3753,58 @@ int InstanceRecycler::recycle_stage() {
                          << ", ret=" << ret;
             return -1;
         }
-        ++num_recycled;
+        metrics_context.total_recycled_num = ++num_recycled;
+        metrics_context.report();
         check_recycle_task(instance_id_, "recycle_stage", num_scanned, 
num_recycled, start_time);
         stage_keys.push_back(k);
         return 0;
     };
 
+    // for calculating the total num or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context, this](std::string_view k,
+                                                        std::string_view v) -> 
int {
+        RecycleStagePB recycle_stage;
+        if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
+            LOG_WARNING("malformed recycle stage").tag("key", hex(k));
+            return 0;
+        }
+
+        int idx = stoi(recycle_stage.stage().obj_info().id());
+        if (idx > instance_info_.obj_info().size() || idx < 1) {
+            LOG(WARNING) << "invalid idx: " << idx;
+            return 0;
+        }
+
+        std::shared_ptr<StorageVaultAccessor> accessor;
+        int ret = SYNC_POINT_HOOK_RETURN_VALUE(
+                [&] {
+                    auto& old_obj = instance_info_.obj_info()[idx - 1];
+                    auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+                    if (!s3_conf) {
+                        return 0;
+                    }
+
+                    s3_conf->prefix = 
recycle_stage.stage().obj_info().prefix();
+                    std::shared_ptr<S3Accessor> s3_accessor;
+                    int ret = S3Accessor::create(std::move(s3_conf.value()), 
&s3_accessor);
+                    if (ret != 0) {
+                        return 0;
+                    }
+
+                    accessor = std::move(s3_accessor);
+                    return 0;
+                }(),
+                "recycle_stage:get_accessor", &accessor);
+
+        if (ret != 0) {
+            LOG(WARNING) << "failed to init accessor ret=" << ret;
+            return 0;
+        }
+
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
     auto loop_done = [&stage_keys, this]() -> int {
         if (stage_keys.empty()) return 0;
         DORIS_CLOUD_DEFER {
@@ -3216,75 +3816,128 @@ int InstanceRecycler::recycle_stage() {
         }
         return 0;
     };
-
-    return scan_and_recycle(key0, key1, std::move(recycle_func), 
std::move(loop_done));
+    return scan_for_recycle_and_statistics(key0, key1, "stages", 
metrics_context,
+                                           std::move(scan_and_statistics), 
std::move(recycle_func),
+                                           std::move(loop_done));
 }
 
 int InstanceRecycler::recycle_expired_stage_objects() {
     LOG_INFO("begin to recycle expired stage objects").tag("instance_id", 
instance_id_);
 
     int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    RecyclerMetricsContext metrics_context(instance_id_, 
"recycle_expired_stage_objects");
 
     DORIS_CLOUD_DEFER {
         int64_t cost =
                 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
-        LOG_INFO("recycle expired stage objects, cost={}s", 
cost).tag("instance_id", instance_id_);
+        metrics_context.finish_report();
+        LOG_WARNING("recycle expired stage objects, cost={}s", cost)
+                .tag("instance_id", instance_id_);
     };
+
     int ret = 0;
-    for (const auto& stage : instance_info_.stages()) {
-        std::stringstream ss;
-        ss << "instance_id=" << instance_id_ << ", stage_id=" << 
stage.stage_id() << ", user_name="
-           << (stage.mysql_user_name().empty() ? "null" : 
stage.mysql_user_name().at(0))
-           << ", user_id=" << (stage.mysql_user_id().empty() ? "null" : 
stage.mysql_user_id().at(0))
-           << ", prefix=" << stage.obj_info().prefix();
-
-        if (stopped()) break;
-        if (stage.type() == StagePB::EXTERNAL) {
-            continue;
-        }
-        int idx = stoi(stage.obj_info().id());
-        if (idx > instance_info_.obj_info().size() || idx < 1) {
-            LOG(WARNING) << "invalid idx: " << idx << ", id: " << 
stage.obj_info().id();
-            continue;
+    // for calculating the total num or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context, this]() {
+        for (const auto& stage : instance_info_.stages()) {
+            if (stopped()) {
+                break;
+            }
+            if (stage.type() == StagePB::EXTERNAL) {
+                continue;
+            }
+            int idx = stoi(stage.obj_info().id());
+            if (idx > instance_info_.obj_info().size() || idx < 1) {
+                continue;
+            }
+            const auto& old_obj = instance_info_.obj_info()[idx - 1];
+            auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+            if (!s3_conf) {
+                continue;
+            }
+            s3_conf->prefix = stage.obj_info().prefix();
+            std::shared_ptr<S3Accessor> accessor;
+            int ret1 = S3Accessor::create(*s3_conf, &accessor);
+            if (ret1 != 0) {
+                continue;
+            }
+            if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+                continue;
+            }
+            metrics_context.total_need_recycle_num++;
         }
+    };
 
-        const auto& old_obj = instance_info_.obj_info()[idx - 1];
-        auto s3_conf = S3Conf::from_obj_store_info(old_obj);
-        if (!s3_conf) {
-            LOG(WARNING) << "failed to init s3_conf with obj_info=" << 
old_obj.ShortDebugString();
-            continue;
-        }
+    auto handle_recycle_func = [&, this]() {
+        for (const auto& stage : instance_info_.stages()) {
+            std::stringstream ss;
+            ss << "instance_id=" << instance_id_ << ", stage_id=" << 
stage.stage_id()
+               << ", user_name="
+               << (stage.mysql_user_name().empty() ? "null" : 
stage.mysql_user_name().at(0))
+               << ", user_id="
+               << (stage.mysql_user_id().empty() ? "null" : 
stage.mysql_user_id().at(0))
+               << ", prefix=" << stage.obj_info().prefix();
 
-        s3_conf->prefix = stage.obj_info().prefix();
-        std::shared_ptr<S3Accessor> accessor;
-        int ret1 = S3Accessor::create(*s3_conf, &accessor);
-        if (ret1 != 0) {
-            LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " " 
<< ss.str();
-            ret = -1;
-            continue;
-        }
+            if (stopped()) {
+                break;
+            }
+            if (stage.type() == StagePB::EXTERNAL) {
+                continue;
+            }
+            int idx = stoi(stage.obj_info().id());
+            if (idx > instance_info_.obj_info().size() || idx < 1) {
+                LOG(WARNING) << "invalid idx: " << idx << ", id: " << 
stage.obj_info().id();
+                continue;
+            }
 
-        if (s3_conf->prefix.find("/stage/") == std::string::npos) {
-            LOG(WARNING) << "try to delete illegal prefix, which is 
catastrophic, " << ss.str();
-            ret = -1;
-            continue;
-        }
+            const auto& old_obj = instance_info_.obj_info()[idx - 1];
+            auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+            if (!s3_conf) {
+                LOG(WARNING) << "failed to init s3_conf with obj_info="
+                             << old_obj.ShortDebugString();
+                continue;
+            }
 
-        LOG(INFO) << "recycle expired stage objects, " << ss.str();
-        int64_t expiration_time =
-                
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
-                config::internal_stage_objects_expire_time_second;
-        if (config::force_immediate_recycle) {
-            expiration_time = INT64_MAX;
-        }
-        ret1 = accessor->delete_all(expiration_time);
-        if (ret1 != 0) {
-            LOG(WARNING) << "failed to recycle expired stage objects, ret=" << 
ret1 << " "
-                         << ss.str();
-            ret = -1;
-            continue;
+            s3_conf->prefix = stage.obj_info().prefix();
+            std::shared_ptr<S3Accessor> accessor;
+            int ret1 = S3Accessor::create(*s3_conf, &accessor);
+            if (ret1 != 0) {
+                LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " 
" << ss.str();
+                ret = -1;
+                continue;
+            }
+
+            if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+                LOG(WARNING) << "try to delete illegal prefix, which is 
catastrophic, " << ss.str();
+                ret = -1;
+                continue;
+            }
+
+            LOG(INFO) << "recycle expired stage objects, " << ss.str();
+            int64_t expiration_time =
+                    
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
+                    config::internal_stage_objects_expire_time_second;
+            if (config::force_immediate_recycle) {
+                expiration_time = INT64_MAX;
+            }
+            ret1 = accessor->delete_all(expiration_time);
+            if (ret1 != 0) {
+                LOG(WARNING) << "failed to recycle expired stage objects, 
ret=" << ret1 << " "
+                             << ss.str();
+                ret = -1;
+                continue;
+            }
+            metrics_context.total_recycled_num++;
+            metrics_context.report();
         }
-    }
+    };
+
+    // for calculating the total num or bytes of recycled objects
+    scan_and_statistics();
+
+    // report to bvar
+    metrics_context.report(true);
+
+    handle_recycle_func();
     return ret;
 }
 
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 84e4075e61b..d1ae8a056c8 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -21,13 +21,16 @@
 
 #include <atomic>
 #include <condition_variable>
+#include <cstdint>
 #include <deque>
 #include <functional>
 #include <memory>
 #include <string>
 #include <string_view>
 #include <thread>
+#include <utility>
 
+#include "common/bvars.h"
 #include "meta-service/txn_lazy_committer.h"
 #include "recycler/storage_vault_accessor.h"
 #include "recycler/white_black_list.h"
@@ -42,6 +45,7 @@ class InstanceRecycler;
 class StorageVaultAccessor;
 class Checker;
 class SimpleThreadPool;
+class RecyclerMetricsContext;
 struct RecyclerThreadPoolGroup {
     RecyclerThreadPoolGroup() = default;
     RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
@@ -165,15 +169,15 @@ public:
      * @param is_empty_tablet indicates whether the tablet has object files, 
can skip delete objects if tablet is empty
      * @return 0 for success otherwise error
      */
-    int recycle_tablets(int64_t table_id, int64_t index_id, int64_t 
partition_id = -1,
-                        bool is_empty_tablet = false);
+    int recycle_tablets(int64_t table_id, int64_t index_id, 
RecyclerMetricsContext& ctx,
+                        int64_t partition_id = -1, bool is_empty_tablet = 
false);
 
     /**
      * recycle all rowsets belonging to the tablet specified by `tablet_id`
      *
      * @return 0 for success otherwise error
      */
-    int recycle_tablet(int64_t tablet_id);
+    int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& 
metrics_context);
 
     // scan and recycle useless partition version kv
     int recycle_versions();
@@ -218,6 +222,13 @@ private:
                          std::function<int(std::string_view k, 
std::string_view v)> recycle_func,
                          std::function<int()> loop_done = nullptr);
 
+    int scan_for_recycle_and_statistics(
+            std::string begin, std::string_view end, std::string task_name,
+            RecyclerMetricsContext& metrics_context,
+            std::function<int(std::string_view k, std::string_view v)> 
statistics_func,
+            std::function<int(std::string_view k, std::string_view v)> 
recycle_func,
+            std::function<int()> loop_done = nullptr);
+
     // return 0 for success otherwise error
     int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
 
@@ -227,8 +238,8 @@ private:
                            const std::string& rowset_id);
 
     // return 0 for success otherwise error
-    int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& 
rowsets,
-                           RowsetRecyclingState type);
+    int delete_rowset_data(const std::map<std::string, 
doris::RowsetMetaCloudPB>& rowsets,
+                           RowsetRecyclingState type, RecyclerMetricsContext& 
metrics_context);
 
     /**
      * Get stage storage info from instance and init StorageVaultAccessor
@@ -241,6 +252,14 @@ private:
 
     void unregister_recycle_task(const std::string& task_name);
 
+    // for scan all tablets and statistics metrics
+    int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
+                                    RecyclerMetricsContext& metrics_context,
+                                    int64_t partition_id = -1, bool 
is_empty_tablet = false);
+
+    // for scan all rs of tablet and statistics metrics
+    int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& 
metrics_context);
+
 private:
     std::atomic_bool stopped_ {false};
     std::shared_ptr<TxnKv> txn_kv_;
@@ -268,4 +287,109 @@ private:
     std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
 };
 
+class RecyclerMetricsContext {
+public:
+    RecyclerMetricsContext() = default;
+
+    RecyclerMetricsContext(std::string instance_id, std::string operation_type)
+            : operation_type(std::move(operation_type)), 
instance_id(std::move(instance_id)) {
+        start();
+    }
+
+    ~RecyclerMetricsContext() = default;
+
+    int64_t total_need_recycle_data_size = 0;
+    int64_t total_need_recycle_num = 0;
+
+    std::atomic_long total_recycled_data_size {0};
+    std::atomic_long total_recycled_num {0};
+
+    std::string operation_type {};
+    std::string instance_id {};
+
+    double start_time = 0;
+
+    void start() {
+        start_time = duration_cast<std::chrono::milliseconds>(
+                             
std::chrono::system_clock::now().time_since_epoch())
+                             .count();
+    }
+
+    double duration() const {
+        return duration_cast<std::chrono::milliseconds>(
+                       std::chrono::system_clock::now().time_since_epoch())
+                       .count() -
+               start_time;
+    }
+
+    void reset() {
+        total_need_recycle_data_size = 0;
+        total_need_recycle_num = 0;
+        total_recycled_data_size.store(0);
+        total_recycled_num.store(0);
+        start_time = duration_cast<std::chrono::milliseconds>(
+                             
std::chrono::system_clock::now().time_since_epoch())
+                             .count();
+    }
+
+    void finish_report() {
+        if (!operation_type.empty()) {
+            double cost = duration();
+            g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
+                    {instance_id, operation_type}, cost);
+            g_bvar_recycler_instance_recycle_round.put({instance_id, 
operation_type}, 1);
+            LOG(INFO) << "recycle instance: " << instance_id
+                      << ", operation type: " << operation_type << ", cost: " 
<< cost
+                      << " ms, total recycled num: " << 
total_recycled_num.load()
+                      << ", total recycled data size: " << 
total_recycled_data_size.load()
+                      << " bytes";
+            if (total_recycled_num.load()) {
+                g_bvar_recycler_instance_recycle_time_per_resource.put(
+                        {instance_id, operation_type}, cost / 
total_recycled_num.load());
+            } else {
+                g_bvar_recycler_instance_recycle_time_per_resource.put(
+                        {instance_id, operation_type}, -1);
+            }
+            if (total_recycled_data_size.load()) {
+                g_bvar_recycler_instance_recycle_bytes_per_ms.put(
+                        {instance_id, operation_type}, 
total_recycled_data_size.load() / cost);
+            } else {
+                
g_bvar_recycler_instance_recycle_bytes_per_ms.put({instance_id, operation_type},
+                                                                  -1);
+            }
+        }
+    }
+
+    // `is_begin` is used to initialize total num of items need to be recycled
+    void report(bool is_begin = false) {
+        if (!operation_type.empty()) {
+            if (total_need_recycle_data_size > 0) {
+                // is init
+                if (is_begin) {
+                    g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
+                            {instance_id, operation_type}, 
total_need_recycle_data_size);
+                } else {
+                    g_bvar_recycler_instance_last_round_recycled_bytes.put(
+                            {instance_id, operation_type}, 
total_recycled_data_size.load());
+                    
g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
+                            {instance_id, operation_type}, 
total_recycled_data_size.load());
+                }
+            }
+
+            if (total_need_recycle_num > 0) {
+                // is init
+                if (is_begin) {
+                    g_bvar_recycler_instance_last_round_to_recycle_num.put(
+                            {instance_id, operation_type}, 
total_need_recycle_num);
+                } else {
+                    g_bvar_recycler_instance_last_round_recycled_num.put(
+                            {instance_id, operation_type}, 
total_recycled_num.load());
+                    
g_bvar_recycler_instance_recycle_total_num_since_started.put(
+                            {instance_id, operation_type}, 
total_recycled_num.load());
+                }
+            }
+        }
+    }
+};
+
 } // namespace doris::cloud
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index b7ed057b6b5..ff1b33bf9f1 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -54,6 +54,7 @@ using namespace doris;
 static const std::string instance_id = "instance_id_recycle_test";
 static int64_t current_time = 0;
 static constexpr int64_t db_id = 1000;
+static RecyclerMetricsContext ctx;
 
 static doris::cloud::RecyclerThreadPoolGroup thread_group;
 
@@ -1127,7 +1128,7 @@ TEST(RecyclerTest, recycle_tablet) {
 
     ASSERT_EQ(create_partition_version_kv(txn_kv.get(), table_id, 
partition_id), 0);
 
-    ASSERT_EQ(0, recycler.recycle_tablets(table_id, index_id));
+    ASSERT_EQ(0, recycler.recycle_tablets(table_id, index_id, ctx));
 
     // check rowset does not exist on s3
     std::unique_ptr<ListIterator> list_iter;
@@ -3663,16 +3664,17 @@ TEST(RecyclerTest, delete_rowset_data) {
         auto accessor = recycler.accessor_map_.begin()->second;
         // Delete multiple rowset files using one series of RowsetPB
         constexpr int index_id = 10001, tablet_id = 10002;
-        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
         for (int i = 0; i < 10; ++i) {
             auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, 
schemas[i % 5]);
             create_recycle_rowset(
                     txn_kv.get(), accessor.get(), rowset,
                     static_cast<RecycleRowsetPB::Type>(i % 
(RecycleRowsetPB::Type_MAX + 1)), true);
 
-            rowset_pbs.emplace_back(std::move(rowset));
+            rowset_pbs.emplace(rowset.rowset_id_v2(), std::move(rowset));
         }
-        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::FORMAL_ROWSET));
+        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::FORMAL_ROWSET,
+                                                 ctx));
         std::unique_ptr<ListIterator> list_iter;
         ASSERT_EQ(0, accessor->list_all(&list_iter));
         ASSERT_FALSE(list_iter->has_next());
@@ -3768,16 +3770,17 @@ TEST(RecyclerTest, 
delete_rowset_data_without_inverted_index_storage_format) {
         auto accessor = recycler.accessor_map_.begin()->second;
         // Delete multiple rowset files using one series of RowsetPB
         constexpr int index_id = 10001, tablet_id = 10002;
-        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
         for (int i = 0; i < 10; ++i) {
             auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, 
schemas[i % 5]);
             create_recycle_rowset(
                     txn_kv.get(), accessor.get(), rowset,
                     static_cast<RecycleRowsetPB::Type>(i % 
(RecycleRowsetPB::Type_MAX + 1)), true);
 
-            rowset_pbs.emplace_back(std::move(rowset));
+            rowset_pbs.emplace(rowset.rowset_id_v2(), std::move(rowset));
         }
-        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::FORMAL_ROWSET));
+        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::FORMAL_ROWSET,
+                                                 ctx));
         std::unique_ptr<ListIterator> list_iter;
         ASSERT_EQ(0, accessor->list_all(&list_iter));
         ASSERT_FALSE(list_iter->has_next());
@@ -3934,7 +3937,7 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
 
    // recycle tablet will fail because the unusable obj accessor cannot be 
connected
-    EXPECT_EQ(recycler.recycle_tablet(0), -1);
+    EXPECT_EQ(recycler.recycle_tablet(0, ctx), -1);
     // however, useful mock accessor can recycle tablet
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 1);
 }
@@ -4016,7 +4019,7 @@ TEST(RecyclerTest, recycle_tablet_without_resource_id) {
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
 
    // recycle tablet will fail because the unusable obj accessor cannot be 
connected
-    EXPECT_EQ(recycler.recycle_tablet(0), -1);
+    EXPECT_EQ(recycler.recycle_tablet(0, ctx), -1);
     // no resource id, cannot recycle
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
 }
@@ -4098,7 +4101,7 @@ TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id) 
{
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
 
    // recycle tablet will fail because the unusable obj accessor cannot be 
connected
+    EXPECT_EQ(recycler.recycle_tablet(0, ctx), -1);
     // no resource id, cannot recycle
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
 }
@@ -4321,7 +4324,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
                                   std::make_shared<TxnLazyCommitter>(txn_kv));
         ASSERT_EQ(recycler.init(), 0);
         auto accessor = recycler.accessor_map_.begin()->second;
-        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
         doris::RowsetMetaCloudPB rowset;
         rowset.set_rowset_id(0); // useless but required
         rowset.set_rowset_id_v2("1");
@@ -4333,7 +4336,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
         rowset.mutable_tablet_schema()->CopyFrom(schema);
         create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1);
         rowset.clear_tablet_schema();
-        rowset_pbs.emplace_back(rowset);
+        rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
 
         std::unordered_set<std::string> list_files;
         std::unique_ptr<ListIterator> iter;
@@ -4348,7 +4351,8 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
         EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
         EXPECT_TRUE(list_files.contains("data/10000/1_0_1.idx"));
 
-        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET));
+        ASSERT_EQ(0,
+                  recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET, ctx));
         list_files.clear();
         iter.reset();
         EXPECT_EQ(accessor->list_all(&iter), 0);
@@ -4400,7 +4404,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
                                   std::make_shared<TxnLazyCommitter>(txn_kv));
         ASSERT_EQ(recycler.init(), 0);
         auto accessor = recycler.accessor_map_.begin()->second;
-        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
         doris::RowsetMetaCloudPB rowset;
         rowset.set_rowset_id(0); // useless but required
         rowset.set_rowset_id_v2("1");
@@ -4412,7 +4416,7 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
         rowset.mutable_tablet_schema()->CopyFrom(schema);
         create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
         rowset.clear_tablet_schema();
-        rowset_pbs.emplace_back(rowset);
+        rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
 
         std::unordered_set<std::string> list_files;
         std::unique_ptr<ListIterator> iter;
@@ -4427,7 +4431,8 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
         EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
         EXPECT_TRUE(list_files.contains("data/10000/1_0.idx"));
 
-        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET));
+        ASSERT_EQ(0,
+                  recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET, ctx));
         list_files.clear();
         iter.reset();
         EXPECT_EQ(accessor->list_all(&iter), 0);
@@ -4484,7 +4489,7 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) 
{
                                   std::make_shared<TxnLazyCommitter>(txn_kv));
         ASSERT_EQ(recycler.init(), 0);
         auto accessor = recycler.accessor_map_.begin()->second;
-        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        std::map<std::string, doris::RowsetMetaCloudPB> rowset_pbs;
         doris::RowsetMetaCloudPB rowset;
         rowset.set_rowset_id(0); // useless but required
         rowset.set_rowset_id_v2("1");
@@ -4496,7 +4501,7 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) 
{
         rowset.mutable_tablet_schema()->CopyFrom(schema);
         create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
         rowset.clear_tablet_schema();
-        rowset_pbs.emplace_back(rowset);
+        rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
 
         rowset.set_rowset_id(0); // useless but required
         rowset.set_rowset_id_v2("2");
@@ -4508,7 +4513,7 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) 
{
         rowset.mutable_tablet_schema()->CopyFrom(schema);
         create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
         rowset.clear_tablet_schema();
-        rowset_pbs.emplace_back(rowset);
+        rowset_pbs.emplace(rowset.rowset_id_v2(), rowset);
 
         std::unordered_set<std::string> list_files;
         std::unique_ptr<ListIterator> iter;
@@ -4525,7 +4530,8 @@ TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) 
{
         EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
         EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
 
-        EXPECT_EQ(-1, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET));
+        EXPECT_EQ(-1,
+                  recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET, ctx));
         list_files.clear();
         iter.reset();
         EXPECT_EQ(accessor->list_all(&iter), 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to