This is an automated email from the ASF dual-hosted git repository. luozenglin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 81799d614e [feature-wip](resource-group) support resource group interface in be. (#18588) 81799d614e is described below commit 81799d614eedf5036171532f6b800825da1d905b Author: luozenglin <luozeng...@gmail.com> AuthorDate: Fri Apr 14 14:00:49 2023 +0800 [feature-wip](resource-group) support resource group interface in be. (#18588) --- be/src/pipeline/task_queue.cpp | 16 +++++++ be/src/pipeline/task_queue.h | 11 +++++ be/src/pipeline/task_scheduler.cpp | 8 ++++ be/src/pipeline/task_scheduler.h | 3 ++ be/src/runtime/fragment_mgr.cpp | 31 +++++++----- be/src/runtime/task_group/task_group.cpp | 60 ++++++++++++++++++++++-- be/src/runtime/task_group/task_group.h | 36 ++++++++++++-- be/src/runtime/task_group/task_group_manager.cpp | 35 +++++++------- be/src/runtime/task_group/task_group_manager.h | 12 +---- 9 files changed, 160 insertions(+), 52 deletions(-) diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 0716afa924..23a601d5c0 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -346,5 +346,21 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen } } +void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr& task_group) { + std::unique_lock<std::mutex> lock(_rs_mutex); + auto* entity = task_group->task_entity(); + bool is_in_queue = _group_entities.find(entity) != _group_entities.end(); + if (is_in_queue) { + _group_entities.erase(entity); + _total_cpu_share -= entity->cpu_share(); + } + task_group->check_and_update(task_group_info); + if (is_in_queue) { + _group_entities.emplace(entity); + _total_cpu_share += entity->cpu_share(); + } +} + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 6d225bc1a5..39b57815ad 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -44,6 +44,9 @@ public: virtual void update_statistics(PipelineTask* task, int64_t time_spent) {} + virtual void update_task_group(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr& task_group) = 0; + int cores() const { return _core_size; } protected: @@ -126,6 +129,11 @@ public: // TODO pipeline update NormalWorkTaskQueue by time_spent. // void update_statistics(PipelineTask* task, int64_t time_spent) override; + void update_task_group(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr& task_group) override { + LOG(FATAL) << "update_task_group not implemented"; + } + private: PipelineTask* _steal_take(size_t core_id); @@ -151,6 +159,9 @@ public: void update_statistics(PipelineTask* task, int64_t time_spent) override; + void update_task_group(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr& task_group) override; + private: template <bool from_executor> Status _push_back(PipelineTask* task); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e90fb5f536..70d1bd518f 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -334,4 +334,12 @@ void TaskScheduler::shutdown() { } } +void TaskScheduler::try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr& task_group) { + if (!task_group->check_version(task_group_info._version)) { + return; + } + _task_queue->update_task_group(task_group_info, task_group); +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 50284664f3..777923dbd3 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -72,6 +72,9 @@ public: void shutdown(); + void try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr& task_group); + private: std::unique_ptr<ThreadPool> _fix_thread_pool; std::shared_ptr<TaskQueue> _task_queue; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index aefcf57158..07ef9f3c0a 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -35,6 +35,7 @@ #include "io/fs/stream_load_pipe.h" #include "opentelemetry/trace/scope.h" #include "pipeline/pipeline_fragment_context.h" +#include "pipeline/task_scheduler.h" #include "runtime/client_cache.h" #include "runtime/datetime_value.h" #include "runtime/descriptors.h" @@ -654,19 +655,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo fragments_ctx->query_mem_tracker->enable_print_log_usage(); } - if (pipeline) { - int ts = fragments_ctx->timeout_second; - taskgroup::TaskGroupPtr tg; - auto ts_id = taskgroup::TaskGroupManager::DEFAULT_TG_ID; - if (ts > 0 && ts <= config::pipeline_short_query_timeout_s) { - ts_id = taskgroup::TaskGroupManager::SHORT_TG_ID; - } - tg = taskgroup::TaskGroupManager::instance()->get_task_group(ts_id); - fragments_ctx->set_task_group(tg); - LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id) - << "use task group: " << tg->debug_string(); - } - { // Find _fragments_ctx_map again, in case some other request has already // create the query fragments context. @@ -779,6 +767,23 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr<QueryFragmentsCtx> fragments_ctx; RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, fragments_ctx)); + if (params.__isset.resource_groups && !params.resource_groups.empty()) { + taskgroup::TaskGroupInfo task_group_info; + auto status = taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0], + &task_group_info); + if (status.ok()) { + auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group( + task_group_info); + _exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info, tg); + fragments_ctx->set_task_group(tg); + LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id) + << " use task group: " << tg->debug_string(); + } + } else { + VLOG_DEBUG << "Query/load id: " << print_id(fragments_ctx->query_id) + << " does not use task group."; + } + for (size_t i = 0; i < params.local_params.size(); i++) { const auto& local_params = params.local_params[i]; diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 63f0a564fa..783470be45 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -17,6 +17,9 @@ #include "task_group.h" +#include <charconv> + +#include "gen_cpp/PaloInternalService_types.h" #include "pipeline/pipeline_task.h" namespace doris { @@ -32,7 +35,7 @@ pipeline::PipelineTask* TaskGroupEntity::take() { } void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) { - auto v_time = runtime_ns / _tg->share(); + auto v_time = runtime_ns / _tg->cpu_share(); _vruntime_ns += v_time; } @@ -46,7 +49,7 @@ void TaskGroupEntity::push_back(pipeline::PipelineTask* task) { } uint64_t TaskGroupEntity::cpu_share() const { - return _tg->share(); + return _tg->cpu_share(); } std::string TaskGroupEntity::debug_string() const { @@ -54,11 +57,58 @@ std::string TaskGroupEntity::debug_string() const { cpu_share(), _queue.size(), _vruntime_ns); } -TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t share) - : _id(id), _name(name), _share(share), _task_entity(this) {} +TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version) + : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this), _version(version) {} std::string TaskGroup::debug_string() const { - return fmt::format("TG[id = {}, name = {}, share = {}", _id, _name, share()); + std::shared_lock<std::shared_mutex> rl {mutex}; + return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]", _id, _name, + cpu_share(), _version); +} + +bool TaskGroup::check_version(int64_t version) const { + std::shared_lock<std::shared_mutex> rl {mutex}; + return version > _version; +} + +void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { + if (tg_info._id != _id) { + return; + } + + std::lock_guard<std::shared_mutex> wl {mutex}; + if (tg_info._version > _version) { + _name = tg_info._name; + _cpu_share = tg_info._cpu_share; + _version = tg_info._version; + } +} + +Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup& resource_group, + TaskGroupInfo* task_group_info) { + if (!check_group_info(resource_group)) { + std::stringstream ss; + ss << "incomplete resource group parameters: "; + resource_group.printTo(ss); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + auto iter = resource_group.properties.find(CPU_SHARE); + uint64_t share = 0; + std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share); + + task_group_info->_id = resource_group.id; + task_group_info->_name = resource_group.name; + task_group_info->_version = resource_group.version; + task_group_info->_cpu_share = share; + return Status::OK(); +} + +bool TaskGroupInfo::check_group_info(const TPipelineResourceGroup& resource_group) { + return resource_group.__isset.id && resource_group.__isset.version && + resource_group.__isset.name && resource_group.__isset.properties && + resource_group.properties.count(CPU_SHARE) > 0; } } // namespace taskgroup diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index 62f06eb2dd..d6eed42a19 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -14,8 +14,12 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + #pragma once + +#include <atomic> #include <queue> +#include <shared_mutex> #include "olap/olap_define.h" @@ -26,10 +30,14 @@ class PipelineTask; } class QueryFragmentsCtx; +class TPipelineResourceGroup; namespace taskgroup { class TaskGroup; +struct TaskGroupInfo; + +const static std::string CPU_SHARE = "cpu_share"; class TaskGroupEntity { public: @@ -60,23 +68,43 @@ using TGEntityPtr = TaskGroupEntity*; class TaskGroup { public: - TaskGroup(uint64_t id, std::string name, uint64_t cpu_share); + TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version); TaskGroupEntity* task_entity() { return &_task_entity; } - uint64_t share() const { return _share; } + uint64_t cpu_share() const { return _cpu_share.load(); } + uint64_t id() const { return _id; } std::string debug_string() const; + bool check_version(int64_t version) const; + + void check_and_update(const TaskGroupInfo& tg_info); + private: - uint64_t _id; + mutable std::shared_mutex mutex; + const uint64_t _id; std::string _name; - uint64_t _share; + std::atomic<uint64_t> _cpu_share; TaskGroupEntity _task_entity; + int64_t _version; }; using TaskGroupPtr = std::shared_ptr<TaskGroup>; +struct TaskGroupInfo { + uint64_t _id; + std::string _name; + uint64_t _cpu_share; + int64_t _version; + + static Status parse_group_info(const TPipelineResourceGroup& resource_group, + TaskGroupInfo* task_group_info); + +private: + static bool check_group_info(const TPipelineResourceGroup& resource_group); +}; + } // namespace taskgroup } // namespace doris diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index c359470579..cb7de92a6e 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -19,10 +19,7 @@ namespace doris::taskgroup { -TaskGroupManager::TaskGroupManager() { - _create_default_task_group(); - _create_short_task_group(); -} +TaskGroupManager::TaskGroupManager() = default; TaskGroupManager::~TaskGroupManager() = default; TaskGroupManager* TaskGroupManager::instance() { @@ -30,23 +27,23 @@ TaskGroupManager* TaskGroupManager::instance() { return &tgm; } -TaskGroupPtr TaskGroupManager::get_task_group(uint64_t id) { - std::shared_lock<std::shared_mutex> r_lock(_group_mutex); - if (_task_groups.count(id)) { - return _task_groups[id]; - } else { - return _task_groups[DEFAULT_TG_ID]; +TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + if (_task_groups.count(task_group_info._id)) { + return _task_groups[task_group_info._id]; + } } -} -void TaskGroupManager::_create_default_task_group() { - _task_groups[DEFAULT_TG_ID] = - std::make_shared<TaskGroup>(DEFAULT_TG_ID, "default_tg", DEFAULT_TG_CPU_SHARE); -} - -void TaskGroupManager::_create_short_task_group() { - _task_groups[SHORT_TG_ID] = - std::make_shared<TaskGroup>(SHORT_TG_ID, "short_tg", SHORT_TG_CPU_SHARE); + auto new_task_group = + std::make_shared<TaskGroup>(task_group_info._id, task_group_info._name, + task_group_info._cpu_share, task_group_info._version); + std::lock_guard<std::shared_mutex> w_lock(_group_mutex); + if (_task_groups.count(task_group_info._id)) { + return _task_groups[task_group_info._id]; + } + _task_groups[task_group_info._id] = new_task_group; + return new_task_group; } } // namespace doris::taskgroup diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 4754e949fd..d5e32ac64e 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -29,19 +29,9 @@ public: ~TaskGroupManager(); static TaskGroupManager* instance(); - // TODO pipeline task group - TaskGroupPtr get_task_group(uint64_t id); - - static constexpr uint64_t DEFAULT_TG_ID = 0; - static constexpr uint64_t DEFAULT_TG_CPU_SHARE = 64; - - static constexpr uint64_t SHORT_TG_ID = 1; - static constexpr uint64_t SHORT_TG_CPU_SHARE = 128; + TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info); private: - void _create_default_task_group(); - void _create_short_task_group(); - std::shared_mutex _group_mutex; std::unordered_map<uint64_t, TaskGroupPtr> _task_groups; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org