This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e3b18b588ba [fix](recycler) Fix CountdownEvent error and hang (#45760) e3b18b588ba is described below commit e3b18b588ba0f737cf3a1c802e7cc444ec9d71e8 Author: abmdocrt <lianyuk...@selectdb.com> AuthorDate: Tue Dec 24 15:16:03 2024 +0800 [fix](recycler) Fix CountdownEvent error and hang (#45760) Fix CountdownEvent error "Invoking add_count() after wait() was invoked" --------- Co-authored-by: Gavin Chou <gavineaglec...@gmail.com> --- cloud/src/common/simple_thread_pool.h | 5 +- cloud/src/recycler/recycler.cpp | 1 - cloud/src/recycler/sync_executor.h | 26 +++++++++- cloud/test/util_test.cpp | 90 +++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 4 deletions(-) diff --git a/cloud/src/common/simple_thread_pool.h b/cloud/src/common/simple_thread_pool.h index e18d6787bf7..37a4cedbdad 100644 --- a/cloud/src/common/simple_thread_pool.h +++ b/cloud/src/common/simple_thread_pool.h @@ -19,6 +19,7 @@ #include <atomic> #include <condition_variable> +#include <iostream> #include <memory> #include <mutex> #include <thread> @@ -154,8 +155,10 @@ private: } try { job(); + } catch (const std::exception& e) { + std::cerr << "exception happened when execute job. err: " << e.what() << std::endl; } catch (...) { - // do nothing + std::cerr << "exception happened when execute job." << std::endl; } } } diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 6877d7e433b..ca22b28e031 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1278,7 +1278,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_); return -1; } - sync_executor.reset(); if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0; // sort the vector using key's order std::sort(tablet_keys.begin(), tablet_keys.end(), diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h index c84e5e22467..909f36a56c4 100644 --- a/cloud/src/recycler/sync_executor.h +++ b/cloud/src/recycler/sync_executor.h @@ -18,10 +18,12 @@ #pragma once #include <bthread/countdown_event.h> +#include <cpp/sync_point.h> #include <fmt/core.h> #include <gen_cpp/cloud.pb.h> #include <glog/logging.h> +#include <chrono> #include <future> #include <string> @@ -48,10 +50,12 @@ public: return *this; } std::vector<T> when_all(bool* finished) { + std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](int*) { _reset(); }); timespec current_time; auto current_time_second = time(nullptr); current_time.tv_sec = current_time_second + 300; current_time.tv_nsec = 0; + // Wait for all tasks to complete while (0 != _count.timed_wait(current_time)) { current_time.tv_sec += 300; LOG(WARNING) << _name_tag << " has already taken 5 min, cost: " @@ -65,11 +69,26 @@ public: *finished = false; return res; } - res.emplace_back((*task).get()); + size_t max_wait_ms = 10000; + TEST_SYNC_POINT_CALLBACK("SyncExecutor::when_all.set_wait_time", &max_wait_ms); + // _count.timed_wait has already ensured that all tasks are completed. + // The 10 seconds here is just waiting for the task results to be returned, + // so 10 seconds is more than enough. + auto status = task->wait_for(max_wait_ms); + if (status == std::future_status::ready) { + res.emplace_back(task->get()); + } else { + *finished = false; + LOG(WARNING) << _name_tag << " task timed out after 10 seconds"; + return res; + } } return res; } - void reset() { + +private: + void _reset() { + _count.reset(0); _res.clear(); _stop_token = false; } @@ -98,6 +117,9 @@ private: } _pro.set_value(std::move(t)); } + std::future_status wait_for(size_t milliseconds) { + return _fut.wait_for(std::chrono::milliseconds(milliseconds)); + } bool valid() { return _valid; } T get() { return _fut.get(); } diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp index c88ef555f82..e505b2b99a5 100644 --- a/cloud/test/util_test.cpp +++ b/cloud/test/util_test.cpp @@ -18,6 +18,7 @@ #include "recycler/util.h" #include <chrono> +#include <stdexcept> #include <string> #include <string_view> #include <thread> @@ -28,6 +29,7 @@ #include "common/logging.h" #include "common/simple_thread_pool.h" #include "common/string_util.h" +#include "cpp/sync_point.h" #include "gtest/gtest.h" #include "recycler/recycler.h" #include "recycler/sync_executor.h" @@ -235,3 +237,91 @@ TEST(UtilTest, normal) { std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); }); } } + +TEST(UtilTest, test_add_after_when_all) { + auto f = []() { + auto pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); + pool->start(); + SyncExecutor<int> sync_executor(pool, "test add after when all: inside", + [](int k) { return k != 0; }); + auto f1 = []() { return 0; }; + sync_executor.add(f1); + bool finished = true; + std::vector<int> res = sync_executor.when_all(&finished); + sync_executor.add(f1); + res = sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); + return 0; + }; + + auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); + s3_producer_pool->start(); + SyncExecutor<int> s3_sync_executor(s3_producer_pool, "test add after when all: outside", + [](int k) { return k != 0; }); + s3_sync_executor.add(f); + bool finished = true; + std::vector<int> res = s3_sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); +} + +TEST(UtilTest, exception) { + auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); + s3_producer_pool->start(); + { + SyncExecutor<int> sync_executor(s3_producer_pool, "exception test", + [](int k) { return k != 0; }); + auto f = []() { + throw(std::runtime_error("test exception")); + return 1; + }; + sync_executor.add(f); + bool finished = true; + std::vector<int> res = sync_executor.when_all(&finished); + EXPECT_EQ(0, res.size()); + EXPECT_EQ(finished, false); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(1, n); }); + } +} + +TEST(UtilTest, test_sync_executor) { + auto f = []() { + sleep(1); + auto pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); + pool->start(); + SyncExecutor<int> sync_executor(pool, "test sync executor: inside", + [](int k) { return k != 0; }); + auto f1 = []() { return 0; }; + sync_executor.add(f1); + bool finished = true; + std::vector<int> res = sync_executor.when_all(&finished); + sync_executor.add(f1); + res = sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); + return 0; + }; + std::mutex go_mutex; + + auto* sp = doris::SyncPoint::get_instance(); + sp->set_call_back("SyncExecutor::when_all.set_wait_time", [&](auto&& args) { + std::unique_lock<std::mutex> _lock(go_mutex); + auto max_wait_time = *doris::try_any_cast<size_t*>(args[0]); + max_wait_time = 100; + }); + + auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); + s3_producer_pool->start(); + SyncExecutor<int> s3_sync_executor(s3_producer_pool, "test sync executor: outside", + [](int k) { return k != 0; }); + s3_sync_executor.add(f); + bool finished = true; + std::vector<int> res = s3_sync_executor.when_all(&finished); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); }); +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org