This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 68e7cc57eff [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49602) 68e7cc57eff is described below commit 68e7cc57eff33c4682ef256f0994d347a8bb02d8 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Sun Mar 30 13:52:17 2025 +0800 [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49602) ### What problem does this PR solve? Avoid rpc error: `FLOW_CONTROL_ERROR` Problem Summary: Merging into an excessively large batch can cause BRPC transmission failure with error: `FLOW_CONTROL_ERROR` --- be/src/runtime/result_block_buffer.cpp | 12 +++++++++++- be/src/runtime/result_block_buffer.h | 4 ++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/result_block_buffer.cpp b/be/src/runtime/result_block_buffer.cpp index 9b26c0d4b28..47af430a17c 100644 --- a/be/src/runtime/result_block_buffer.cpp +++ b/be/src/runtime/result_block_buffer.cpp @@ -31,6 +31,7 @@ #include <vector> #include "arrow/type_fwd.h" +#include "common/config.h" #include "pipeline/dependency.h" #include "runtime/thread_context.h" #include "util/runtime_profile.h" @@ -189,10 +190,15 @@ Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, if (_waiting_rpc.empty()) { auto sz = 0; auto num_rows = 0; + size_t batch_size = 0; if constexpr (std::is_same_v<InBlockType, vectorized::Block>) { num_rows = result->rows(); + batch_size = result->bytes(); } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { num_rows = result->result_batch.rows.size(); + for (const auto& row : result->result_batch.rows) { + batch_size += row.size(); + } } if (!_result_batch_queue.empty()) { if constexpr (std::is_same_v<InBlockType, vectorized::Block>) { @@ -200,7 +206,8 @@ Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { sz = _result_batch_queue.back()->result_batch.rows.size(); } - if (sz + num_rows < _buffer_limit) { + if (sz + num_rows < _buffer_limit && + (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { if constexpr (std::is_same_v<InBlockType, vectorized::Block>) { auto last_block = _result_batch_queue.back(); for (size_t i = 0; i < last_block->columns(); i++) { @@ -214,15 +221,18 @@ Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), std::make_move_iterator(result_rows.end())); } + _last_batch_bytes += batch_size; } else { _instance_rows_in_queue.emplace_back(); _result_batch_queue.push_back(std::move(result)); + _last_batch_bytes = batch_size; _arrow_data_arrival .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,) } } else { _instance_rows_in_queue.emplace_back(); _result_batch_queue.push_back(std::move(result)); + _last_batch_bytes = batch_size; _arrow_data_arrival .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,) } diff --git a/be/src/runtime/result_block_buffer.h b/be/src/runtime/result_block_buffer.h index 0160fde3787..4cd738c2009 100644 --- a/be/src/runtime/result_block_buffer.h +++ b/be/src/runtime/result_block_buffer.h @@ -102,6 +102,10 @@ protected: // protects all subsequent data in this block std::mutex _lock; + // The last batch size in bytes. + // Determine whether to merge multiple batches based on the size of each batch to avoid getting an excessively large batch after merging. + size_t _last_batch_bytes = 0; + // get arrow flight result is a sync method, need wait for data ready and return result. // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data. std::condition_variable _arrow_data_arrival; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org