This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6068db30ef6 [pipelineX](fix) fix StreamingAggSource crash due to empty data block (#29769) 6068db30ef6 is described below commit 6068db30ef677554ec2f097df1edc8fdf8ae67e6 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jan 10 15:18:19 2024 +0800 [pipelineX](fix) fix StreamingAggSource crash due to empty data block (#29769) --- be/src/pipeline/exec/streaming_aggregation_source_operator.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index e68be656fea..44991e2e14f 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -83,16 +83,21 @@ Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::B auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); if (!local_state._shared_state->data_queue->data_exhausted()) { - std::unique_ptr<vectorized::Block> agg_block; + std::unique_ptr<vectorized::Block> agg_block = nullptr; DCHECK(local_state._dependency->is_blocked_by() == nullptr); RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block)); if (local_state._shared_state->data_queue->data_exhausted()) { RETURN_IF_ERROR(Base::get_block(state, block, source_state)); - } else { + } else if (agg_block) { block->swap(*agg_block); agg_block->clear_column_data(row_desc().num_materialized_slots()); local_state._shared_state->data_queue->push_free_block(std::move(agg_block)); + } else if (local_state._shared_state->data_queue->is_all_finish()) { + source_state = SourceState::FINISHED; + } else { + return Status::InternalError("Something wrong in StreamingAggSource: {}", + Base::debug_string(0)); } } else { RETURN_IF_ERROR(Base::get_block(state, block, source_state)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org