This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 0e7e42dbdd4 [fix](spill) AssertNumRowsOperatorX fail (#46172)
0e7e42dbdd4 is described below

commit 0e7e42dbdd4226f727040d208e0d3602ca0c09d5
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Tue Dec 31 11:23:33 2024 +0800

    [fix](spill) AssertNumRowsOperatorX fail (#46172)
---
 be/src/pipeline/exec/assert_num_rows_operator.cpp | 13 +++++++++----
 be/src/pipeline/exec/multi_cast_data_streamer.cpp | 15 ++++++++-------
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index b4e1f7a8419..dccc8c42a8a 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -111,10 +111,15 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
                 return it->second;
             }
         };
-        LOG(INFO) << "Expected " << to_string_lambda(_assertion) << " " << 
_desired_num_rows
-                  << " to be returned by expression " << _subquery_string;
-        return Status::Cancelled("Expected {} {} to be returned by expression 
{}",
-                                 to_string_lambda(_assertion), 
_desired_num_rows, _subquery_string);
+        LOG(WARNING) << "Expected " << to_string_lambda(_assertion) << " " << 
_desired_num_rows
+                     << " to be returned by expression " << _subquery_string
+                     << ", actual returned: " << num_rows_returned << ", node 
id: " << _node_id
+                     << ", child id: " << _child->node_id();
+        return Status::Cancelled(
+                "AssertOperator(node id: {}) Expected {} {}(actual rows: {}) 
to be returned by "
+                "expression {}",
+                _node_id, to_string_lambda(_assertion), _desired_num_rows, 
num_rows_returned,
+                _subquery_string);
     }
     RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     return Status::OK();
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 9f0c653b1b7..733092bf458 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -51,13 +51,6 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
     MultiCastBlock* multi_cast_block = nullptr;
     {
         std::lock_guard l(_mutex);
-
-        if (!_cached_blocks[sender_idx].empty()) {
-            *block = std::move(_cached_blocks[sender_idx].front());
-            
_cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin());
-            return Status::OK();
-        }
-
         for (auto it = _spill_readers[sender_idx].begin();
              it != _spill_readers[sender_idx].end();) {
             if ((*it)->all_data_read) {
@@ -67,6 +60,14 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
             }
         }
 
+        if (!_cached_blocks[sender_idx].empty()) {
+            *block = std::move(_cached_blocks[sender_idx].front());
+            
_cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin());
+
+            *eos = _cached_blocks[sender_idx].empty() && 
_spill_readers[sender_idx].empty() && _eos;
+            return Status::OK();
+        }
+
         if (!_spill_readers[sender_idx].empty()) {
             auto reader_item = _spill_readers[sender_idx].front();
             if (!reader_item->stream->ready_for_reading()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to