This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a33e4639af [schedule](pipeline) Remove wait schedule time in pipeline 
query engine and change current queue to std::mutex (#24525)
6a33e4639af is described below

commit 6a33e4639afe687923ecb75e1c42a7f96f033772
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Sep 18 23:57:56 2023 +0800

    [schedule](pipeline) Remove wait schedule time in pipeline query engine and 
change current queue to std::mutex (#24525)
    
    This reverts commit 591aeaa98d1178e2e277278c7afeafef9bdb88d6.
---
 be/src/pipeline/pipeline_task.cpp          |  2 --
 be/src/pipeline/pipeline_task.h            |  4 ----
 be/src/pipeline/task_scheduler.cpp         | 31 ++++++++++---------------
 be/src/pipeline/task_scheduler.h           |  1 -
 be/src/vec/exec/scan/pip_scanner_context.h | 37 +++++++++++++++++++++---------
 5 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index bb27012b3d4..add92cabf37 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -86,7 +86,6 @@ void PipelineTask::_fresh_profile_counter() {
     COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
     COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
     COUNTER_SET(_wait_worker_timer, 
(int64_t)_wait_worker_watcher.elapsed_time());
-    COUNTER_SET(_wait_schedule_timer, 
(int64_t)_wait_schedule_watcher.elapsed_time());
     COUNTER_SET(_begin_execute_timer, _begin_execute_time);
     COUNTER_SET(_eos_timer, _eos_time);
     COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
@@ -117,7 +116,6 @@ void PipelineTask::_init_profile() {
     _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
     _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
-    _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
     _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
     _block_by_source_counts = ADD_COUNTER(_task_profile, 
"NumBlockedBySrcTimes", TUnit::UNIT);
     _block_by_sink_counts = ADD_COUNTER(_task_profile, 
"NumBlockedBySinkTimes", TUnit::UNIT);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4311c48f325..b8b8e89215f 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -133,8 +133,6 @@ public:
         _wait_worker_watcher.start();
     }
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
-    void start_schedule_watcher() { _wait_schedule_watcher.start(); }
-    void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
     PipelineTaskState get_state() { return _cur_state; }
     void set_state(PipelineTaskState state);
 
@@ -311,8 +309,6 @@ protected:
     MonotonicStopWatch _wait_worker_watcher;
     RuntimeProfile::Counter* _wait_worker_timer;
     // TODO we should calculate the time between when really runnable and 
runnable
-    MonotonicStopWatch _wait_schedule_watcher;
-    RuntimeProfile::Counter* _wait_schedule_timer;
     RuntimeProfile::Counter* _yield_counts;
     RuntimeProfile::Counter* _core_change_times;
 
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index e4a4ec38af9..c4278c38077 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -85,7 +85,6 @@ void BlockedTaskScheduler::_schedule() {
     _started.store(true);
     std::list<PipelineTask*> local_blocked_tasks;
     int empty_times = 0;
-    std::vector<PipelineTask*> ready_tasks;
 
     while (!_shutdown) {
         {
@@ -105,6 +104,7 @@ void BlockedTaskScheduler::_schedule() {
             }
         }
 
+        auto origin_local_block_tasks_size = local_blocked_tasks.size();
         auto iter = local_blocked_tasks.begin();
         vectorized::VecDateTimeValue now = 
vectorized::VecDateTimeValue::local_time();
         while (iter != local_blocked_tasks.end()) {
@@ -116,57 +116,52 @@ void BlockedTaskScheduler::_schedule() {
                     VLOG_DEBUG << "Task pending" << task->debug_string();
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks,
-                                   PipelineTaskState::PENDING_FINISH);
+                    _make_task_run(local_blocked_tasks, iter, 
PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->query_context()->is_cancelled()) {
-                _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                _make_task_run(local_blocked_tasks, iter);
             } else if (task->query_context()->is_timeout(now)) {
                 LOG(WARNING) << "Timeout, query_id=" << 
print_id(task->query_context()->query_id())
                              << ", instance_id=" << 
print_id(task->instance_id())
                              << ", task info: " << task->debug_string();
 
                 task->query_context()->cancel(true, "", Status::Cancelled(""));
-                _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                _make_task_run(local_blocked_tasks, iter);
             } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                 if (task->has_dependency()) {
                     iter++;
                 } else {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
                 if (task->source_can_read()) {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 } else {
                     iter++;
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
                 if (task->runtime_filters_are_ready_or_timeout()) {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 } else {
                     iter++;
                 }
             } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
                 if (task->sink_can_write()) {
-                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                    _make_task_run(local_blocked_tasks, iter);
                 } else {
                     iter++;
                 }
             } else {
                 // TODO: DCHECK the state
-                _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                _make_task_run(local_blocked_tasks, iter);
             }
         }
 
-        if (ready_tasks.empty()) {
+        if (origin_local_block_tasks_size == 0 ||
+            local_blocked_tasks.size() == origin_local_block_tasks_size) {
             empty_times += 1;
         } else {
             empty_times = 0;
-            for (auto& task : ready_tasks) {
-                task->stop_schedule_watcher();
-                _task_queue->push_back(task);
-            }
-            ready_tasks.clear();
         }
 
         if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 
0) {
@@ -186,13 +181,11 @@ void BlockedTaskScheduler::_schedule() {
 
 void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& 
local_tasks,
                                           std::list<PipelineTask*>::iterator& 
task_itr,
-                                          std::vector<PipelineTask*>& 
ready_tasks,
                                           PipelineTaskState t_state) {
     auto task = *task_itr;
-    task->start_schedule_watcher();
     task->set_state(t_state);
     local_tasks.erase(task_itr++);
-    ready_tasks.emplace_back(task);
+    _task_queue->push_back(task);
 }
 
 TaskScheduler::~TaskScheduler() {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index b9d3dfbac3c..13b9e734d69 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -71,7 +71,6 @@ private:
     void _schedule();
     void _make_task_run(std::list<PipelineTask*>& local_tasks,
                         std::list<PipelineTask*>::iterator& task_itr,
-                        std::vector<PipelineTask*>& ready_tasks,
                         PipelineTaskState state = PipelineTaskState::RUNNABLE);
 };
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 0c11314fb2d..09ec0e3a553 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -70,12 +70,17 @@ public:
         }
 
         {
-            if (!_blocks_queues[id].try_dequeue(*block)) {
+            std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+            if (_blocks_queues[id].empty()) {
                 *eos = _is_finished || _should_stop;
                 return Status::OK();
-            }
-            if (_blocks_queues[id].size_approx() == 0 && _data_dependency) {
-                _data_dependency->block_reading();
+            } else {
+                *block = std::move(_blocks_queues[id].front());
+                _blocks_queues[id].pop_front();
+
+                if (_blocks_queues[id].empty() && _data_dependency) {
+                    _data_dependency->block_reading();
+                }
             }
         }
         _current_used_bytes -= (*block)->allocated_bytes();
@@ -133,8 +138,9 @@ public:
             for (int i = 0; i < queue_size && i < block_size; ++i) {
                 int queue = _next_queue_to_feed;
                 {
+                    std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
                     for (int j = i; j < block_size; j += queue_size) {
-                        _blocks_queues[queue].enqueue(std::move(blocks[j]));
+                        
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
                     }
                 }
                 _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
@@ -146,11 +152,15 @@ public:
         _current_used_bytes += local_bytes;
     }
 
-    bool empty_in_queue(int id) override { return 
_blocks_queues[id].size_approx() == 0; }
+    bool empty_in_queue(int id) override {
+        std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+        return _blocks_queues[id].empty();
+    }
 
     Status init() override {
         for (int i = 0; i < _num_parallel_instances; ++i) {
-            
_blocks_queues.emplace_back(moodycamel::ConcurrentQueue<vectorized::BlockUPtr>());
+            _queue_mutexs.emplace_back(std::make_unique<std::mutex>());
+            _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
         }
         RETURN_IF_ERROR(ScannerContext::init());
         if (_need_colocate_distribute) {
@@ -182,9 +192,10 @@ public:
     void _dispose_coloate_blocks_not_in_queue() override {
         if (_need_colocate_distribute) {
             for (int i = 0; i < _num_parallel_instances; ++i) {
+                std::scoped_lock s(*_colocate_block_mutexs[i], 
*_queue_mutexs[i]);
                 if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) {
                     _current_used_bytes += 
_colocate_blocks[i]->allocated_bytes();
-                    _blocks_queues[i].enqueue(std::move(_colocate_blocks[i]));
+                    
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
                     _colocate_mutable_blocks[i]->clear();
                 }
                 if (_data_dependency) {
@@ -198,14 +209,15 @@ public:
         auto res = ScannerContext::debug_string();
         for (int i = 0; i < _blocks_queues.size(); ++i) {
             res += " queue " + std::to_string(i) + ":size " +
-                   std::to_string(_blocks_queues[i].size_approx());
+                   std::to_string(_blocks_queues[i].size());
         }
         return res;
     }
 
 private:
     int _next_queue_to_feed = 0;
-    std::vector<moodycamel::ConcurrentQueue<vectorized::BlockUPtr>> 
_blocks_queues;
+    std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
+    std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
     std::atomic_int64_t _current_used_bytes = 0;
 
     const std::vector<int>& _col_distribute_ids;
@@ -238,7 +250,10 @@ private:
 
             if (row_add == max_add) {
                 _current_used_bytes += 
_colocate_blocks[loc]->allocated_bytes();
-                _blocks_queues[loc].enqueue(std::move(_colocate_blocks[loc]));
+                {
+                    std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
+                    
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
+                }
                 if (_data_dependency) {
                     _data_dependency->set_ready_for_read();
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to