This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 22985af4d7 [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557) 22985af4d7 is described below commit 22985af4d702930d8d4b004d2fe7185832ed9960 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Jun 8 14:47:35 2023 +0800 [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data --- be/src/pipeline/exec/data_queue.cpp | 22 +++++++++++----------- be/src/pipeline/exec/data_queue.h | 10 +++++----- be/src/pipeline/exec/union_source_operator.cpp | 14 +++++++++----- be/src/pipeline/pipeline_fragment_context.cpp | 13 ------------- 4 files changed, 25 insertions(+), 34 deletions(-) diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 6dbf341972..6ec6a5b2b1 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -29,17 +29,17 @@ namespace doris { namespace pipeline { -DataQueue::DataQueue(int child_count) { - _child_count = child_count; - _flag_queue_idx = 0; - _queue_blocks.resize(child_count); - _free_blocks.resize(child_count); - _queue_blocks_lock.resize(child_count); - _free_blocks_lock.resize(child_count); - _is_finished.resize(child_count); - _is_canceled.resize(child_count); - _cur_bytes_in_queue.resize(child_count); - _cur_blocks_nums_in_queue.resize(child_count); +DataQueue::DataQueue(int child_count) + : _queue_blocks_lock(child_count), + _queue_blocks(child_count), + _free_blocks_lock(child_count), + _free_blocks(child_count), + _child_count(child_count), + _is_finished(child_count), + _is_canceled(child_count), + _cur_bytes_in_queue(child_count), + _cur_blocks_nums_in_queue(child_count), + _flag_queue_idx(0) { for (int i = 0; i < child_count; ++i) { _queue_blocks_lock[i].reset(new std::mutex()); _free_blocks_lock[i].reset(new std::mutex()); diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index 299f7e6b4d..e7f2fdfada 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -68,14 +68,14 @@ private: //how many deque will be init, always will be one int _child_count = 0; - std::deque<std::atomic<bool>> _is_finished; - std::deque<std::atomic<bool>> _is_canceled; + std::vector<std::atomic_bool> _is_finished; + std::vector<std::atomic_bool> _is_canceled; // int64_t just for counter of profile - std::deque<std::atomic<int64_t>> _cur_bytes_in_queue; - std::deque<std::atomic<uint32_t>> _cur_blocks_nums_in_queue; + std::vector<std::atomic_int64_t> _cur_bytes_in_queue; + std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue; //this will be indicate which queue has data, it's useful when have many queues - std::atomic<int> _flag_queue_idx = 0; + std::atomic_int _flag_queue_idx = 0; // only used by streaming agg source operator bool _data_exhausted = false; diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 83069f3e90..7311d849c8 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -83,11 +83,15 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); //have exectue const expr, queue have no data any more, and child could be colsed - source_state = ((!_need_read_for_const_expr && !_data_queue->remaining_has_data() && - _data_queue->is_all_finish()) || - eos) - ? SourceState::FINISHED - : SourceState::DEPEND_ON_SOURCE; + if (eos || (!_need_read_for_const_expr && !_data_queue->remaining_has_data() && + _data_queue->is_all_finish())) { + source_state = SourceState::FINISHED; + } else if (_need_read_for_const_expr || _data_queue->remaining_has_data()) { + source_state = SourceState::MORE_DATA; + } else { + source_state = SourceState::DEPEND_ON_SOURCE; + } + return Status::OK(); } } // namespace pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index ebb30fe384..827fa43a4e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -107,19 +107,6 @@ #include "vec/exec/vunion_node.h" #include "vec/runtime/vdata_stream_mgr.h" -namespace apache { -namespace thrift { -class TException; - -namespace transport { -class TTransportException; -} // namespace transport -} // namespace thrift -} // namespace apache - -using apache::thrift::transport::TTransportException; -using apache::thrift::TException; - namespace doris::pipeline { PipelineFragmentContext::PipelineFragmentContext( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org