This is an automated email from the ASF dual-hosted git repository.

luozenglin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 81799d614e [feature-wip](resource-group) support resource group interface in be. (#18588)
81799d614e is described below

commit 81799d614eedf5036171532f6b800825da1d905b
Author: luozenglin <luozeng...@gmail.com>
AuthorDate: Fri Apr 14 14:00:49 2023 +0800

    [feature-wip](resource-group) support resource group interface in be. (#18588)
---
 be/src/pipeline/task_queue.cpp                   | 16 +++++++
 be/src/pipeline/task_queue.h                     | 11 +++++
 be/src/pipeline/task_scheduler.cpp               |  8 ++++
 be/src/pipeline/task_scheduler.h                 |  3 ++
 be/src/runtime/fragment_mgr.cpp                  | 31 +++++++-----
 be/src/runtime/task_group/task_group.cpp         | 60 ++++++++++++++++++++++--
 be/src/runtime/task_group/task_group.h           | 36 ++++++++++++--
 be/src/runtime/task_group/task_group_manager.cpp | 35 +++++++-------
 be/src/runtime/task_group/task_group_manager.h   | 12 +----
 9 files changed, 160 insertions(+), 52 deletions(-)

diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 0716afa924..23a601d5c0 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -346,5 +346,21 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* 
task, int64_t time_spen
     }
 }
 
+// Applies `task_group_info` to `task_group` while holding `_rs_mutex`.
+// When the group's entity is currently in `_group_entities`, it is erased before the
+// update and emplaced again afterwards, and `_total_cpu_share` is reduced by the
+// entity's cpu share before the update and increased by it after.
+void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo& 
task_group_info,
+                                           taskgroup::TaskGroupPtr& 
task_group) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    auto* entity = task_group->task_entity();
+    bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
+    if (is_in_queue) {
+        // NOTE(review): presumably the entity is taken out first because its position in
+        // _group_entities may depend on values the update changes — confirm.
+        _group_entities.erase(entity);
+        _total_cpu_share -= entity->cpu_share();
+    }
+    // May change the group's name, cpu share and version; leaves the group untouched
+    // when the id differs or the incoming version is not newer.
+    task_group->check_and_update(task_group_info);
+    if (is_in_queue) {
+        _group_entities.emplace(entity);
+        _total_cpu_share += entity->cpu_share();
+    }
+}
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 6d225bc1a5..39b57815ad 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -44,6 +44,9 @@ public:
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
+    virtual void update_task_group(const taskgroup::TaskGroupInfo& 
task_group_info,
+                                   taskgroup::TaskGroupPtr& task_group) = 0;
+
     int cores() const { return _core_size; }
 
 protected:
@@ -126,6 +129,11 @@ public:
     // TODO pipeline update NormalWorkTaskQueue by time_spent.
     // void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
+    // Task-group updates are not supported by this queue type: the override only logs
+    // at FATAL severity.
+    // NOTE(review): callers presumably must only reach this through a scheduler backed
+    // by TaskGroupTaskQueue — confirm.
+    void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+                           taskgroup::TaskGroupPtr& task_group) override {
+        LOG(FATAL) << "update_task_group not implemented";
+    }
+
 private:
     PipelineTask* _steal_take(size_t core_id);
 
@@ -151,6 +159,9 @@ public:
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
+    void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+                           taskgroup::TaskGroupPtr& task_group) override;
+
 private:
     template <bool from_executor>
     Status _push_back(PipelineTask* task);
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index e90fb5f536..70d1bd518f 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -334,4 +334,12 @@ void TaskScheduler::shutdown() {
     }
 }
 
+// Forwards `task_group_info` to the task queue only when `check_version()` accepts its
+// version (the incoming version is greater than the group's current one); otherwise
+// this call does nothing.
+void TaskScheduler::try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+                                          taskgroup::TaskGroupPtr& task_group) {
+    const bool has_newer_version = task_group->check_version(task_group_info._version);
+    if (has_newer_version) {
+        _task_queue->update_task_group(task_group_info, task_group);
+    }
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 50284664f3..777923dbd3 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -72,6 +72,9 @@ public:
 
     void shutdown();
 
+    void try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+                               taskgroup::TaskGroupPtr& task_group);
+
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
     std::shared_ptr<TaskQueue> _task_queue;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index aefcf57158..07ef9f3c0a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -35,6 +35,7 @@
 #include "io/fs/stream_load_pipe.h"
 #include "opentelemetry/trace/scope.h"
 #include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/task_scheduler.h"
 #include "runtime/client_cache.h"
 #include "runtime/datetime_value.h"
 #include "runtime/descriptors.h"
@@ -654,19 +655,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
             fragments_ctx->query_mem_tracker->enable_print_log_usage();
         }
 
-        if (pipeline) {
-            int ts = fragments_ctx->timeout_second;
-            taskgroup::TaskGroupPtr tg;
-            auto ts_id = taskgroup::TaskGroupManager::DEFAULT_TG_ID;
-            if (ts > 0 && ts <= config::pipeline_short_query_timeout_s) {
-                ts_id = taskgroup::TaskGroupManager::SHORT_TG_ID;
-            }
-            tg = 
taskgroup::TaskGroupManager::instance()->get_task_group(ts_id);
-            fragments_ctx->set_task_group(tg);
-            LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
-                      << "use task group: " << tg->debug_string();
-        }
-
         {
             // Find _fragments_ctx_map again, in case some other request has 
already
             // create the query fragments context.
@@ -779,6 +767,23 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, 
fragments_ctx));
 
+    if (params.__isset.resource_groups && !params.resource_groups.empty()) {
+        taskgroup::TaskGroupInfo task_group_info;
+        auto status = 
taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0],
+                                                                 
&task_group_info);
+        if (status.ok()) {
+            auto tg = 
taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
+                    task_group_info);
+            
_exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info,
 tg);
+            fragments_ctx->set_task_group(tg);
+            LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
+                      << " use task group: " << tg->debug_string();
+        }
+    } else {
+        VLOG_DEBUG << "Query/load id: " << print_id(fragments_ctx->query_id)
+                   << " does not use task group.";
+    }
+
     for (size_t i = 0; i < params.local_params.size(); i++) {
         const auto& local_params = params.local_params[i];
 
diff --git a/be/src/runtime/task_group/task_group.cpp 
b/be/src/runtime/task_group/task_group.cpp
index 63f0a564fa..783470be45 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -17,6 +17,9 @@
 
 #include "task_group.h"
 
+#include <charconv>
+
+#include "gen_cpp/PaloInternalService_types.h"
 #include "pipeline/pipeline_task.h"
 
 namespace doris {
@@ -32,7 +35,7 @@ pipeline::PipelineTask* TaskGroupEntity::take() {
 }
 
 void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) {
-    auto v_time = runtime_ns / _tg->share();
+    auto v_time = runtime_ns / _tg->cpu_share();
     _vruntime_ns += v_time;
 }
 
@@ -46,7 +49,7 @@ void TaskGroupEntity::push_back(pipeline::PipelineTask* task) 
{
 }
 
 uint64_t TaskGroupEntity::cpu_share() const {
-    return _tg->share();
+    return _tg->cpu_share();
 }
 
 std::string TaskGroupEntity::debug_string() const {
@@ -54,11 +57,58 @@ std::string TaskGroupEntity::debug_string() const {
                        cpu_share(), _queue.size(), _vruntime_ns);
 }
 
-TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t share)
-        : _id(id), _name(name), _share(share), _task_entity(this) {}
+TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, 
int64_t version)
+        : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this), 
_version(version) {}
 
+// Returns a one-line description of the group (id, name, cpu share, version), formatted
+// while holding a shared lock on `mutex`.
 std::string TaskGroup::debug_string() const {
-    return fmt::format("TG[id = {}, name = {}, share = {}", _id, _name, 
share());
+    std::shared_lock<std::shared_mutex> rl {mutex};
+    return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]", 
_id, _name,
+                       cpu_share(), _version);
+}
+
+// Returns true when `version` is strictly greater than the version currently stored in
+// this group; the comparison is made under a shared lock on `mutex`.
+bool TaskGroup::check_version(int64_t version) const {
+    std::shared_lock<std::shared_mutex> read_guard(mutex);
+    const bool incoming_is_newer = _version < version;
+    return incoming_is_newer;
+}
+
+// Copies name, cpu share and version from `tg_info` into this group, but only when
+// `tg_info` carries the same id and a strictly greater version. The version check and
+// the writes happen under an exclusive lock on `mutex`; a mismatched id returns without
+// taking the lock.
+void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
+    const bool same_group = (tg_info._id == _id);
+    if (same_group) {
+        std::lock_guard<std::shared_mutex> write_guard(mutex);
+        if (_version < tg_info._version) {
+            _name = tg_info._name;
+            _cpu_share = tg_info._cpu_share;
+            _version = tg_info._version;
+        }
+    }
+}
+
+// Fills `task_group_info` from the thrift `resource_group`.
+//
+// Returns InternalError (after logging a warning) when a required field is missing or
+// when the CPU_SHARE property is not a positive base-10 integer; `task_group_info` is
+// left unmodified in that case. Returns OK once all four fields have been copied.
+Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup& resource_group,
+                                       TaskGroupInfo* task_group_info) {
+    if (!check_group_info(resource_group)) {
+        std::stringstream ss;
+        ss << "incomplete resource group parameters: ";
+        resource_group.printTo(ss);
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+
+    // check_group_info() guarantees the CPU_SHARE key exists, so `iter` is valid here.
+    auto iter = resource_group.properties.find(CPU_SHARE);
+    const std::string& share_text = iter->second;
+    const char* const first = share_text.data();
+    const char* const last = first + share_text.size();
+    uint64_t share = 0;
+    const auto [parsed_end, ec] = std::from_chars(first, last, share);
+    // A failed or partial parse must not silently become a share of 0: the share is
+    // used as a divisor in TaskGroupEntity::incr_runtime_ns().
+    if (ec != std::errc() || parsed_end != last || share == 0) {
+        std::stringstream ss;
+        ss << "invalid cpu_share in resource group parameters: ";
+        resource_group.printTo(ss);
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+
+    task_group_info->_id = resource_group.id;
+    task_group_info->_name = resource_group.name;
+    task_group_info->_version = resource_group.version;
+    task_group_info->_cpu_share = share;
+    return Status::OK();
+}
+
+// Reports whether `resource_group` has id, version, name and properties all marked as
+// set and its properties contain the CPU_SHARE key.
+bool TaskGroupInfo::check_group_info(const TPipelineResourceGroup& resource_group) {
+    const auto& isset = resource_group.__isset;
+    if (!(isset.id && isset.version && isset.name && isset.properties)) {
+        return false;
+    }
+    const auto& props = resource_group.properties;
+    return props.find(CPU_SHARE) != props.end();
 }
 
 } // namespace taskgroup
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index 62f06eb2dd..d6eed42a19 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -14,8 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 #pragma once
+
+#include <atomic>
 #include <queue>
+#include <shared_mutex>
 
 #include "olap/olap_define.h"
 
@@ -26,10 +30,14 @@ class PipelineTask;
 }
 
 class QueryFragmentsCtx;
+class TPipelineResourceGroup;
 
 namespace taskgroup {
 
 class TaskGroup;
+struct TaskGroupInfo;
+
+const static std::string CPU_SHARE = "cpu_share";
 
 class TaskGroupEntity {
 public:
@@ -60,23 +68,43 @@ using TGEntityPtr = TaskGroupEntity*;
 
+// A task group with a fixed id plus a name, cpu share and version that can be replaced
+// in place through check_and_update().
 class TaskGroup {
 public:
-    TaskGroup(uint64_t id, std::string name, uint64_t cpu_share);
+    TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t 
version);
 
     TaskGroupEntity* task_entity() { return &_task_entity; }
 
-    uint64_t share() const { return _share; }
+    // Atomic load of the cpu share; does not take `mutex`.
+    uint64_t cpu_share() const { return _cpu_share.load(); }
+
     uint64_t id() const { return _id; }
 
     std::string debug_string() const;
 
+    // True when `version` is greater than the version stored in this group.
+    bool check_version(int64_t version) const;
+
+    // Takes over name, cpu share and version from `tg_info` when its id matches and its
+    // version is greater than the stored one; otherwise leaves the group unchanged.
+    void check_and_update(const TaskGroupInfo& tg_info);
+
 private:
-    uint64_t _id;
+    // Locked shared by debug_string()/check_version(), exclusively by check_and_update().
+    mutable std::shared_mutex mutex;
+    const uint64_t _id;
     std::string _name;
-    uint64_t _share;
+    std::atomic<uint64_t> _cpu_share;
     TaskGroupEntity _task_entity;
+    int64_t _version;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
 
+// Plain description of a task group (id, name, cpu share, version) as extracted from a
+// TPipelineResourceGroup. The scalar members carry default initializers so that a
+// default-constructed instance never holds indeterminate values, e.g. when
+// parse_group_info() returns an error before filling it.
+struct TaskGroupInfo {
+    uint64_t _id = 0;
+    std::string _name;
+    uint64_t _cpu_share = 0;
+    int64_t _version = 0;
+
+    // Fills `task_group_info` from `resource_group`; returns a non-OK Status when
+    // required fields are missing.
+    static Status parse_group_info(const TPipelineResourceGroup& resource_group,
+                                   TaskGroupInfo* task_group_info);
+
+private:
+    // True when id, version, name and properties are set and the CPU_SHARE property
+    // is present.
+    static bool check_group_info(const TPipelineResourceGroup& resource_group);
+};
+
 } // namespace taskgroup
 } // namespace doris
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index c359470579..cb7de92a6e 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -19,10 +19,7 @@
 
 namespace doris::taskgroup {
 
-TaskGroupManager::TaskGroupManager() {
-    _create_default_task_group();
-    _create_short_task_group();
-}
+TaskGroupManager::TaskGroupManager() = default;
 TaskGroupManager::~TaskGroupManager() = default;
 
 TaskGroupManager* TaskGroupManager::instance() {
@@ -30,23 +27,23 @@ TaskGroupManager* TaskGroupManager::instance() {
     return &tgm;
 }
 
-TaskGroupPtr TaskGroupManager::get_task_group(uint64_t id) {
-    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-    if (_task_groups.count(id)) {
-        return _task_groups[id];
-    } else {
-        return _task_groups[DEFAULT_TG_ID];
+// Returns the task group registered under `task_group_info._id`, creating and
+// registering one from `task_group_info` when none exists yet.
+//
+// The lookup first runs under a shared lock on `_group_mutex`. On a miss the new group
+// is built outside any lock, then the map is re-checked under the exclusive lock so
+// that a group registered by another thread in between is returned instead.
+// Lookups use find() rather than operator[]: unordered_map::operator[] is a non-const
+// operation and must not be called by several threads that only hold the shared lock.
+TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        if (auto it = _task_groups.find(task_group_info._id); it != _task_groups.end()) {
+            return it->second;
+        }
     }
-}
 
-void TaskGroupManager::_create_default_task_group() {
-    _task_groups[DEFAULT_TG_ID] =
-            std::make_shared<TaskGroup>(DEFAULT_TG_ID, "default_tg", 
DEFAULT_TG_CPU_SHARE);
-}
-
-void TaskGroupManager::_create_short_task_group() {
-    _task_groups[SHORT_TG_ID] =
-            std::make_shared<TaskGroup>(SHORT_TG_ID, "short_tg", 
SHORT_TG_CPU_SHARE);
+    auto new_task_group =
+            std::make_shared<TaskGroup>(task_group_info._id, task_group_info._name,
+                                        task_group_info._cpu_share, task_group_info._version);
+    std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+    if (auto it = _task_groups.find(task_group_info._id); it != _task_groups.end()) {
+        return it->second;
+    }
+    _task_groups[task_group_info._id] = new_task_group;
+    return new_task_group;
 }
 
 } // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 4754e949fd..d5e32ac64e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -29,19 +29,9 @@ public:
     ~TaskGroupManager();
     static TaskGroupManager* instance();
 
-    // TODO pipeline task group
-    TaskGroupPtr get_task_group(uint64_t id);
-
-    static constexpr uint64_t DEFAULT_TG_ID = 0;
-    static constexpr uint64_t DEFAULT_TG_CPU_SHARE = 64;
-
-    static constexpr uint64_t SHORT_TG_ID = 1;
-    static constexpr uint64_t SHORT_TG_CPU_SHARE = 128;
+    TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& 
task_group_info);
 
 private:
-    void _create_default_task_group();
-    void _create_short_task_group();
-
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to