This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 9bb29ce164a [fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data (#46094)
9bb29ce164a is described below

commit 9bb29ce164aa4895e231e51a91ab117025505d6a
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Fri Dec 27 15:28:04 2024 +0800

    [fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data (#46094)
---
 be/src/pipeline/exec/cache_source_operator.cpp |  2 +-
 be/src/pipeline/exec/data_queue.cpp            | 11 -----------
 be/src/pipeline/exec/data_queue.h              |  7 ++++---
 be/src/pipeline/exec/union_source_operator.cpp |  2 +-
 4 files changed, 6 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp
index cace8465fc2..b515aeb4957 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -111,7 +111,7 @@ std::string CacheSourceLocalState::debug_string(int indentation_level) const {
     if (_shared_state) {
         fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
                        _shared_state->data_queue.is_all_finish(),
-                       _shared_state->data_queue.remaining_has_data());
+                       _shared_state->data_queue.has_more_data());
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 436a98e6b03..85354ece76a 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -72,17 +72,6 @@ void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int ch
     _free_blocks[child_idx].emplace_back(std::move(block));
 }
 
-//use sink to check can_write
-bool DataQueue::has_enough_space_to_push() {
-    DCHECK(_cur_bytes_in_queue.size() == 1);
-    return _cur_bytes_in_queue[0].load() < MAX_BYTE_OF_QUEUE / 2;
-}
-
-//use source to check can_read
-bool DataQueue::has_data_or_finished(int child_idx) {
-    return remaining_has_data() || _is_finished[child_idx];
-}
-
 //check which queue have data, and save the idx in _flag_queue_idx,
 //so next loop, will check the record idx + 1 first
 //maybe it's useful with many queue, others maybe always 0
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index d8a22888d9b..f2a849ee7dd 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -52,10 +52,11 @@ public:
     bool is_finish(int child_idx = 0);
     bool is_all_finish();
 
-    bool has_enough_space_to_push();
-    bool has_data_or_finished(int child_idx = 0);
+    // This function is not thread safe, should be called in Operator::get_block()
     bool remaining_has_data();
 
+    bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }
+
     int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
     int64_t max_size_of_queue() const { return _max_size_of_queue; }
 
@@ -102,7 +103,7 @@ private:
     //this only use to record the queue[0] for profile
     int64_t _max_bytes_in_queue = 0;
     int64_t _max_size_of_queue = 0;
-    static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
+    static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;
 
     // data queue is multi sink one source
     std::shared_ptr<Dependency> _source_dependency = nullptr;
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 942135453b4..f43cd604b68 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -90,7 +90,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const {
     if (_shared_state) {
         fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
                        _shared_state->data_queue.is_all_finish(),
-                       _shared_state->data_queue.remaining_has_data());
+                       _shared_state->data_queue.has_more_data());
     }
     return fmt::to_string(debug_string_buffer);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to