This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 68e7cc57eff [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49602)
68e7cc57eff is described below

commit 68e7cc57eff33c4682ef256f0994d347a8bb02d8
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Sun Mar 30 13:52:17 2025 +0800

    [fix](runtime) Avoid merging results into one large result in BufferControlBlock (#49602)
    
    ### What problem does this PR solve?
    
    Avoid the RPC error `FLOW_CONTROL_ERROR`.
    
    Problem Summary:
    
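    Merging results into an excessively large batch can make the BRPC
    transmission fail with `FLOW_CONTROL_ERROR`. To prevent this, a new result
    is no longer merged into the last queued batch when their combined size in
    bytes would exceed `config::thrift_max_message_size`; it is queued as a
    separate batch instead.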
---
 be/src/runtime/result_block_buffer.cpp | 12 +++++++++++-
 be/src/runtime/result_block_buffer.h   |  4 ++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/result_block_buffer.cpp 
b/be/src/runtime/result_block_buffer.cpp
index 9b26c0d4b28..47af430a17c 100644
--- a/be/src/runtime/result_block_buffer.cpp
+++ b/be/src/runtime/result_block_buffer.cpp
@@ -31,6 +31,7 @@
 #include <vector>
 
 #include "arrow/type_fwd.h"
+#include "common/config.h"
 #include "pipeline/dependency.h"
 #include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
@@ -189,10 +190,15 @@ Status 
ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
     if (_waiting_rpc.empty()) {
         auto sz = 0;
         auto num_rows = 0;
+        size_t batch_size = 0;
         if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
             num_rows = result->rows();
+            batch_size = result->bytes();
         } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
             num_rows = result->result_batch.rows.size();
+            for (const auto& row : result->result_batch.rows) {
+                batch_size += row.size();
+            }
         }
         if (!_result_batch_queue.empty()) {
             if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
@@ -200,7 +206,8 @@ Status 
ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
             } else if constexpr (std::is_same_v<InBlockType, 
TFetchDataResult>) {
                 sz = _result_batch_queue.back()->result_batch.rows.size();
             }
-            if (sz + num_rows < _buffer_limit) {
+            if (sz + num_rows < _buffer_limit &&
+                (batch_size + _last_batch_bytes) <= 
config::thrift_max_message_size) {
                 if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
                     auto last_block = _result_batch_queue.back();
                     for (size_t i = 0; i < last_block->columns(); i++) {
@@ -214,15 +221,18 @@ Status 
ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
                     back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
                                      
std::make_move_iterator(result_rows.end()));
                 }
+                _last_batch_bytes += batch_size;
             } else {
                 _instance_rows_in_queue.emplace_back();
                 _result_batch_queue.push_back(std::move(result));
+                _last_batch_bytes = batch_size;
                 _arrow_data_arrival
                         .notify_one(); // Only valid for 
get_arrow_batch(std::shared_ptr<vectorized::Block>,)
             }
         } else {
             _instance_rows_in_queue.emplace_back();
             _result_batch_queue.push_back(std::move(result));
+            _last_batch_bytes = batch_size;
             _arrow_data_arrival
                     .notify_one(); // Only valid for 
get_arrow_batch(std::shared_ptr<vectorized::Block>,)
         }
diff --git a/be/src/runtime/result_block_buffer.h 
b/be/src/runtime/result_block_buffer.h
index 0160fde3787..4cd738c2009 100644
--- a/be/src/runtime/result_block_buffer.h
+++ b/be/src/runtime/result_block_buffer.h
@@ -102,6 +102,10 @@ protected:
     // protects all subsequent data in this block
     std::mutex _lock;
 
+    // The last batch size in bytes.
+    // Determine whether to merge multiple batches based on the size of each 
batch to avoid getting an excessively large batch after merging.
+    size_t _last_batch_bytes = 0;
+
     // get arrow flight result is a sync method, need wait for data ready and 
return result.
     // TODO, waiting for data will block pipeline, so use a request pool to 
save requests waiting for data.
     std::condition_variable _arrow_data_arrival;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to