This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 9bb29ce164a [fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data (#46094) 9bb29ce164a is described below commit 9bb29ce164aa4895e231e51a91ab117025505d6a Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Fri Dec 27 15:28:04 2024 +0800 [fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data (#46094) --- be/src/pipeline/exec/cache_source_operator.cpp | 2 +- be/src/pipeline/exec/data_queue.cpp | 11 ----------- be/src/pipeline/exec/data_queue.h | 7 ++++--- be/src/pipeline/exec/union_source_operator.cpp | 2 +- 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index cace8465fc2..b515aeb4957 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -111,7 +111,7 @@ std::string CacheSourceLocalState::debug_string(int indentation_level) const { if (_shared_state) { fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", _shared_state->data_queue.is_all_finish(), - _shared_state->data_queue.remaining_has_data()); + _shared_state->data_queue.has_more_data()); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 436a98e6b03..85354ece76a 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -72,17 +72,6 @@ void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int ch _free_blocks[child_idx].emplace_back(std::move(block)); } -//use sink to check can_write -bool DataQueue::has_enough_space_to_push() { - DCHECK(_cur_bytes_in_queue.size() == 1); - return _cur_bytes_in_queue[0].load() < MAX_BYTE_OF_QUEUE / 2; -} - -//use source to check can_read -bool DataQueue::has_data_or_finished(int child_idx) { - return remaining_has_data() || _is_finished[child_idx]; -} - //check which queue have data, and save the idx in _flag_queue_idx, //so next loop, will check the record idx + 1 first //maybe it's useful with many queue, others maybe always 0 diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index d8a22888d9b..f2a849ee7dd 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -52,10 +52,11 @@ public: bool is_finish(int child_idx = 0); bool is_all_finish(); - bool has_enough_space_to_push(); - bool has_data_or_finished(int child_idx = 0); + // This function is not thread safe, should be called in Operator::get_block() bool remaining_has_data(); + bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; } + int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; } int64_t max_size_of_queue() const { return _max_size_of_queue; } @@ -102,7 +103,7 @@ private: //this only use to record the queue[0] for profile int64_t _max_bytes_in_queue = 0; int64_t _max_size_of_queue = 0; - static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; + static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10; // data queue is multi sink one source std::shared_ptr<Dependency> _source_dependency = nullptr; diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 942135453b4..f43cd604b68 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -90,7 +90,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const { if (_shared_state) { fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", _shared_state->data_queue.is_all_finish(), - _shared_state->data_queue.remaining_has_data()); + _shared_state->data_queue.has_more_data()); } return fmt::to_string(debug_string_buffer); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org