This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a8ea3079d2 [fix](multicast) should not ignore Status of block::merge 
(#35886)
5a8ea3079d2 is described below

commit 5a8ea3079d2b0ee1a19b9d57fe46e6177353dde4
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Jun 5 14:22:02 2024 +0800

    [fix](multicast) should not ignore Status of block::merge (#35886)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/multi_cast_data_stream_source.cpp | 3 ++-
 be/src/pipeline/exec/multi_cast_data_streamer.cpp      | 5 +++--
 be/src/pipeline/exec/multi_cast_data_streamer.h        | 2 +-
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index a7bb6462964..03d0d380fcb 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -87,7 +87,8 @@ Status 
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
     if (!local_state._output_expr_contexts.empty()) {
         output_block = &tmp_block;
     }
-    local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, 
output_block, eos);
+    
RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id,
+                                                                              
output_block, eos));
 
     if (!local_state._conjuncts.empty()) {
         
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp 
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 24fa3217e3d..d3047c42a2d 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -29,7 +29,7 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int 
used_count, size_t
     block->clear();
 }
 
-void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
+Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
     std::lock_guard l(_mutex);
     auto& pos_to_pull = _sender_pos_to_read[sender_idx];
     if (pos_to_pull != _multi_cast_blocks.end()) {
@@ -43,7 +43,7 @@ void MultiCastDataStreamer::pull(int sender_idx, 
doris::vectorized::Block* block
         } else {
             pos_to_pull->_used_count--;
             pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
-            (void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block);
+            
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
             pos_to_pull++;
         }
     }
@@ -51,6 +51,7 @@ void MultiCastDataStreamer::pull(int sender_idx, 
doris::vectorized::Block* block
     if (pos_to_pull == _multi_cast_blocks.end()) {
         _block_reading(sender_idx);
     }
+    return Status::OK();
 }
 
 void MultiCastDataStreamer::close_sender(int sender_idx) {
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index e812067e52c..0a1276c4f1b 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -50,7 +50,7 @@ public:
 
     ~MultiCastDataStreamer() = default;
 
-    void pull(int sender_idx, vectorized::Block* block, bool* eos);
+    Status pull(int sender_idx, vectorized::Block* block, bool* eos);
 
     void close_sender(int sender_idx);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to