This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1038093c29 [Pipeline](Exec) disable work steal of hash join build (#15652) 1038093c29 is described below commit 1038093c29e94251546567aeebf6ccad430bccf3 Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Jan 6 15:08:10 2023 +0800 [Pipeline](Exec) disable work steal of hash join build (#15652) --- be/src/pipeline/pipeline.h | 19 ++++++++++++++++--- be/src/pipeline/pipeline_fragment_context.cpp | 1 + be/src/pipeline/pipeline_task.h | 11 +++++++++-- be/src/pipeline/task_queue.cpp | 23 +++++++++++++---------- be/src/pipeline/task_queue.h | 6 +++--- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 054145c3d2..0b7b196d6a 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -37,7 +37,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> { public: Pipeline() = delete; explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context) - : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) { + : _complete_dependency(0), + _pipeline_id(pipeline_id), + _context(context), + _can_steal(true) { _init_profile(); } @@ -48,9 +51,13 @@ public: // If all dependencies are finished, this pipeline task should be scheduled. // e.g. Hash join probe task will be scheduled once Hash join build task is finished. 
- bool finish_one_dependency() { + bool finish_one_dependency(int dependency_core_id) { DCHECK(_complete_dependency < _dependencies.size()); - return _complete_dependency.fetch_add(1) == _dependencies.size() - 1; + bool finish = _complete_dependency.fetch_add(1) == _dependencies.size() - 1; + if (finish) { + _previous_schedule_id = dependency_core_id; + } + return finish; } bool has_dependency() { return _complete_dependency.load() < _dependencies.size(); } @@ -65,6 +72,10 @@ public: RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); } + bool can_steal() const { return _can_steal; } + + void disable_task_steal() { _can_steal = false; } + private: void _init_profile(); std::atomic<uint32_t> _complete_dependency; @@ -77,6 +88,8 @@ private: PipelineId _pipeline_id; std::weak_ptr<PipelineFragmentContext> _context; + bool _can_steal; + int _previous_schedule_id = -1; std::unique_ptr<RuntimeProfile> _pipeline_profile; }; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index feff7b304c..bd57abd240 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -459,6 +459,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur OperatorBuilderPtr join_sink = std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node); RETURN_IF_ERROR(new_pipe->set_sink(join_sink)); + new_pipe->disable_task_steal(); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr join_source = std::make_shared<HashJoinProbeOperatorBuilder>( diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index be267b8efa..f6a6512201 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -99,6 +99,7 @@ public: _sink(sink), _prepared(false), _opened(false), + _can_steal(pipeline->_can_steal), _state(state), _cur_state(NOT_READY), 
_data_state(SourceState::DEPEND_ON_SOURCE), @@ -136,11 +137,13 @@ public: bool sink_can_write() { return _sink->can_write(); } + bool can_steal() const { return _can_steal; } + Status finalize(); void finish_p_dependency() { for (const auto& p : _pipeline->_parents) { - p->finish_one_dependency(); + p->finish_one_dependency(_previous_schedule_id); } } @@ -148,7 +151,10 @@ public: QueryFragmentsCtx* query_fragments_context(); - int get_previous_core_id() const { return _previous_schedule_id; } + int get_previous_core_id() const { + return _previous_schedule_id != -1 ? _previous_schedule_id + : _pipeline->_previous_schedule_id; + } void set_previous_core_id(int id) { _previous_schedule_id = id; } @@ -180,6 +186,7 @@ private: bool _prepared; bool _opened; + bool _can_steal; RuntimeState* _state; int _previous_schedule_id = -1; uint32_t _schedule_time = 0; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 93f569ed31..ff0b3c59ed 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -20,12 +20,15 @@ namespace doris { namespace pipeline { -PipelineTask* SubWorkTaskQueue::try_take() { +PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) { if (_queue.empty()) { return nullptr; } - ++_schedule_time; auto task = _queue.front(); + if (!task->can_steal() && is_steal) { + return nullptr; + } + ++_schedule_time; _queue.pop(); return task; } @@ -52,7 +55,7 @@ void WorkTaskQueue::close() { _wait_task.notify_all(); } -PipelineTask* WorkTaskQueue::try_take_unprotected() { +PipelineTask* WorkTaskQueue::try_take_unprotected(bool is_steal) { if (_total_task_size == 0 || _closed) { return nullptr; } @@ -76,7 +79,7 @@ PipelineTask* WorkTaskQueue::try_take_unprotected() { } } - auto task = _sub_queues[idx].try_take(); + auto task = _sub_queues[idx].try_take(is_steal); if (task) { _total_task_size--; } @@ -93,15 +96,15 @@ int WorkTaskQueue::_compute_level(PipelineTask* task) { return SUB_QUEUE_LEVEL - 1; } -PipelineTask* 
WorkTaskQueue::try_take() { +PipelineTask* WorkTaskQueue::try_take(bool is_steal) { // TODO other efficient lock? e.g. if get lock fail, return null_ptr std::unique_lock<std::mutex> lock(_work_size_mutex); - return try_take_unprotected(); + return try_take_unprotected(is_steal); } PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) { std::unique_lock<std::mutex> lock(_work_size_mutex); - auto task = try_take_unprotected(); + auto task = try_take_unprotected(false); if (task) { return task; } else { @@ -110,7 +113,7 @@ PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) { } else { _wait_task.wait(lock); } - return try_take_unprotected(); + return try_take_unprotected(false); } } @@ -138,7 +141,7 @@ void TaskQueue::close() { PipelineTask* TaskQueue::try_take(size_t core_id) { PipelineTask* task; while (!_closed) { - task = _async_queue[core_id].try_take(); + task = _async_queue[core_id].try_take(false); if (task) { break; } @@ -166,7 +169,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) { next_id = 0; } DCHECK(next_id < _core_size); - auto task = _async_queue[next_id].try_take(); + auto task = _async_queue[next_id].try_take(true); if (task) { return task; } diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 6432001fbc..54e48efe42 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -29,7 +29,7 @@ class SubWorkTaskQueue { public: void push_back(PipelineTask* task) { _queue.emplace(task); } - PipelineTask* try_take(); + PipelineTask* try_take(bool is_steal); void set_factor_for_normal(double factor_for_normal) { _factor_for_normal = factor_for_normal; } @@ -53,9 +53,9 @@ public: void close(); - PipelineTask* try_take_unprotected(); + PipelineTask* try_take_unprotected(bool is_steal); - PipelineTask* try_take(); + PipelineTask* try_take(bool is_steal); PipelineTask* take(uint32_t timeout_ms = 0); --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org