This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 99810f1ea5 [Bug](pipeline) fix hang on union_source_operator when child sink_operator all finished (#20938) 99810f1ea5 is described below commit 99810f1ea57e2072ced11c08938cf2dfe331ef94 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Jun 19 09:46:38 2023 +0800 [Bug](pipeline) fix hang on union_source_operator when child sink_operator all finished (#20938) --- be/src/pipeline/exec/union_source_operator.cpp | 10 +++++++--- be/src/pipeline/exec/union_source_operator.h | 2 ++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 6efd0bfc78..f39e0a582b 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -46,10 +46,14 @@ UnionSourceOperator::UnionSourceOperator(OperatorBuilderBase* operator_builder, _data_queue(queue), _need_read_for_const_expr(true) {}; +bool UnionSourceOperator::_has_data() { + return _need_read_for_const_expr || _data_queue->remaining_has_data(); +} + // we assumed it can read to process const expr, Although we don't know whether there is // ,and queue have data, could read also bool UnionSourceOperator::can_read() { - return _need_read_for_const_expr || _data_queue->remaining_has_data(); + return _has_data() || _data_queue->is_all_finish(); } Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block, bool* eos) { @@ -83,9 +87,9 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); //have exectue const expr, queue have no data any more, and child could be colsed - if (eos || (!can_read() && _data_queue->is_all_finish())) { + if (eos || (!_has_data() && _data_queue->is_all_finish())) { source_state = SourceState::FINISHED; - } else if (can_read()) { + } else if (_has_data()) { source_state = SourceState::MORE_DATA; } else { source_state = SourceState::DEPEND_ON_SOURCE; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 33a6bef587..8051bdd512 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -59,6 +59,8 @@ public: Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos); private: + bool _has_data(); + std::shared_ptr<DataQueue> _data_queue; bool _need_read_for_const_expr; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org