This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 0e7e42dbdd4 [fix](spill) AssertNumRowsOperatorX fail (#46172) 0e7e42dbdd4 is described below commit 0e7e42dbdd4226f727040d208e0d3602ca0c09d5 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Tue Dec 31 11:23:33 2024 +0800 [fix](spill) AssertNumRowsOperatorX fail (#46172) --- be/src/pipeline/exec/assert_num_rows_operator.cpp | 13 +++++++++---- be/src/pipeline/exec/multi_cast_data_streamer.cpp | 15 ++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index b4e1f7a8419..dccc8c42a8a 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -111,10 +111,15 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return it->second; } }; - LOG(INFO) << "Expected " << to_string_lambda(_assertion) << " " << _desired_num_rows - << " to be returned by expression " << _subquery_string; - return Status::Cancelled("Expected {} {} to be returned by expression {}", - to_string_lambda(_assertion), _desired_num_rows, _subquery_string); + LOG(WARNING) << "Expected " << to_string_lambda(_assertion) << " " << _desired_num_rows + << " to be returned by expression " << _subquery_string + << ", actual returned: " << num_rows_returned << ", node id: " << _node_id + << ", child id: " << _child->node_id(); + return Status::Cancelled( + "AssertOperator(node id: {}) Expected {} {}(actual rows: {}) to be returned by " + "expression {}", + _node_id, to_string_lambda(_assertion), _desired_num_rows, num_rows_returned, + _subquery_string); } RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); return Status::OK(); diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 
9f0c653b1b7..733092bf458 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -51,13 +51,6 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz MultiCastBlock* multi_cast_block = nullptr; { std::lock_guard l(_mutex); - - if (!_cached_blocks[sender_idx].empty()) { - *block = std::move(_cached_blocks[sender_idx].front()); - _cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin()); - return Status::OK(); - } - for (auto it = _spill_readers[sender_idx].begin(); it != _spill_readers[sender_idx].end();) { if ((*it)->all_data_read) { @@ -67,6 +60,14 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz } } + if (!_cached_blocks[sender_idx].empty()) { + *block = std::move(_cached_blocks[sender_idx].front()); + _cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin()); + + *eos = _cached_blocks[sender_idx].empty() && _spill_readers[sender_idx].empty() && _eos; + return Status::OK(); + } + if (!_spill_readers[sender_idx].empty()) { auto reader_item = _spill_readers[sender_idx].front(); if (!reader_item->stream->ready_for_reading()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org