This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3c0e7244877 [feature](merge-cloud) Add check long running task mechanism for recycle task (#32589) 3c0e7244877 is described below commit 3c0e72448779e3ba56f131480b8f9ff4b80a6f8e Author: walter <w41te...@gmail.com> AuthorDate: Mon Mar 25 23:01:44 2024 +0800 [feature](merge-cloud) Add check long running task mechanism for recycle task (#32589) * [feature](merge-cloud) Set instance_recycler_worker_pool_size default 1 * We meet the error `responseCode=503 error="Please reduce your request rate.` with aws s3 storage in the recycler log, so set instance_recycler_worker_pool_size default 1 to reduce parallel of delete objects Co-authored-by: w41ter <w41te...@gmail.com> * [feature](merge-cloud) Add check long running task mechanism for recycle task * In order to report long running recycle task, implement a check_recycle_task function Co-authored-by: w41ter <w41te...@gmail.com> --------- Co-authored-by: Lei Zhang <27994433+swjtu-zhang...@users.noreply.github.com> --- cloud/src/common/config.h | 5 +- cloud/src/recycler/recycler.cpp | 167 +++++++++++++++++++++++++++++++--------- cloud/src/recycler/recycler.h | 12 +++ cloud/test/recycler_test.cpp | 9 ++- 4 files changed, 154 insertions(+), 39 deletions(-) diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 859271f6503..03ae47abe56 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -60,7 +60,7 @@ CONF_mInt64(dropped_partition_retention_seconds, "10800"); // 3h CONF_Strings(recycle_whitelist, ""); // Comma seprated list // These instances will not be recycled, only effective when whitelist is empty. CONF_Strings(recycle_blacklist, ""); // Comma seprated list -CONF_mInt32(instance_recycler_worker_pool_size, "10"); +CONF_mInt32(instance_recycler_worker_pool_size, "1"); CONF_Bool(enable_checker, "false"); // Currently only used for recycler test CONF_Bool(enable_inverted_check, "false"); @@ -69,6 +69,9 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min // interval for check object CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours +CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min +CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h + CONF_String(test_s3_ak, "ak"); CONF_String(test_s3_sk, "sk"); CONF_String(test_s3_endpoint, "endpoint"); diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index c765a58d0fc..1ca1e05f741 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -49,6 +49,8 @@ namespace doris::cloud { +using namespace std::chrono; + // return 0 for success get a key, 1 for key not found, negative for error [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, std::string& val) { std::unique_ptr<Transaction> txn; @@ -143,6 +145,23 @@ static int txn_remove(TxnKv* txn_kv, std::vector<std::string> keys) { } } +static inline void check_recycle_task(const std::string& instance_id, const std::string& task_name, + int64_t num_scanned, int64_t num_recycled, + int64_t start_time) { + if ((num_scanned % 10000) == 0 && (num_scanned > 0)) [[unlikely]] { + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; + if (cost > config::recycle_task_threshold_seconds) { + LOG_INFO("recycle task cost too much time cost={}s", cost) + .tag("instance_id", instance_id) + .tag("task", task_name) + .tag("num_scanned", num_scanned) + .tag("num_recycled", num_recycled); + } + } + return; +} + Recycler::Recycler(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) { ip_port_ = std::string(butil::my_ip_cstr()) + ":" + std::to_string(config::brpc_listen_port); } @@ -221,7 +240,6 @@ void Recycler::recycle_callback() { } if (stopped()) return; LOG_INFO("begin to recycle instance").tag("instance_id", instance_id); - using namespace std::chrono; auto ctime_ms = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); ret = instance_recycler->do_recycle(); // If instance recycler has been aborted, don't finish this job @@ -268,6 +286,23 @@ void Recycler::lease_recycle_jobs() { } } +void Recycler::check_recycle_tasks() { + while (!stopped()) { + std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map; + { + std::lock_guard lock(mtx_); + recycling_instance_map = recycling_instance_map_; + } + for (auto& entry : recycling_instance_map) { + entry.second->check_recycle_tasks(); + } + + std::unique_lock lock(mtx_); + notifier_.wait_for(lock, std::chrono::seconds(config::check_recycle_task_interval_seconds), + [&]() { return stopped(); }); + } +} + int Recycler::start(brpc::Server* server) { instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist); @@ -298,6 +333,7 @@ int Recycler::start(brpc::Server* server) { } workers_.push_back(std::thread(std::mem_fn(&Recycler::lease_recycle_jobs), this)); + workers_.push_back(std::thread(std::mem_fn(&Recycler::check_recycle_tasks), this)); return 0; } @@ -470,7 +506,6 @@ int InstanceRecycler::recycle_deleted_instance() { LOG_INFO("begin to recycle deleted instance").tag("instance_id", instance_id_); int ret = 0; - using namespace std::chrono; auto start_time = steady_clock::now(); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { @@ -560,6 +595,7 @@ int InstanceRecycler::recycle_deleted_instance() { } int InstanceRecycler::recycle_indexes() { + const std::string task_name = "recycle_indexes"; int num_scanned = 0; int num_expired = 0; int num_recycled = 0; @@ -573,11 +609,13 @@ int InstanceRecycler::recycle_indexes() { LOG_INFO("begin to recycle indexes").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle indexes finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -659,6 +697,7 @@ int InstanceRecycler::recycle_indexes() { return -1; } ++num_recycled; + check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); index_keys.push_back(k); return 0; }; @@ -678,6 +717,7 @@ int InstanceRecycler::recycle_indexes() { } int InstanceRecycler::recycle_partitions() { + const std::string task_name = "recycle_partitions"; int num_scanned = 0; int num_expired = 0; int num_recycled = 0; @@ -691,11 +731,13 @@ int InstanceRecycler::recycle_partitions() { LOG_INFO("begin to recycle partitions").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle partitions finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -786,6 +828,7 @@ int InstanceRecycler::recycle_partitions() { } if (ret == 0) { ++num_recycled; + check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); partition_keys.push_back(k); if (part_pb.db_id() > 0) { version_keys.push_back(version_key( @@ -831,7 +874,6 @@ int InstanceRecycler::recycle_versions() { LOG_INFO("begin to recycle partition versions").tag("instance_id", instance_id_); - using namespace std::chrono; auto start_time = steady_clock::now(); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { @@ -928,7 +970,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ .tag("index_id", index_id) .tag("partition_id", partition_id); - using namespace std::chrono; auto start_time = steady_clock::now(); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { @@ -1199,7 +1240,6 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { .tag("instance_id", instance_id_) .tag("tablet_id", tablet_id); - using namespace std::chrono; auto start_time = steady_clock::now(); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { @@ -1261,6 +1301,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { } int InstanceRecycler::recycle_rowsets() { + const std::string task_name = "recycle_rowsets"; int num_scanned = 0; int num_expired = 0; int num_prepare = 0; @@ -1277,11 +1318,13 @@ int InstanceRecycler::recycle_rowsets() { LOG_INFO("begin to recycle rowsets").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle rowsets finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -1325,6 +1368,8 @@ int InstanceRecycler::recycle_rowsets() { << instance_id_; } else { num_recycled.fetch_add(keys.size(), std::memory_order_relaxed); + check_recycle_task(instance_id_, "recycle_rowsets", num_scanned, + num_recycled, start_time); } }, 0); @@ -1472,6 +1517,7 @@ int InstanceRecycler::recycle_rowsets() { } int InstanceRecycler::recycle_tmp_rowsets() { + const std::string task_name = "recycle_tmp_rowsets"; int num_scanned = 0; int num_expired = 0; int num_recycled = 0; @@ -1487,11 +1533,13 @@ int InstanceRecycler::recycle_tmp_rowsets() { LOG_INFO("begin to recycle tmp rowsets").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle tmp rowsets finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -1616,6 +1664,7 @@ int InstanceRecycler::scan_and_recycle( } int InstanceRecycler::abort_timeout_txn() { + const std::string task_name = "abort_timeout_txn"; int num_scanned = 0; int num_timeout = 0; int num_abort = 0; @@ -1629,11 +1678,13 @@ int InstanceRecycler::abort_timeout_txn() { LOG_INFO("begin to abort timeout txn").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("end to abort timeout txn, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -1731,6 +1782,7 @@ int InstanceRecycler::abort_timeout_txn() { } int InstanceRecycler::recycle_expired_txn_label() { + const std::string task_name = "recycle_expired_txn_label"; int num_scanned = 0; int num_expired = 0; int num_recycled = 0; @@ -1744,11 +1796,12 @@ int InstanceRecycler::recycle_expired_txn_label() { LOG_INFO("begin to recycle expire txn").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); - + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("end to recycle expired txn, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -1970,14 +2023,17 @@ int InstanceRecycler::recycle_copy_jobs() { int num_recycled = 0; // Used for INTERNAL stage's copy jobs to tag each batch for log trace uint64_t batch_count = 0; + const std::string task_name = "recycle_copy_jobs"; LOG_INFO("begin to recycle copy jobs").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle copy jobs finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -1993,8 +2049,9 @@ int InstanceRecycler::recycle_copy_jobs() { copy_job_key(key_info0, &key0); copy_job_key(key_info1, &key1); std::unordered_map<std::string, std::shared_ptr<BatchObjStoreAccessor>> stage_accessor_map; - auto recycle_func = [&num_scanned, &num_finished, &num_expired, &num_recycled, &batch_count, - &stage_accessor_map, this](std::string_view k, std::string_view v) -> int { + auto recycle_func = [&start_time, &num_scanned, &num_finished, &num_expired, &num_recycled, + &batch_count, &stage_accessor_map, &task_name, + this](std::string_view k, std::string_view v) -> int { ++num_scanned; CopyJobPB copy_job; if (!copy_job.ParseFromArray(v.data(), v.size())) { @@ -2099,6 +2156,7 @@ int InstanceRecycler::recycle_copy_jobs() { } ++num_recycled; + check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); return 0; }; @@ -2222,14 +2280,17 @@ int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id, int InstanceRecycler::recycle_stage() { int num_scanned = 0; int num_recycled = 0; + const std::string task_name = "recycle_stage"; LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + register_recycle_task(task_name, start_time); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + unregister_recycle_task(task_name); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle stage, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -2245,7 +2306,7 @@ int InstanceRecycler::recycle_stage() { // Elements in `tmp_rowset_keys` has the same lifetime as `it` std::vector<std::string_view> stage_keys; - auto recycle_func = [&num_scanned, &num_recycled, &stage_keys, this]( + auto recycle_func = [&start_time, &num_scanned, &num_recycled, &stage_keys, this]( std::string_view k, std::string_view v) -> int { ++num_scanned; RecycleStagePB recycle_stage; @@ -2304,6 +2365,7 @@ int InstanceRecycler::recycle_stage() { return -1; } ++num_recycled; + check_recycle_task(instance_id_, "recycle_stage", num_scanned, num_recycled, start_time); stage_keys.push_back(k); return 0; }; @@ -2325,11 +2387,11 @@ int InstanceRecycler::recycle_stage() { int InstanceRecycler::recycle_expired_stage_objects() { LOG_INFO("begin to recycle expired stage objects").tag("instance_id", instance_id_); - using namespace std::chrono; - auto start_time = steady_clock::now(); + int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration<float>(steady_clock::now() - start_time).count(); + int64_t cost = + duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time; LOG_INFO("recycle expired stage objects, cost={}s", cost).tag("instance_id", instance_id_); }); int ret = 0; @@ -2391,4 +2453,37 @@ int InstanceRecycler::recycle_expired_stage_objects() { return ret; } +void InstanceRecycler::register_recycle_task(const std::string& task_name, int64_t start_time) { + std::lock_guard lock(recycle_tasks_mutex); + running_recycle_tasks[task_name] = start_time; +} + +void InstanceRecycler::unregister_recycle_task(const std::string& task_name) { + std::lock_guard lock(recycle_tasks_mutex); + DCHECK(running_recycle_tasks[task_name] > 0); + running_recycle_tasks.erase(task_name); +} + +bool InstanceRecycler::check_recycle_tasks() { + std::map<std::string, int64_t> tmp_running_recycle_tasks; + { + std::lock_guard lock(recycle_tasks_mutex); + tmp_running_recycle_tasks = running_recycle_tasks; + } + + bool found = false; + int64_t now = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); + for (auto& [task_name, start_time] : tmp_running_recycle_tasks) { + int64_t cost = now - start_time; + if (cost > config::recycle_task_threshold_seconds) [[unlikely]] { + LOG_INFO("recycle task cost too much time cost={}s", cost) + .tag("instance_id", instance_id_) + .tag("task", task_name); + found = true; + } + } + + return found; +} + } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 34139be6d1a..e745e815a79 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -59,6 +59,8 @@ private: void lease_recycle_jobs(); + void check_recycle_tasks(); + private: friend class RecyclerServiceImpl; @@ -155,6 +157,8 @@ public: // returns 0 for success otherwise error int recycle_expired_stage_objects(); + bool check_recycle_tasks(); + private: /** * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair. @@ -182,6 +186,10 @@ private: int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type, std::shared_ptr<ObjStoreAccessor>* accessor); + void register_recycle_task(const std::string& task_name, int64_t start_time); + + void unregister_recycle_task(const std::string& task_name); + private: std::atomic_bool stopped_ {false}; std::shared_ptr<TxnKv> txn_kv_; @@ -195,6 +203,10 @@ private: std::mutex recycled_tablets_mtx_; // Store recycled tablets, we can skip deleting rowset data of these tablets because these data has already been deleted. std::unordered_set<int64_t> recycled_tablets_; + + std::mutex recycle_tasks_mutex; + // <task_name, start_time>> + std::map<std::string, int64_t> running_recycle_tasks; }; } // namespace doris::cloud diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index cf6d606585d..87c70833a30 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -713,6 +713,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) { obj_info->set_prefix("recycle_rowsets"); config::instance_recycler_worker_pool_size = 10; + config::recycle_task_threshold_seconds = 0; InstanceRecycler recycler(txn_kv, instance); ASSERT_EQ(recycler.init(), 0); @@ -723,8 +724,11 @@ TEST(RecyclerTest, bench_recycle_rowsets) { *((int*)limit) = 100; std::this_thread::sleep_for(std::chrono::milliseconds(5)); }); - sp->set_call_back("MockAccessor::delete_objects", - [&](void* p) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); }); + sp->set_call_back("MockAccessor::delete_objects", [&](void* p) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + bool found = recycler.check_recycle_tasks(); + ASSERT_EQ(found, true); + }); sp->set_call_back("MockAccessor::delete_objects_by_prefix", [&](void* p) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); }); sp->enable_processing(); @@ -748,6 +752,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) { } ASSERT_EQ(recycler.recycle_rowsets(), 0); + ASSERT_EQ(recycler.check_recycle_tasks(), false); // check rowset does not exist on obj store std::vector<ObjectMeta> files; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org