This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 192d4eec6db752626893331d9e73db2bd8984d88
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Aug 6 10:09:25 2024 +0800

    [scheduler](core) Use signed int as number of cores (#38514)

    *** is nereids: 0 ***
    *** tablet id: 0 ***
    *** Aborted at 1722279016 (unix time) try "date -d @1722279016" if you are using GNU date ***
    *** Current BE git commitID: e9f12fac47e ***
    *** SIGSEGV unknown detail explain (@0x0) received by PID 1116227 (TID 1116498 OR 0x7f009ac00640) from PID 0; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/common/signal_handler.h:421
     1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     3# 0x00007F01E49B0520 in /lib/x86_64-linux-gnu/libc.so.6
     4# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:80
     5# doris::pipeline::MultiCoreTaskQueue::take(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:154
     6# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:268
     7# doris::ThreadPool::dispatch_thread() in /mnt/disk1/STRESS_ENV/be/lib/doris_be
     8# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.cpp:499
     9# start_thread at ./nptl/pthread_create.c:442
    10# 0x00007F01E4A94850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
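For readers wondering why the type change in the title matters: when a `size_t` core count meets an `int` index in a comparison, the usual arithmetic conversions turn the signed operand into a huge unsigned value, so range checks can quietly invert. A minimal standalone sketch of the pitfall (hypothetical values, not code from this patch):

    #include <cstddef>
    #include <iostream>

    int main() {
        std::size_t core_size = 8; // unsigned, as _core_size was before this patch
        int core_id = -1;          // hypothetical invalid/sentinel id

        // core_id is converted to size_t, becoming 18446744073709551615 on
        // LP64, so the mathematically true "-1 < 8" evaluates to false here.
        if (core_id < core_size) {          // -Wsign-compare warns about exactly this
            std::cout << "in range\n";      // never reached
        } else {
            std::cout << "out of range\n";  // printed
        }
        return 0;
    }

Using plain `int` for both the core count and core ids, as this commit does, keeps such comparisons within one signedness and avoids the trap.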
---
 be/src/pipeline/task_queue.cpp     | 54 ++++++++++++++++++++++++--------------
 be/src/pipeline/task_queue.h       | 23 ++++++++--------
 be/src/pipeline/task_scheduler.cpp |  2 +-
 3 files changed, 46 insertions(+), 33 deletions(-)

diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 24d71144240..ea9fb09e260 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -131,37 +131,46 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
     return Status::OK();
 }
 
-int PriorityTaskQueue::task_size() {
-    std::unique_lock<std::mutex> lock(_work_size_mutex);
-    return _total_task_size;
-}
-
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
-MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
-    _prio_task_queue_list = std::make_unique<PriorityTaskQueue[]>(core_size);
+MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
+    _prio_task_queue_list =
+            std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
+    for (int i = 0; i < core_size; i++) {
+        (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
+    }
 }
 
 void MultiCoreTaskQueue::close() {
+    if (_closed) {
+        return;
+    }
     _closed = true;
     for (int i = 0; i < _core_size; ++i) {
-        _prio_task_queue_list[i].close();
+        (*_prio_task_queue_list)[i]->close();
     }
+    std::atomic_store(&_prio_task_queue_list,
+                      std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
 }
 
-PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     PipelineTask* task = nullptr;
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
     while (!_closed) {
-        task = _prio_task_queue_list[core_id].try_take(false);
+        DCHECK(prio_task_queue_list->size() > core_id)
+                << " list size: " << prio_task_queue_list->size() << " core_id: " << core_id
+                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
+        task = (*prio_task_queue_list)[core_id]->try_take(false);
         if (task) {
             task->set_core_id(core_id);
             break;
         }
-        task = _steal_take(core_id);
+        task = _steal_take(core_id, *prio_task_queue_list);
         if (task) {
             break;
         }
-        task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+        task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
             task->set_core_id(core_id);
             break;
@@ -173,16 +182,17 @@ PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(
+        int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
     DCHECK(core_id < _core_size);
-    size_t next_id = core_id;
-    for (size_t i = 1; i < _core_size; ++i) {
+    int next_id = core_id;
+    for (int i = 1; i < _core_size; ++i) {
         ++next_id;
         if (next_id == _core_size) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _prio_task_queue_list[next_id].try_take(true);
+        auto task = prio_task_queue_list[next_id]->try_take(true);
         if (task) {
             task->set_core_id(next_id);
             return task;
@@ -199,16 +209,20 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
-    return _prio_task_queue_list[core_id].push(task);
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+    return (*prio_task_queue_list)[core_id]->push(task);
 }
 
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
     task->inc_runtime_ns(time_spent);
-    _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
-                                                                     time_spent);
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+    (*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
+                                                                        time_spent);
 }
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 74ed9187567..e48deb51757 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -35,25 +35,25 @@ namespace doris::pipeline {
 
 class TaskQueue {
 public:
-    TaskQueue(size_t core_size) : _core_size(core_size) {}
+    TaskQueue(int core_size) : _core_size(core_size) {}
     virtual ~TaskQueue();
     virtual void close() = 0;
     // Get the task by core id.
     // TODO: To think the logic is useful?
-    virtual PipelineTask* take(size_t core_id) = 0;
+    virtual PipelineTask* take(int core_id) = 0;
 
     // push from scheduler
     virtual Status push_back(PipelineTask* task) = 0;
 
     // push from worker
-    virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
+    virtual Status push_back(PipelineTask* task, int core_id) = 0;
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
     int cores() const { return _core_size; }
 
 protected:
-    size_t _core_size;
+    int _core_size;
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 
@@ -103,8 +103,6 @@ public:
         _sub_queues[level].inc_runtime(runtime);
     }
 
-    int task_size();
-
 private:
     PipelineTask* _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
@@ -128,27 +126,28 @@ private:
 // Need consider NUMA architecture
 class MultiCoreTaskQueue : public TaskQueue {
 public:
-    explicit MultiCoreTaskQueue(size_t core_size);
+    explicit MultiCoreTaskQueue(int core_size);
 
     ~MultiCoreTaskQueue() override;
 
     void close() override;
 
     // Get the task by core id.
-    PipelineTask* take(size_t core_id) override;
+    PipelineTask* take(int core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
     Status push_back(PipelineTask* task) override;
-    Status push_back(PipelineTask* task, size_t core_id) override;
+    Status push_back(PipelineTask* task, int core_id) override;
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
 private:
-    PipelineTask* _steal_take(size_t core_id);
+    PipelineTask* _steal_take(
+            int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
 
-    std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
-    std::atomic<size_t> _next_core = 0;
+    std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
+    std::atomic<int> _next_core = 0;
     std::atomic<bool> _closed;
 };
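The crash above is a lifetime problem rather than an indexing one: frame 4 shows `pthread_mutex_lock` on a destroyed queue while frame 5 is still inside `MultiCoreTaskQueue::take()`. The core idea of the fix is that readers first load a `shared_ptr` snapshot of the queue list, so a concurrent `close()` that drops the owning reference cannot free the queues a reader is still using. A reduced standalone sketch of that snapshot pattern (`Queue`/`QueueList` are illustrative stand-ins, not Doris types):

    #include <atomic>
    #include <memory>
    #include <vector>

    struct Queue {
        int take() { return 42; } // stand-in for PriorityTaskQueue::take()
    };

    class QueueList {
    public:
        explicit QueueList(int n) : _queues(std::make_shared<std::vector<Queue>>(n)) {}

        // Reader: load a local snapshot first. Even if close() runs concurrently,
        // the snapshot's reference count keeps the vector alive until we return.
        int take(int i) {
            auto snapshot = std::atomic_load(&_queues);
            if (!snapshot) {
                return -1; // already closed
            }
            return (*snapshot)[i].take();
        }

        // Writer: atomically drop the owning reference. Readers holding a
        // snapshot finish safely; later readers observe null and bail out.
        void close() {
            std::atomic_store(&_queues, std::shared_ptr<std::vector<Queue>>(nullptr));
        }

    private:
        std::shared_ptr<std::vector<Queue>> _queues;
    };

The patch uses the same free-function `std::atomic_load_explicit`/`std::atomic_store` overloads for `shared_ptr`; since C++20, `std::atomic<std::shared_ptr<T>>` is the preferred spelling of this idiom.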
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 7a860440228..8be30773ee1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -52,13 +52,13 @@ TaskScheduler::~TaskScheduler() {
 
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
-    // Must be mutil number of cpu cores
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                             .set_min_threads(cores)
                             .set_max_threads(cores)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
+    LOG_INFO("TaskScheduler set cores").tag("size", cores);
     _markers.resize(cores, true);
     for (size_t i = 0; i < cores; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org