This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c0e7244877 [feature](merge-cloud) Add check long running task 
mechanism for recycle task (#32589)
3c0e7244877 is described below

commit 3c0e72448779e3ba56f131480b8f9ff4b80a6f8e
Author: walter <w41te...@gmail.com>
AuthorDate: Mon Mar 25 23:01:44 2024 +0800

    [feature](merge-cloud) Add check long running task mechanism for recycle 
task (#32589)
    
    * [feature](merge-cloud) Set instance_recycler_worker_pool_size default 1
    
    * We hit the error `responseCode=503 error="Please reduce your request rate."`
      with AWS S3 storage in the recycler log, so set the default of
      instance_recycler_worker_pool_size to 1 to reduce the parallelism of object deletion
    
    Co-authored-by: w41ter <w41te...@gmail.com>
    
    * [feature](merge-cloud) Add check long running task mechanism for recycle 
task
    
    * In order to report long-running recycle tasks, implement a check_recycle_task function
    
    Co-authored-by: w41ter <w41te...@gmail.com>
    
    ---------
    
    Co-authored-by: Lei Zhang <27994433+swjtu-zhang...@users.noreply.github.com>
---
 cloud/src/common/config.h       |   5 +-
 cloud/src/recycler/recycler.cpp | 167 +++++++++++++++++++++++++++++++---------
 cloud/src/recycler/recycler.h   |  12 +++
 cloud/test/recycler_test.cpp    |   9 ++-
 4 files changed, 154 insertions(+), 39 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 859271f6503..03ae47abe56 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -60,7 +60,7 @@ CONF_mInt64(dropped_partition_retention_seconds, "10800"); // 
3h
 CONF_Strings(recycle_whitelist, ""); // Comma seprated list
 // These instances will not be recycled, only effective when whitelist is 
empty.
 CONF_Strings(recycle_blacklist, ""); // Comma seprated list
-CONF_mInt32(instance_recycler_worker_pool_size, "10");
+CONF_mInt32(instance_recycler_worker_pool_size, "1");
 CONF_Bool(enable_checker, "false");
 // Currently only used for recycler test
 CONF_Bool(enable_inverted_check, "false");
@@ -69,6 +69,9 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
 // interval for check object
 CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours
 
+CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min, wait between two rounds of checking running recycle tasks
+CONF_mInt64(recycle_task_threshold_seconds, "10800");    // 3h, a recycle task running longer than this gets logged
+
 CONF_String(test_s3_ak, "ak");
 CONF_String(test_s3_sk, "sk");
 CONF_String(test_s3_endpoint, "endpoint");
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index c765a58d0fc..1ca1e05f741 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -49,6 +49,8 @@
 
 namespace doris::cloud {
 
+using namespace std::chrono;
+
 // return 0 for success get a key, 1 for key not found, negative for error
 [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, 
std::string& val) {
     std::unique_ptr<Transaction> txn;
@@ -143,6 +145,23 @@ static int txn_remove(TxnKv* txn_kv, 
std::vector<std::string> keys) {
     }
 }
 
+// Logs a notice when a recycle task has been running for more than
+// `config::recycle_task_threshold_seconds`.
+//
+// The elapsed time is only evaluated when `num_scanned` is a positive multiple of 10000;
+// every other call returns immediately. `start_time` must be in the same unit as the value
+// computed below: whole seconds of `steady_clock::now().time_since_epoch()`.
+static inline void check_recycle_task(const std::string& instance_id, const std::string& task_name,
+                                      int64_t num_scanned, int64_t num_recycled,
+                                      int64_t start_time) {
+    const bool at_checkpoint = (num_scanned > 0) && ((num_scanned % 10000) == 0);
+    if (!at_checkpoint) [[likely]] {
+        return;
+    }
+
+    const auto since_epoch = steady_clock::now().time_since_epoch();
+    int64_t cost = duration_cast<seconds>(since_epoch).count() - start_time;
+    if (cost <= config::recycle_task_threshold_seconds) {
+        return;
+    }
+
+    LOG_INFO("recycle task cost too much time cost={}s", cost)
+            .tag("instance_id", instance_id)
+            .tag("task", task_name)
+            .tag("num_scanned", num_scanned)
+            .tag("num_recycled", num_recycled);
+}
+
 Recycler::Recycler(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) 
{
     ip_port_ = std::string(butil::my_ip_cstr()) + ":" + 
std::to_string(config::brpc_listen_port);
 }
@@ -221,7 +240,6 @@ void Recycler::recycle_callback() {
         }
         if (stopped()) return;
         LOG_INFO("begin to recycle instance").tag("instance_id", instance_id);
-        using namespace std::chrono;
         auto ctime_ms = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
         ret = instance_recycler->do_recycle();
         // If instance recycler has been aborted, don't finish this job
@@ -268,6 +286,23 @@ void Recycler::lease_recycle_jobs() {
     }
 }
 
+// Loop body of the checker thread: until the recycler is stopped, asks every instance
+// recycler found in `recycling_instance_map_` to check its running tasks, then waits up to
+// `config::check_recycle_task_interval_seconds` on `notifier_` (the wait ends early once
+// `stopped()` is true).
+void Recycler::check_recycle_tasks() {
+    using InstanceMap = std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>>;
+    for (;;) {
+        if (stopped()) {
+            break;
+        }
+
+        // Copy the map while holding `mtx_`; the per-instance checks below run on the copy
+        // after the lock has been released.
+        InstanceMap snapshot;
+        {
+            std::lock_guard guard(mtx_);
+            snapshot = recycling_instance_map_;
+        }
+        for (auto& item : snapshot) {
+            item.second->check_recycle_tasks();
+        }
+
+        std::unique_lock wait_lock(mtx_);
+        const auto interval = std::chrono::seconds(config::check_recycle_task_interval_seconds);
+        notifier_.wait_for(wait_lock, interval, [this]() { return stopped(); });
+    }
+}
+
 int Recycler::start(brpc::Server* server) {
     instance_filter_.reset(config::recycle_whitelist, 
config::recycle_blacklist);
 
@@ -298,6 +333,7 @@ int Recycler::start(brpc::Server* server) {
     }
 
     workers_.push_back(std::thread(std::mem_fn(&Recycler::lease_recycle_jobs), 
this));
+    
workers_.push_back(std::thread(std::mem_fn(&Recycler::check_recycle_tasks), 
this));
     return 0;
 }
 
@@ -470,7 +506,6 @@ int InstanceRecycler::recycle_deleted_instance() {
     LOG_INFO("begin to recycle deleted instance").tag("instance_id", 
instance_id_);
 
     int ret = 0;
-    using namespace std::chrono;
     auto start_time = steady_clock::now();
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
@@ -560,6 +595,7 @@ int InstanceRecycler::recycle_deleted_instance() {
 }
 
 int InstanceRecycler::recycle_indexes() {
+    const std::string task_name = "recycle_indexes";
     int num_scanned = 0;
     int num_expired = 0;
     int num_recycled = 0;
@@ -573,11 +609,13 @@ int InstanceRecycler::recycle_indexes() {
 
     LOG_INFO("begin to recycle indexes").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle indexes finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -659,6 +697,7 @@ int InstanceRecycler::recycle_indexes() {
             return -1;
         }
         ++num_recycled;
+        check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, 
start_time);
         index_keys.push_back(k);
         return 0;
     };
@@ -678,6 +717,7 @@ int InstanceRecycler::recycle_indexes() {
 }
 
 int InstanceRecycler::recycle_partitions() {
+    const std::string task_name = "recycle_partitions";
     int num_scanned = 0;
     int num_expired = 0;
     int num_recycled = 0;
@@ -691,11 +731,13 @@ int InstanceRecycler::recycle_partitions() {
 
     LOG_INFO("begin to recycle partitions").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle partitions finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -786,6 +828,7 @@ int InstanceRecycler::recycle_partitions() {
         }
         if (ret == 0) {
             ++num_recycled;
+            check_recycle_task(instance_id_, task_name, num_scanned, 
num_recycled, start_time);
             partition_keys.push_back(k);
             if (part_pb.db_id() > 0) {
                 version_keys.push_back(version_key(
@@ -831,7 +874,6 @@ int InstanceRecycler::recycle_versions() {
 
     LOG_INFO("begin to recycle partition versions").tag("instance_id", 
instance_id_);
 
-    using namespace std::chrono;
     auto start_time = steady_clock::now();
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
@@ -928,7 +970,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
             .tag("index_id", index_id)
             .tag("partition_id", partition_id);
 
-    using namespace std::chrono;
     auto start_time = steady_clock::now();
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
@@ -1199,7 +1240,6 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
             .tag("instance_id", instance_id_)
             .tag("tablet_id", tablet_id);
 
-    using namespace std::chrono;
     auto start_time = steady_clock::now();
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
@@ -1261,6 +1301,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
 }
 
 int InstanceRecycler::recycle_rowsets() {
+    const std::string task_name = "recycle_rowsets";
     int num_scanned = 0;
     int num_expired = 0;
     int num_prepare = 0;
@@ -1277,11 +1318,13 @@ int InstanceRecycler::recycle_rowsets() {
 
     LOG_INFO("begin to recycle rowsets").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle rowsets finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -1325,6 +1368,8 @@ int InstanceRecycler::recycle_rowsets() {
                                      << instance_id_;
                     } else {
                         num_recycled.fetch_add(keys.size(), 
std::memory_order_relaxed);
+                        check_recycle_task(instance_id_, "recycle_rowsets", 
num_scanned,
+                                           num_recycled, start_time);
                     }
                 },
                 0);
@@ -1472,6 +1517,7 @@ int InstanceRecycler::recycle_rowsets() {
 }
 
 int InstanceRecycler::recycle_tmp_rowsets() {
+    const std::string task_name = "recycle_tmp_rowsets";
     int num_scanned = 0;
     int num_expired = 0;
     int num_recycled = 0;
@@ -1487,11 +1533,13 @@ int InstanceRecycler::recycle_tmp_rowsets() {
 
     LOG_INFO("begin to recycle tmp rowsets").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle tmp rowsets finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -1616,6 +1664,7 @@ int InstanceRecycler::scan_and_recycle(
 }
 
 int InstanceRecycler::abort_timeout_txn() {
+    const std::string task_name = "abort_timeout_txn";
     int num_scanned = 0;
     int num_timeout = 0;
     int num_abort = 0;
@@ -1629,11 +1678,13 @@ int InstanceRecycler::abort_timeout_txn() {
 
     LOG_INFO("begin to abort timeout txn").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("end to abort timeout txn, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -1731,6 +1782,7 @@ int InstanceRecycler::abort_timeout_txn() {
 }
 
 int InstanceRecycler::recycle_expired_txn_label() {
+    const std::string task_name = "recycle_expired_txn_label";
     int num_scanned = 0;
     int num_expired = 0;
     int num_recycled = 0;
@@ -1744,11 +1796,12 @@ int InstanceRecycler::recycle_expired_txn_label() {
 
     LOG_INFO("begin to recycle expire txn").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
-
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("end to recycle expired txn, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -1970,14 +2023,17 @@ int InstanceRecycler::recycle_copy_jobs() {
     int num_recycled = 0;
     // Used for INTERNAL stage's copy jobs to tag each batch for log trace
     uint64_t batch_count = 0;
+    const std::string task_name = "recycle_copy_jobs";
 
     LOG_INFO("begin to recycle copy jobs").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle copy jobs finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -1993,8 +2049,9 @@ int InstanceRecycler::recycle_copy_jobs() {
     copy_job_key(key_info0, &key0);
     copy_job_key(key_info1, &key1);
     std::unordered_map<std::string, std::shared_ptr<BatchObjStoreAccessor>> 
stage_accessor_map;
-    auto recycle_func = [&num_scanned, &num_finished, &num_expired, 
&num_recycled, &batch_count,
-                         &stage_accessor_map, this](std::string_view k, 
std::string_view v) -> int {
+    auto recycle_func = [&start_time, &num_scanned, &num_finished, 
&num_expired, &num_recycled,
+                         &batch_count, &stage_accessor_map, &task_name,
+                         this](std::string_view k, std::string_view v) -> int {
         ++num_scanned;
         CopyJobPB copy_job;
         if (!copy_job.ParseFromArray(v.data(), v.size())) {
@@ -2099,6 +2156,7 @@ int InstanceRecycler::recycle_copy_jobs() {
         }
 
         ++num_recycled;
+        check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, 
start_time);
         return 0;
     };
 
@@ -2222,14 +2280,17 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
 int InstanceRecycler::recycle_stage() {
     int num_scanned = 0;
     int num_recycled = 0;
+    const std::string task_name = "recycle_stage";
 
     LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle stage, cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("num_scanned", num_scanned)
@@ -2245,7 +2306,7 @@ int InstanceRecycler::recycle_stage() {
 
     // Elements in `tmp_rowset_keys` has the same lifetime as `it`
     std::vector<std::string_view> stage_keys;
-    auto recycle_func = [&num_scanned, &num_recycled, &stage_keys, this](
+    auto recycle_func = [&start_time, &num_scanned, &num_recycled, 
&stage_keys, this](
                                 std::string_view k, std::string_view v) -> int 
{
         ++num_scanned;
         RecycleStagePB recycle_stage;
@@ -2304,6 +2365,7 @@ int InstanceRecycler::recycle_stage() {
             return -1;
         }
         ++num_recycled;
+        check_recycle_task(instance_id_, "recycle_stage", num_scanned, 
num_recycled, start_time);
         stage_keys.push_back(k);
         return 0;
     };
@@ -2325,11 +2387,11 @@ int InstanceRecycler::recycle_stage() {
 int InstanceRecycler::recycle_expired_stage_objects() {
     LOG_INFO("begin to recycle expired stage objects").tag("instance_id", 
instance_id_);
 
-    using namespace std::chrono;
-    auto start_time = steady_clock::now();
+    int64_t start_time = 
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
-        auto cost = duration<float>(steady_clock::now() - start_time).count();
+        int64_t cost =
+                
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - 
start_time;
         LOG_INFO("recycle expired stage objects, cost={}s", 
cost).tag("instance_id", instance_id_);
     });
     int ret = 0;
@@ -2391,4 +2453,37 @@ int InstanceRecycler::recycle_expired_stage_objects() {
     return ret;
 }
 
+// Records `task_name` as running since `start_time` in `running_recycle_tasks`, overwriting
+// any entry already stored under the same name. Guarded by `recycle_tasks_mutex`.
+void InstanceRecycler::register_recycle_task(const std::string& task_name, int64_t start_time) {
+    std::lock_guard guard(recycle_tasks_mutex);
+    running_recycle_tasks.insert_or_assign(task_name, start_time);
+}
+
+// Removes `task_name` from `running_recycle_tasks`. Guarded by `recycle_tasks_mutex`.
+//
+// In debug builds this asserts that the task is registered with a positive start time.
+// The lookup uses `find` rather than `operator[]` so the assertion never inserts an entry:
+// a mutating expression inside DCHECK makes debug and release builds run different code.
+void InstanceRecycler::unregister_recycle_task(const std::string& task_name) {
+    std::lock_guard lock(recycle_tasks_mutex);
+    const auto it = running_recycle_tasks.find(task_name);
+    DCHECK(it != running_recycle_tasks.end() && it->second > 0)
+            << "recycle task is not registered: " << task_name;
+    if (it != running_recycle_tasks.end()) {
+        running_recycle_tasks.erase(it);
+    }
+}
+
+// Scans the registered recycle tasks and logs each one whose running time exceeds
+// `config::recycle_task_threshold_seconds`.
+//
+// Returns true if at least one such task was found, false otherwise.
+bool InstanceRecycler::check_recycle_tasks() {
+    // Copy the task table while holding the mutex; the scan and logging below work on the
+    // copy after the lock has been released.
+    const std::map<std::string, int64_t> snapshot = [this]() {
+        std::lock_guard guard(recycle_tasks_mutex);
+        return running_recycle_tasks;
+    }();
+
+    // Same clock and unit as the start times recorded by the recycle_* functions.
+    const int64_t now = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+
+    bool found = false;
+    for (const auto& [task_name, start_time] : snapshot) {
+        int64_t cost = now - start_time;
+        if (cost <= config::recycle_task_threshold_seconds) [[likely]] {
+            continue;
+        }
+        LOG_INFO("recycle task cost too much time cost={}s", cost)
+                .tag("instance_id", instance_id_)
+                .tag("task", task_name);
+        found = true;
+    }
+    return found;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 34139be6d1a..e745e815a79 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -59,6 +59,8 @@ private:
 
     void lease_recycle_jobs();
 
+    void check_recycle_tasks();
+
 private:
     friend class RecyclerServiceImpl;
 
@@ -155,6 +157,8 @@ public:
     // returns 0 for success otherwise error
     int recycle_expired_stage_objects();
 
+    bool check_recycle_tasks();
+
 private:
     /**
      * Scan key-value pairs between [`begin`, `end`), and perform 
`recycle_func` on each key-value pair.
@@ -182,6 +186,10 @@ private:
     int init_copy_job_accessor(const std::string& stage_id, const 
StagePB::StageType& stage_type,
                                std::shared_ptr<ObjStoreAccessor>* accessor);
 
+    void register_recycle_task(const std::string& task_name, int64_t 
start_time);
+
+    void unregister_recycle_task(const std::string& task_name);
+
 private:
     std::atomic_bool stopped_ {false};
     std::shared_ptr<TxnKv> txn_kv_;
@@ -195,6 +203,10 @@ private:
     std::mutex recycled_tablets_mtx_;
     // Store recycled tablets, we can skip deleting rowset data of these 
tablets because these data has already been deleted.
     std::unordered_set<int64_t> recycled_tablets_;
+
+    std::mutex recycle_tasks_mutex;
+    // <task_name, start_time>>
+    std::map<std::string, int64_t> running_recycle_tasks;
 };
 
 } // namespace doris::cloud
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index cf6d606585d..87c70833a30 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -713,6 +713,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
     obj_info->set_prefix("recycle_rowsets");
 
     config::instance_recycler_worker_pool_size = 10;
+    config::recycle_task_threshold_seconds = 0;
     InstanceRecycler recycler(txn_kv, instance);
     ASSERT_EQ(recycler.init(), 0);
 
@@ -723,8 +724,11 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
         *((int*)limit) = 100;
         std::this_thread::sleep_for(std::chrono::milliseconds(5));
     });
-    sp->set_call_back("MockAccessor::delete_objects",
-                      [&](void* p) { 
std::this_thread::sleep_for(std::chrono::milliseconds(20)); });
+    sp->set_call_back("MockAccessor::delete_objects", [&](void* p) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        bool found = recycler.check_recycle_tasks();
+        ASSERT_EQ(found, true);
+    });
     sp->set_call_back("MockAccessor::delete_objects_by_prefix",
                       [&](void* p) { 
std::this_thread::sleep_for(std::chrono::milliseconds(20)); });
     sp->enable_processing();
@@ -748,6 +752,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
     }
 
     ASSERT_EQ(recycler.recycle_rowsets(), 0);
+    ASSERT_EQ(recycler.check_recycle_tasks(), false);
 
     // check rowset does not exist on obj store
     std::vector<ObjectMeta> files;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to