This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new ba5c6fba985 [scheduler](core) Use signed int as number of cores (#38514) (#38913)
ba5c6fba985 is described below

commit ba5c6fba9856cdd1c3be54ab434e300f2b21d6b1
Author:     Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Aug 6 14:44:59 2024 +0800

    [scheduler](core) Use signed int as number of cores (#38514) (#38913)

    pick #38514

    *** is nereids: 0 ***
    *** tablet id: 0 ***
    *** Aborted at 1722279016 (unix time) try "date -d @1722279016" if you are using GNU date ***
    *** Current BE git commitID: e9f12fac47e ***
    *** SIGSEGV unknown detail explain (@0x0) received by PID 1116227 (TID 1116498 OR 0x7f009ac00640) from PID 0; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/common/signal_handler.h:421
     1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     3# 0x00007F01E49B0520 in /lib/x86_64-linux-gnu/libc.so.6
     4# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:80
     5# doris::pipeline::MultiCoreTaskQueue::take(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:154
     6# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:268
     7# doris::ThreadPool::dispatch_thread() in /mnt/disk1/STRESS_ENV/be/lib/doris_be
     8# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.cpp:499
     9# start_thread at ./nptl/pthread_create.c:442
    10# 0x00007F01E4A94850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83

    ## Proposed changes

    Issue Number: close #xxx
---
 be/src/pipeline/task_queue.cpp     | 48 ++++++++++++++++++++++++--------------
 be/src/pipeline/task_queue.h       | 30 ++++++++++++------------
 be/src/pipeline/task_scheduler.cpp |  2 +-
 3 files changed, 46 insertions(+), 34 deletions(-)
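Before the diff, a minimal standalone sketch (hypothetical example, not Doris code) of the signed/unsigned pitfall the commit title refers to. In a `signed op unsigned` comparison the signed operand converts to unsigned, so a `-1` sentinel becomes `SIZE_MAX` and bounds logic stops meaning what it says; unsigned arithmetic likewise wraps instead of going negative:

```cpp
#include <cstddef>
#include <cstdio>

int main() {
    std::size_t core_size = 8; // core count as an unsigned type (pre-patch)
    int core_id = -1;          // common "no core chosen yet" sentinel

    // core_id converts to 18446744073709551615 here, so this prints
    // "in range: 0" even though -1 < 8 mathematically; a guard written
    // this way silently takes the wrong branch for sentinel values.
    std::printf("in range: %d\n", core_id < core_size ? 1 : 0);

    // Unsigned subtraction wraps: with zero cores the "last core" index
    // becomes SIZE_MAX rather than -1, and a loop bounded by it would
    // index far out of range.
    std::size_t zero_cores = 0;
    std::printf("last core: %zu\n", zero_cores - 1);
    return 0;
}
```

Switching the interface to plain `int`, as this patch does, keeps the sentinel representable and makes `core_id < 0 || core_id >= core_size` checks behave as written.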
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 617cd7a78d1..293769162f6 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -130,37 +130,46 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
     return Status::OK();
 }
 
-int PriorityTaskQueue::task_size() {
-    std::unique_lock<std::mutex> lock(_work_size_mutex);
-    return _total_task_size;
-}
-
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
-MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
-    _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
+MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
+    _prio_task_queue_list =
+            std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
+    for (int i = 0; i < core_size; i++) {
+        (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
+    }
 }
 
 void MultiCoreTaskQueue::close() {
+    if (_closed) {
+        return;
+    }
     _closed = true;
     for (int i = 0; i < _core_size; ++i) {
-        _prio_task_queue_list[i].close();
+        (*_prio_task_queue_list)[i]->close();
     }
+    std::atomic_store(&_prio_task_queue_list,
+                      std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
 }
 
-PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     PipelineTask* task = nullptr;
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
     while (!_closed) {
-        task = _prio_task_queue_list[core_id].try_take(false);
+        DCHECK(prio_task_queue_list->size() > core_id)
+                << " list size: " << prio_task_queue_list->size() << " core_id: " << core_id
+                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
+        task = (*prio_task_queue_list)[core_id]->try_take(false);
         if (task) {
             task->set_core_id(core_id);
             break;
         }
-        task = _steal_take(core_id);
+        task = _steal_take(core_id, *prio_task_queue_list);
         if (task) {
             break;
         }
-        task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+        task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
             task->set_core_id(core_id);
             break;
@@ -172,16 +181,17 @@ PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(
+        int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
     DCHECK(core_id < _core_size);
-    size_t next_id = core_id;
-    for (size_t i = 1; i < _core_size; ++i) {
+    int next_id = core_id;
+    for (int i = 1; i < _core_size; ++i) {
         ++next_id;
         if (next_id == _core_size) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _prio_task_queue_list[next_id].try_take(true);
+        auto task = prio_task_queue_list[next_id]->try_take(true);
         if (task) {
             task->set_core_id(next_id);
             return task;
@@ -198,10 +208,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
-    return _prio_task_queue_list[core_id].push(task);
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+    return (*prio_task_queue_list)[core_id]->push(task);
 }
 
 } // namespace pipeline
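One way to read the `take()`/`close()` hunks above: the raw `PriorityTaskQueue[]` is replaced by a `shared_ptr` that workers snapshot atomically, so a worker racing with `close()` can never lock a destroyed queue mutex (frames 4-5 of the stack trace). A condensed sketch of that pattern, with hypothetical names rather than the real Doris classes:

```cpp
#include <atomic>
#include <memory>
#include <vector>

// Stand-in for PriorityTaskQueue; pop() would block on or return work.
struct Queue {
    int pop() { return 42; }
};

class MultiQueue {
public:
    explicit MultiQueue(int cores)
            : _queues(std::make_shared<std::vector<Queue>>(cores)) {}

    void close() {
        _closed = true;
        // Publish "no queues". Workers still holding a snapshot keep the
        // vector alive; only the last owner destroys it.
        std::atomic_store(&_queues, std::shared_ptr<std::vector<Queue>>(nullptr));
    }

    int take(int core_id) {
        // One snapshot per call: even if close() runs concurrently, the
        // queues this worker touches stay valid until it returns.
        auto snapshot = std::atomic_load_explicit(&_queues, std::memory_order_relaxed);
        if (_closed || !snapshot) {
            return -1;
        }
        return (*snapshot)[core_id].pop();
    }

private:
    std::shared_ptr<std::vector<Queue>> _queues;
    std::atomic<bool> _closed {false};
};

int main() {
    MultiQueue q(4);
    q.take(0); // fetches work while the queue list is alive
    q.close();
    return q.take(0) == -1 ? 0 : 1; // after close(), take() observes shutdown
}
```

`std::atomic_store`/`std::atomic_load_explicit` on `shared_ptr` are the pre-C++20 free functions the patch itself uses; C++20 code would spell the same thing `std::atomic<std::shared_ptr<T>>`.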
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 02994511019..3ac9de46025 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -37,25 +37,25 @@ namespace pipeline {
 
 class TaskQueue {
 public:
-    TaskQueue(size_t core_size) : _core_size(core_size) {}
+    TaskQueue(int core_size) : _core_size(core_size) {}
     virtual ~TaskQueue();
     virtual void close() = 0;
 
     // Get the task by core id.
     // TODO: To think the logic is useful?
-    virtual PipelineTask* take(size_t core_id) = 0;
+    virtual PipelineTask* take(int core_id) = 0;
 
     // push from scheduler
     virtual Status push_back(PipelineTask* task) = 0;
 
     // push from worker
-    virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
+    virtual Status push_back(PipelineTask* task, int core_id) = 0;
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
     int cores() const { return _core_size; }
 
 protected:
-    size_t _core_size;
+    int _core_size;
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
@@ -105,8 +105,6 @@ public:
         _sub_queues[level].inc_runtime(runtime);
     }
 
-    int task_size();
-
 private:
     PipelineTask* _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
@@ -130,32 +128,34 @@ private:
 // Need consider NUMA architecture
 class MultiCoreTaskQueue : public TaskQueue {
 public:
-    explicit MultiCoreTaskQueue(size_t core_size);
+    explicit MultiCoreTaskQueue(int core_size);
 
     ~MultiCoreTaskQueue() override;
 
     void close() override;
 
     // Get the task by core id.
-    // TODO: To think the logic is useful?
-    PipelineTask* take(size_t core_id) override;
+    PipelineTask* take(int core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
     Status push_back(PipelineTask* task) override;
-    Status push_back(PipelineTask* task, size_t core_id) override;
+    Status push_back(PipelineTask* task, int core_id) override;
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override {
         task->inc_runtime_ns(time_spent);
-        _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
-                                                                         time_spent);
+        auto prio_task_queue_list =
+                std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+        (*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
+                                                                            time_spent);
    }
 
 private:
-    PipelineTask* _steal_take(size_t core_id);
+    PipelineTask* _steal_take(
+            int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
 
-    std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
-    std::atomic<size_t> _next_core = 0;
+    std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
+    std::atomic<int> _next_core = 0;
     std::atomic<bool> _closed;
 };
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index f2c86168180..de697469575 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -205,13 +205,13 @@ TaskScheduler::~TaskScheduler() {
 
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
-    // Must be mutil number of cpu cores
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                             .set_min_threads(cores)
                             .set_max_threads(cores)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
+    LOG_INFO("TaskScheduler set cores").tag("size", cores);
    _markers.reserve(cores);
     for (size_t i = 0; i < cores; ++i) {
         _markers.push_back(std::make_unique<std::atomic<bool>>(true));
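For readers outside the codebase, the `start()` hunk boils down to a fixed pool of one worker per core, each draining its own queue by core id. A minimal sketch of that shape using `std::thread` instead of `doris::ThreadPool` (all names here are hypothetical):

```cpp
#include <cstdio>
#include <thread>
#include <vector>

// Stand-in for the pipeline task queue; take() returns a task id, or -1
// once the queue is closed, mirroring the patched int-based interface.
struct TaskQueue {
    int cores() const { return 4; }
    int take(int /*core_id*/) { return -1; } // closed immediately in this sketch
};

int main() {
    TaskQueue queue;
    int cores = queue.cores(); // signed count, as after this patch
    std::vector<std::thread> workers;
    workers.reserve(cores);
    for (int i = 0; i < cores; ++i) {
        workers.emplace_back([&queue, i] {
            // Each worker loops on its own core id until shutdown.
            for (int task; (task = queue.take(i)) != -1;) {
                std::printf("core %d ran task %d\n", i, task);
            }
        });
    }
    for (auto& w : workers) {
        w.join();
    }
    return 0;
}
```

Worth noting: the patched `start()` still iterates `for (size_t i = 0; i < cores; ++i)` against the now-signed `cores`, a mixed comparison that `-Wsign-compare` would flag, though it is harmless for the non-negative values involved.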