levy5307 commented on code in PR #17615: URL: https://github.com/apache/doris/pull/17615#discussion_r1133068802
########## be/src/pipeline/pipeline_task.cpp: ########## @@ -106,7 +107,7 @@ bool PipelineTask::has_dependency() { return false; } -Status PipelineTask::open() { +Status PipelineTask::_open() { Review Comment: Why did you change the function name? ########## be/src/pipeline/pipeline_task.h: ########## @@ -179,10 +181,14 @@ class PipelineTask { uint32_t total_schedule_time() const { return _schedule_time; } + taskgroup::TaskGroup* get_task_group(); Review Comment: const ########## be/src/pipeline/pipeline_fragment_context.h: ########## @@ -90,6 +90,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; } std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; } + taskgroup::TaskGroup* get_task_group() { return _query_ctx->get_task_group(); } Review Comment: ```taskgroup::TaskGroup* get_task_group() const``` ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -680,6 +681,18 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx->query_mem_tracker->enable_print_log_usage(); } + // TODO pipeline task group + if (params.query_options.enable_pipeline_engine) { + int ts = params.query_options.query_timeout; + taskgroup::TaskGroupPtr rg; + int ts_id = 0; + if (ts > 200) { Review Comment: It's better to define a constant for `200` ########## be/src/pipeline/task_queue.cpp: ########## @@ -131,21 +132,151 @@ Status WorkTaskQueue::push(PipelineTask* task) { ////////////////// TaskQueue //////////// -void TaskQueue::close() { + +////////////////// Resource Group //////// + +bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( + const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& rhs_ptr) const { + int64_t lhs_val = lhs_ptr->vruntime_ns(); + int64_t rhs_val = rhs_ptr->vruntime_ns(); + return lhs_val < rhs_val; +} + +TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : 
TaskQueue(core_size) {} + +TaskGroupTaskQueue::~TaskGroupTaskQueue() = default; + +void TaskGroupTaskQueue::close() { + std::unique_lock<std::mutex> lock(_rs_mutex); + _closed = true; + _wait_task.notify_all(); +} + +Status TaskGroupTaskQueue::push_back(PipelineTask* task) { + return _push_back<false>(task); +} + +Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) { + return _push_back<true>(task); +} + +template <bool from_executor> +Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { + auto* entry = task->get_task_group()->task_entity(); + std::unique_lock<std::mutex> lock(_rs_mutex); + entry->push_back(task); + if (_groups.find(entry) == _groups.end()) { + _enqueue_task_group<from_executor>(entry); + } + _wait_task.notify_one(); + return Status::OK(); +} + +// TODO pipeline support steal +PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { + std::unique_lock<std::mutex> lock(_rs_mutex); + taskgroup::TGEntityPtr entry = nullptr; + while (entry == nullptr) { + if (_closed) { + return nullptr; + } + if (_groups.empty()) { + _wait_task.wait(lock); + } else { + entry = _next_ts_entity(); + if (!entry) { + _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS)); + } + } + } + DCHECK(entry->task_size() > 0); + if (entry->task_size() == 1) { + _dequeue_task_group(entry); + } + return entry->take(); +} + +template <bool from_worker> +void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr ts_entity) { + _total_cpu_share += ts_entity->cpu_share(); + if constexpr (!from_worker) { + auto* min_entity = _min_ts_entity.load(); + if (!min_entity) { + int64_t new_vruntime_ns = min_entity->vruntime_ns() - _ideal_runtime_ns(ts_entity) / 2; + if (new_vruntime_ns > ts_entity->vruntime_ns()) { + ts_entity->adjust_vruntime_ns(new_vruntime_ns); + } + } + } + _groups.emplace(ts_entity); + _update_min_rg(); +} + +void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr ts_entity) { + _total_cpu_share -= 
ts_entity->cpu_share(); + _groups.erase(ts_entity); + _update_min_rg(); +} + +void TaskGroupTaskQueue::_update_min_rg() { + auto* min_entity = _next_ts_entity(); Review Comment: `auto*` ==> `auto` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org