This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 9b0ad66  [runtime] Replace the thread pool in FragmentMgr (#4057)
9b0ad66 is described below

commit 9b0ad66b78c8860de16d2bb0df41f53b5f0a2359
Author: HuangWei <huang...@apache.org>
AuthorDate: Wed Jul 15 10:03:48 2020 +0800

    [runtime] Replace the thread pool in FragmentMgr (#4057)
---
 be/src/common/config.h                    |   5 +-
 be/src/runtime/fragment_mgr.cpp           | 114 ++++++++++++++----------
 be/src/runtime/fragment_mgr.h             |   4 +-
 be/src/runtime/plan_fragment_executor.cpp |   4 ++
 be/src/runtime/plan_fragment_executor.h   |   5 ++
 be/test/runtime/fragment_mgr_test.cpp     |  36 +++++++++-
 6 files changed, 99 insertions(+), 69 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3adf389..4915732 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -322,8 +322,9 @@ namespace config {
     CONF_mInt32(olap_table_sink_send_interval_ms, "10");
 
     // Fragment thread pool
-    CONF_Int32(fragment_pool_thread_num, "64");
-    CONF_Int32(fragment_pool_queue_size, "1024");
+    CONF_Int32(fragment_pool_thread_num_min, "64");
+    CONF_Int32(fragment_pool_thread_num_max, "512");
+    CONF_Int32(fragment_pool_queue_size, "2048");
 
     //for cast
     // CONF_Bool(cast, "true");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 30ec73b..9413f82 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -23,30 +23,32 @@
 #include <gperftools/profiler.h>
 #include <boost/bind.hpp>
-#include <thrift/protocol/TDebugProtocol.h>
 
 #include "agent/cgroups_mgr.h"
 #include "common/object_pool.h"
 #include "common/resource_tls.h"
-#include "gen_cpp/DataSinks_types.h"
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/HeartbeatService.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "gen_cpp/QueryPlanExtra_types.h"
-#include "gen_cpp/Types_types.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/client_cache.h"
-#include "runtime/datetime_value.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/plan_fragment_executor.h"
 #include "service/backend_options.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/exec_env.h"
+#include "runtime/datetime_value.h"
+#include "util/stopwatch.hpp"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
-#include "util/stopwatch.hpp"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 #include "util/url_coding.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "gen_cpp/HeartbeatService.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/DataSinks_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/QueryPlanExtra_types.h"
+#include <thrift/protocol/TDebugProtocol.h>
+#include "util/threadpool.h"
+#include "gutil/strings/substitute.h"
 
 namespace doris {
 
@@ -87,6 +89,8 @@ public:
 
     Status execute();
 
+    Status cancel_before_execute();
+
     Status cancel(const PPlanFragmentCancelReason& reason);
 
     TUniqueId fragment_instance_id() const {
@@ -219,6 +223,13 @@ Status FragmentExecState::execute() {
     return Status::OK();
 }
 
+Status FragmentExecState::cancel_before_execute() {
+    // set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
+    _executor.set_abort();
+    _executor.cancel();
+    return Status::OK();
+}
+
 Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
     std::lock_guard<std::mutex> l(_status_lock);
     RETURN_IF_ERROR(_exec_status);
@@ -364,26 +375,31 @@ void FragmentExecState::coordinator_callback(
     }
 }
 
-FragmentMgr::FragmentMgr(ExecEnv* exec_env) :
-        _exec_env(exec_env),
-        _fragment_map(),
-        _stop(false),
-        _cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)),
-        // TODO(zc): we need a better thread-pool
-        // now one user can use all the thread pool, others have no resource.
-        _thread_pool(config::fragment_pool_thread_num, config::fragment_pool_queue_size) {
+FragmentMgr::FragmentMgr(ExecEnv* exec_env)
+        : _exec_env(exec_env),
+          _fragment_map(),
+          _stop(false),
+          _cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)) {
     REGISTER_GAUGE_DORIS_METRIC(plan_fragment_count, [this]() {
         std::lock_guard<std::mutex> lock(_lock);
         return _fragment_map.size();
     });
+    // TODO(zc): we need a better thread-pool
+    // now one user can use all the thread pool, others have no resource.
+    ThreadPoolBuilder("FragmentMgrThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(config::fragment_pool_thread_num_max)
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_thread_pool);
 }
 
 FragmentMgr::~FragmentMgr() {
     // stop thread
     _stop = true;
    _cancel_thread.join();
-    // Stop all the worker
-    _thread_pool.drain_and_shutdown();
+    // Stop all the worker, should wait for a while?
+    // _thread_pool->wait_for();
+    _thread_pool->shutdown();
 
     // Only me can delete
     {
@@ -421,13 +437,6 @@ Status FragmentMgr::exec_plan_fragment(
     return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
 }
 
-static void* fragment_executor(void* param) {
-    PriorityThreadPool::WorkFunction* func = (PriorityThreadPool::WorkFunction*)param;
-    (*func)();
-    delete func;
-    return nullptr;
-}
-
 Status FragmentMgr::exec_plan_fragment(
         const TExecPlanFragmentParams& params,
         FinishCallback cb) {
@@ -448,7 +457,7 @@ Status FragmentMgr::exec_plan_fragment(
             _exec_env, params.coord));
 
     RETURN_IF_ERROR(exec_state->prepare(params));
-    bool use_pool = true;
+
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(fragment_instance_id);
@@ -458,38 +467,19 @@ Status FragmentMgr::exec_plan_fragment(
         }
         // register exec_state before starting exec thread
         _fragment_map.insert(std::make_pair(fragment_instance_id, exec_state));
-
-        // Now, we the fragement is
-        if (_fragment_map.size() >= config::fragment_pool_thread_num) {
-            use_pool = false;
-        }
     }
 
-    if (use_pool) {
-        if (!_thread_pool.offer(
-                boost::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb))) {
-            {
-                // Remove the exec state added
-                std::lock_guard<std::mutex> lock(_lock);
-                _fragment_map.erase(fragment_instance_id);
-            }
-            return Status::InternalError("Put planfragment to failed.");
-        }
-    } else {
-        pthread_t id;
-        int ret = pthread_create(&id,
-                nullptr,
-                fragment_executor,
-                new PriorityThreadPool::WorkFunction(
-                    std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb)));
-        if (ret != 0) {
-            std::string err_msg("Could not create thread.");
-            err_msg.append(strerror(ret));
-            err_msg.append(",");
-            err_msg.append(std::to_string(ret));
-            return Status::InternalError(err_msg);
+    auto st = _thread_pool->submit_func(
+            std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb));
+    if (!st.ok()) {
+        {
+            // Remove the exec state added
+            std::lock_guard<std::mutex> lock(_lock);
+            _fragment_map.erase(fragment_instance_id);
         }
-        pthread_detach(id);
+        exec_state->cancel_before_execute();
+        return Status::InternalError(strings::Substitute(
+                "Put planfragment to thread pool failed. err = $0", st.get_error_msg()));
     }
 
     return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 88ad8da..70c430c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -29,7 +29,6 @@
 #include "gen_cpp/DorisExternalService_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/hash_util.hpp"
 
 #include "http/rest_monitor_iface.h"
@@ -40,6 +39,7 @@
 class FragmentExecState;
 class TExecPlanFragmentParams;
 class TUniqueId;
 class PlanFragmentExecutor;
+class ThreadPool;
 
 std::string to_load_error_http_path(const std::string& file_name);
@@ -90,7 +90,7 @@ private:
     bool _stop;
     std::thread _cancel_thread;
     // every job is a pool
-    PriorityThreadPool _thread_pool;
+    std::unique_ptr<ThreadPool> _thread_pool;
 };
 
 }
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index de076eb..d55dd19 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -510,6 +510,10 @@ void PlanFragmentExecutor::cancel() {
     _runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id());
 }
 
+void PlanFragmentExecutor::set_abort() {
+    update_status(Status::Aborted("Execution aborted before start"));
+}
+
 const RowDescriptor& PlanFragmentExecutor::row_desc() {
     return _plan->row_desc();
 }
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 2251ea4..bd4b0a2 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -114,6 +114,11 @@ public:
     // in open()/get_next().
     void close();
 
+    // Abort this execution. Must be called if we skip running open().
+    // It will let DataSink node closed with error status, to avoid use resources which created in open() phase.
+    // DataSink node should distinguish Aborted status from other error status.
+    void set_abort();
+
     // Initiate cancellation. Must not be called until after prepare() returned.
     void cancel();
 
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 683ee59..a8b0ce7 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -29,6 +29,7 @@ namespace doris {
 static Status s_prepare_status;
 static Status s_open_status;
+static int s_abort_cnt;
 
 // Mock used for this unittest
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
        const report_status_callback& report_status_cb)
@@ -47,6 +48,11 @@ Status PlanFragmentExecutor::open() {
 
 void PlanFragmentExecutor::cancel() {}
 
+void PlanFragmentExecutor::set_abort() {
+    LOG(INFO) << "Plan Aborted";
+    s_abort_cnt++;
+}
+
 void PlanFragmentExecutor::close() {}
 
 class FragmentMgrTest : public testing::Test {
@@ -57,9 +63,9 @@ protected:
     virtual void SetUp() {
         s_prepare_status = Status::OK();
         s_open_status = Status::OK();
-        LOG(INFO) << "fragment_pool_thread_num=" << config::fragment_pool_thread_num
-            << ", pool_size=" << config::fragment_pool_queue_size;
-        config::fragment_pool_thread_num = 32;
+
+        config::fragment_pool_thread_num_min = 32;
+        config::fragment_pool_thread_num_max = 32;
         config::fragment_pool_queue_size = 1024;
     }
     virtual void TearDown() {}
@@ -117,6 +123,30 @@ TEST_F(FragmentMgrTest, PrepareFailed) {
     ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
 }
 
+TEST_F(FragmentMgrTest, OfferPoolFailed) {
+    config::fragment_pool_thread_num_min = 1;
+    config::fragment_pool_thread_num_max = 1;
+    config::fragment_pool_queue_size = 0;
+    s_abort_cnt = 0;
+    FragmentMgr mgr(nullptr);
+
+    TExecPlanFragmentParams params;
+    params.params.fragment_instance_id = TUniqueId();
+    params.params.fragment_instance_id.__set_hi(100);
+    params.params.fragment_instance_id.__set_lo(200);
+    ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
+
+    // the first plan open will cost 50ms, so the next 3 plans will be aborted.
+    for (int i = 1; i < 4; ++i) {
+        TExecPlanFragmentParams params;
+        params.params.fragment_instance_id = TUniqueId();
+        params.params.fragment_instance_id.__set_hi(100 + i);
+        params.params.fragment_instance_id.__set_lo(200);
+        ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
+    }
+    ASSERT_EQ(3, s_abort_cnt);
+}
+
 } // namespace doris
 
 int main(int argc, char** argv) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
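
Editor's note, not part of the commit: the sketch below illustrates the submission pattern the patch moves to, using only the ThreadPoolBuilder, ThreadPool::submit_func, and shutdown calls visible in the diff above. The pool name, the pool sizes, the example task, and the run_submit_example() wrapper are hypothetical placeholders, and it is assumed the Doris BE headers util/threadpool.h and common/status.h are on the include path; this is a sketch of the pattern, not Doris code.

// Minimal sketch of the ThreadPool submission pattern adopted by FragmentMgr.
// Assumes the Doris BE tree; names marked below are illustrative only.
#include <functional>
#include <memory>

#include "common/status.h"    // assumed available, as in the BE sources
#include "util/threadpool.h"  // added by this patch to fragment_mgr.cpp

namespace doris {

Status run_submit_example() {  // hypothetical wrapper, not part of the patch
    std::unique_ptr<ThreadPool> pool;
    // Build a bounded pool, mirroring the new FragmentMgr constructor.
    ThreadPoolBuilder("ExamplePool")   // "ExamplePool" and the sizes are assumptions
            .set_min_threads(1)
            .set_max_threads(4)
            .set_max_queue_size(16)
            .build(&pool);

    // Unlike the old PriorityThreadPool::offer(), submit_func() reports
    // failure (for example a full queue) through a Status the caller must
    // handle; FragmentMgr reacts by erasing the registered exec state and
    // calling cancel_before_execute().
    Status st = pool->submit_func([]() {
        // hypothetical workload; FragmentMgr submits exec_actual() here
    });
    if (!st.ok()) {
        return st;
    }

    pool->shutdown();  // same call the new ~FragmentMgr() makes
    return Status::OK();
}

} // namespace doris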