This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6068db30ef6 [pipelineX](fix) fix StreamingAggSource crash due to empty data block (#29769)
6068db30ef6 is described below

commit 6068db30ef677554ec2f097df1edc8fdf8ae67e6
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Jan 10 15:18:19 2024 +0800

    [pipelineX](fix) fix StreamingAggSource crash due to empty data block (#29769)
---
 be/src/pipeline/exec/streaming_aggregation_source_operator.cpp | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index e68be656fea..44991e2e14f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -83,16 +83,21 @@ Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::B
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     if (!local_state._shared_state->data_queue->data_exhausted()) {
-        std::unique_ptr<vectorized::Block> agg_block;
+        std::unique_ptr<vectorized::Block> agg_block = nullptr;
         DCHECK(local_state._dependency->is_blocked_by() == nullptr);
         RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
 
         if (local_state._shared_state->data_queue->data_exhausted()) {
             RETURN_IF_ERROR(Base::get_block(state, block, source_state));
-        } else {
+        } else if (agg_block) {
             block->swap(*agg_block);
             agg_block->clear_column_data(row_desc().num_materialized_slots());
             local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
+        } else if (local_state._shared_state->data_queue->is_all_finish()) {
+            source_state = SourceState::FINISHED;
+        } else {
+            return Status::InternalError("Something wrong in StreamingAggSource: {}",
+                                         Base::debug_string(0));
         }
     } else {
         RETURN_IF_ERROR(Base::get_block(state, block, source_state));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to