levy5307 commented on code in PR #17615:
URL: https://github.com/apache/doris/pull/17615#discussion_r1133068802


##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -106,7 +107,7 @@ bool PipelineTask::has_dependency() {
     return false;
 }
 
-Status PipelineTask::open() {
+Status PipelineTask::_open() {

Review Comment:
   Why did you change the function name?



##########
be/src/pipeline/pipeline_task.h:
##########
@@ -179,10 +181,14 @@ class PipelineTask {
 
     uint32_t total_schedule_time() const { return _schedule_time; }
 
+    taskgroup::TaskGroup* get_task_group();

Review Comment:
   const 



##########
be/src/pipeline/pipeline_fragment_context.h:
##########
@@ -90,6 +90,8 @@ class PipelineFragmentContext : public 
std::enable_shared_from_this<PipelineFrag
     void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
     std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
 
+    taskgroup::TaskGroup* get_task_group() { return 
_query_ctx->get_task_group(); }

Review Comment:
   ```taskgroup::TaskGroup* get_task_group() const```



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -680,6 +681,18 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
             fragments_ctx->query_mem_tracker->enable_print_log_usage();
         }
 
+        // TODO pipeline task group
+        if (params.query_options.enable_pipeline_engine) {
+            int ts = params.query_options.query_timeout;
+            taskgroup::TaskGroupPtr rg;
+            int ts_id = 0;
+            if (ts > 200) {

Review Comment:
   It's better to define a constant for `200`



##########
be/src/pipeline/task_queue.cpp:
##########
@@ -131,21 +132,151 @@ Status WorkTaskQueue::push(PipelineTask* task) {
 
 ////////////////// TaskQueue ////////////
 
-void TaskQueue::close() {
+
+////////////////// Resource Group ////////
+
+bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
+        const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& 
rhs_ptr) const {
+    int64_t lhs_val = lhs_ptr->vruntime_ns();
+    int64_t rhs_val = rhs_ptr->vruntime_ns();
+    return lhs_val < rhs_val;
+}
+
+TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : 
TaskQueue(core_size) {}
+
+TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
+
+void TaskGroupTaskQueue::close() {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    _closed = true;
+    _wait_task.notify_all();
+}
+
+Status TaskGroupTaskQueue::push_back(PipelineTask* task) {
+    return _push_back<false>(task);
+}
+
+Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+    return _push_back<true>(task);
+}
+
+template <bool from_executor>
+Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
+    auto* entry = task->get_task_group()->task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    entry->push_back(task);
+    if (_groups.find(entry) == _groups.end()) {
+        _enqueue_task_group<from_executor>(entry);
+    }
+    _wait_task.notify_one();
+    return Status::OK();
+}
+
+// TODO pipeline support steal
+PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    taskgroup::TGEntityPtr entry = nullptr;
+    while (entry == nullptr) {
+        if (_closed) {
+            return nullptr;
+        }
+        if (_groups.empty()) {
+            _wait_task.wait(lock);
+        } else {
+            entry = _next_ts_entity();
+            if (!entry) {
+                _wait_task.wait_for(lock, 
std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
+            }
+        }
+    }
+    DCHECK(entry->task_size() > 0);
+    if (entry->task_size() == 1) {
+        _dequeue_task_group(entry);
+    }
+    return entry->take();
+}
+
+template <bool from_worker>
+void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr ts_entity) 
{
+    _total_cpu_share += ts_entity->cpu_share();
+    if constexpr (!from_worker) {
+        auto* min_entity = _min_ts_entity.load();
+        if (!min_entity) {
+            int64_t new_vruntime_ns = min_entity->vruntime_ns() - 
_ideal_runtime_ns(ts_entity) / 2;
+            if (new_vruntime_ns > ts_entity->vruntime_ns()) {
+                ts_entity->adjust_vruntime_ns(new_vruntime_ns);
+            }
+        }
+    }
+    _groups.emplace(ts_entity);
+    _update_min_rg();
+}
+
+void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr ts_entity) 
{
+    _total_cpu_share -= ts_entity->cpu_share();
+    _groups.erase(ts_entity);
+    _update_min_rg();
+}
+
+void TaskGroupTaskQueue::_update_min_rg() {
+    auto* min_entity = _next_ts_entity();

Review Comment:
   `auto*` ==> `auto`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to