This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 192d4eec6db752626893331d9e73db2bd8984d88
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Aug 6 10:09:25 2024 +0800

    [scheduler](core) Use signed int as number of cores (#38514)
    
    *** is nereids: 0 ***
    *** tablet id: 0 ***
    *** Aborted at 1722279016 (unix time) try "date -d @1722279016" if you are using GNU date ***
    *** Current BE git commitID: e9f12fac47e ***
    *** SIGSEGV unknown detail explain (@0x0) received by PID 1116227 (TID 1116498 OR 0x7f009ac00640) from PID 0; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/common/signal_handler.h:421
     1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     3# 0x00007F01E49B0520 in /lib/x86_64-linux-gnu/libc.so.6
     4# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:80
     5# doris::pipeline::MultiCoreTaskQueue::take(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:154
     6# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:268
     7# doris::ThreadPool::dispatch_thread() in /mnt/disk1/STRESS_ENV/be/lib/doris_be
     8# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.cpp:499
     9# start_thread at ./nptl/pthread_create.c:442
    10# 0x00007F01E4A94850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
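
    The core-id parameters change from size_t to int because of C++'s
    implicit integral conversions: a negative int passed where size_t is
    expected silently becomes a huge unsigned value, so a bad core id turns
    into a far-out-of-range index instead of something a "< 0" check can
    catch. A minimal standalone sketch of that pitfall (hypothetical
    function names, not the Doris code):

        #include <cstddef>
        #include <cstdio>

        // Unsigned signature: the size_t parameter silently converts any
        // negative caller-side int into a huge unsigned index.
        void take_unsigned(std::size_t core_id) {
            std::printf("take_unsigned(core_id=%zu)\n", core_id);
        }

        // Signed signature: a negative id stays negative and can be
        // rejected by an ordinary bounds check.
        void take_signed(int core_id) {
            std::printf("take_signed(core_id=%d)\n", core_id);
        }

        int main() {
            int next_core = -1; // e.g. a sentinel or wrapped-around counter
            take_unsigned(next_core); // prints 18446744073709551615 on LP64
            take_signed(next_core);   // prints -1
            return 0;
        }

    Building with -Wsign-conversion (GCC/Clang) reports the implicit
    conversion at the take_unsigned call site.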
---
 be/src/pipeline/task_queue.cpp     | 54 ++++++++++++++++++++++++--------------
 be/src/pipeline/task_queue.h       | 23 ++++++++--------
 be/src/pipeline/task_scheduler.cpp |  2 +-
 3 files changed, 46 insertions(+), 33 deletions(-)

diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 24d71144240..ea9fb09e260 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -131,37 +131,46 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
     return Status::OK();
 }
 
-int PriorityTaskQueue::task_size() {
-    std::unique_lock<std::mutex> lock(_work_size_mutex);
-    return _total_task_size;
-}
-
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
-MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
-    _prio_task_queue_list = std::make_unique<PriorityTaskQueue[]>(core_size);
+MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
+    _prio_task_queue_list =
+            std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
+    for (int i = 0; i < core_size; i++) {
+        (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
+    }
 }
 
 void MultiCoreTaskQueue::close() {
+    if (_closed) {
+        return;
+    }
     _closed = true;
     for (int i = 0; i < _core_size; ++i) {
-        _prio_task_queue_list[i].close();
+        (*_prio_task_queue_list)[i]->close();
     }
+    std::atomic_store(&_prio_task_queue_list,
+                      std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
 }
 
-PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     PipelineTask* task = nullptr;
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
     while (!_closed) {
-        task = _prio_task_queue_list[core_id].try_take(false);
+        DCHECK(prio_task_queue_list->size() > core_id)
+                << " list size: " << prio_task_queue_list->size() << " core_id: " << core_id
+                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
+        task = (*prio_task_queue_list)[core_id]->try_take(false);
         if (task) {
             task->set_core_id(core_id);
             break;
         }
-        task = _steal_take(core_id);
+        task = _steal_take(core_id, *prio_task_queue_list);
         if (task) {
             break;
         }
-        task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+        task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
             task->set_core_id(core_id);
             break;
@@ -173,16 +182,17 @@ PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(
+        int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
     DCHECK(core_id < _core_size);
-    size_t next_id = core_id;
-    for (size_t i = 1; i < _core_size; ++i) {
+    int next_id = core_id;
+    for (int i = 1; i < _core_size; ++i) {
         ++next_id;
         if (next_id == _core_size) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _prio_task_queue_list[next_id].try_take(true);
+        auto task = prio_task_queue_list[next_id]->try_take(true);
         if (task) {
             task->set_core_id(next_id);
             return task;
@@ -199,16 +209,20 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
-    return _prio_task_queue_list[core_id].push(task);
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+    return (*prio_task_queue_list)[core_id]->push(task);
 }
 
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
     task->inc_runtime_ns(time_spent);
-    _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
-                                                                     time_spent);
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+    (*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
+                                                                        time_spent);
 }
 
 } // namespace doris::pipeline
\ No newline at end of file
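
Beyond the signedness change, the task_queue.cpp hunks above replace the raw
unique_ptr array with a shared_ptr-owned vector that readers load atomically:
take(), push_back(), and update_statistics() operate on a snapshot whose
reference count keeps the queues alive even if close() clears the member
concurrently. A minimal sketch of that snapshot pattern, with simplified
element types (not the Doris code):

    #include <atomic>
    #include <memory>
    #include <vector>

    struct Queues {
        std::shared_ptr<std::vector<int>> list =
                std::make_shared<std::vector<int>>(8, 0);

        int read(int idx) {
            // The snapshot holds a reference for the duration of the call,
            // so the vector cannot be freed underneath us.
            auto snapshot =
                    std::atomic_load_explicit(&list, std::memory_order_relaxed);
            if (!snapshot) {
                return -1; // already closed
            }
            return (*snapshot)[idx];
        }

        void close() {
            // Atomically drop the owning reference; the last thread still
            // holding a snapshot frees the vector.
            std::atomic_store(&list, std::shared_ptr<std::vector<int>>(nullptr));
        }
    };

The free-function atomic shared_ptr overloads used here (and in the patch)
are deprecated since C++20 in favor of std::atomic<std::shared_ptr<T>>, but
remain valid.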
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 74ed9187567..e48deb51757 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -35,25 +35,25 @@ namespace doris::pipeline {
 
 class TaskQueue {
 public:
-    TaskQueue(size_t core_size) : _core_size(core_size) {}
+    TaskQueue(int core_size) : _core_size(core_size) {}
     virtual ~TaskQueue();
     virtual void close() = 0;
     // Get the task by core id.
     // TODO: To think the logic is useful?
-    virtual PipelineTask* take(size_t core_id) = 0;
+    virtual PipelineTask* take(int core_id) = 0;
 
     // push from scheduler
     virtual Status push_back(PipelineTask* task) = 0;
 
     // push from worker
-    virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
+    virtual Status push_back(PipelineTask* task, int core_id) = 0;
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
     int cores() const { return _core_size; }
 
 protected:
-    size_t _core_size;
+    int _core_size;
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 
@@ -103,8 +103,6 @@ public:
         _sub_queues[level].inc_runtime(runtime);
     }
 
-    int task_size();
-
 private:
     PipelineTask* _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
@@ -128,27 +126,28 @@ private:
 // Need consider NUMA architecture
 class MultiCoreTaskQueue : public TaskQueue {
 public:
-    explicit MultiCoreTaskQueue(size_t core_size);
+    explicit MultiCoreTaskQueue(int core_size);
 
     ~MultiCoreTaskQueue() override;
 
     void close() override;
 
     // Get the task by core id.
-    PipelineTask* take(size_t core_id) override;
+    PipelineTask* take(int core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
     Status push_back(PipelineTask* task) override;
 
-    Status push_back(PipelineTask* task, size_t core_id) override;
+    Status push_back(PipelineTask* task, int core_id) override;
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
 private:
-    PipelineTask* _steal_take(size_t core_id);
+    PipelineTask* _steal_take(
+            int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
 
-    std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
-    std::atomic<size_t> _next_core = 0;
+    std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
+    std::atomic<int> _next_core = 0;
     std::atomic<bool> _closed;
 };
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 7a860440228..8be30773ee1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -52,13 +52,13 @@ TaskScheduler::~TaskScheduler() {
 
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
-    // Must be mutil number of cpu cores
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                             .set_min_threads(cores)
                             .set_max_threads(cores)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
+    LOG_INFO("TaskScheduler set cores").tag("size", cores);
     _markers.resize(cores, true);
     for (size_t i = 0; i < cores; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
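
For reference, the wrap-around scan in _steal_take probes every other core
exactly once, starting just after the caller's core; a standalone trace of
the loop (values chosen only for illustration):

    #include <cstdio>

    int main() {
        const int core_size = 4;
        const int core_id = 2; // the core whose own queue was empty
        int next_id = core_id;
        for (int i = 1; i < core_size; ++i) {
            ++next_id;
            if (next_id == core_size) {
                next_id = 0; // wrap around past the last core
            }
            std::printf("probe core %d\n", next_id); // prints 3, then 0, then 1
        }
        return 0;
    }

With int used for both the index and _core_size, the comparisons and the
wrap test involve no mixed-signedness arithmetic, which is what the wider
signature change is guarding.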

