This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f755809a901 [fix](sink) The issue with 2GB limit of protocol buffer 
(#37990)
f755809a901 is described below

commit f755809a9019d8b3340026b00d926df967725b77
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Jul 22 13:31:45 2024 +0800

    [fix](sink) The issue with 2GB limit of protocol buffer (#37990)
    
    ## Proposed changes
    
    ```
    Fail to serialize doris.PFetchDataResult
    ```
    
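    If the serialized size of a `PFetchDataResult` message exceeds 2 GB, Protocol
    Buffers cannot serialize it. This change splits oversized result blocks into
    smaller sub-blocks before sending them, and returns an error with an empty
    batch instead of attempting to serialize a message larger than 2 GB.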
---
 be/src/runtime/buffer_control_block.cpp  |  8 +++++
 be/src/vec/sink/vmysql_result_writer.cpp | 58 ++++++++++++++++++++++++--------
 be/src/vec/sink/vmysql_result_writer.h   |  2 ++
 3 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 845afb9a84b..6f8022a0034 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -24,6 +24,7 @@
 #include <google/protobuf/stubs/callback.h>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
+#include <limits>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -80,6 +81,13 @@ void GetResultBatchCtx::on_data(const 
std::unique_ptr<TFetchDataResult>& t_resul
         result->set_packet_seq(packet_seq);
         result->set_eos(eos);
     }
+
+    /// The size limit of proto buffer message is 2G
+    if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
+        st = Status::InternalError("Message size exceeds 2GB: {}", 
result->ByteSizeLong());
+        result->clear_row_batch();
+        result->set_empty_batch(true);
+    }
     st.to_protobuf(result->mutable_status());
     { done->Run(); }
     delete this;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp 
b/be/src/vec/sink/vmysql_result_writer.cpp
index d700d43165d..7fcc7fcf76f 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -30,6 +30,7 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
 #include "gutil/integral_types.h"
 #include "olap/hll.h"
 #include "runtime/buffer_control_block.h"
@@ -140,23 +141,11 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
 }
 
 template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& 
input_block) {
-    SCOPED_TIMER(_append_row_batch_timer);
+Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState* 
state, Block& block) {
     Status status = Status::OK();
-    if (UNLIKELY(input_block.rows() == 0)) {
-        return status;
-    }
-
-    DCHECK(_output_vexpr_ctxs.empty() != true);
-
-    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
-    // failed, just return the error status
-    Block block;
-    
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
-                                                                       
input_block, &block));
+    auto num_rows = block.rows();
     // convert one batch
     auto result = std::make_unique<TFetchDataResult>();
-    auto num_rows = block.rows();
     result->result_batch.rows.resize(num_rows);
     uint64_t bytes_sent = 0;
     {
@@ -249,6 +238,47 @@ Status 
VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i
     return status;
 }
 
+template <bool is_binary_format>
+Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& 
input_block) {
+    SCOPED_TIMER(_append_row_batch_timer);
+    Status status = Status::OK();
+    if (UNLIKELY(input_block.rows() == 0)) {
+        return status;
+    }
+
+    DCHECK(_output_vexpr_ctxs.empty() != true);
+
+    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
+    // failed, just return the error status
+    Block block;
+    
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
+                                                                       
input_block, &block));
+    const auto total_bytes = block.bytes();
+
+    if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
+        const auto total_rows = block.rows();
+        const auto sub_block_count = (total_bytes + 
config::thrift_max_message_size - 1) /
+                                     config::thrift_max_message_size;
+        const auto sub_block_rows = (total_rows + sub_block_count - 1) / 
sub_block_count;
+
+        size_t offset = 0;
+        while (offset < total_rows) {
+            size_t rows = std::min(sub_block_rows, total_rows - offset);
+            auto sub_block = block.clone_empty();
+            for (size_t i = 0; i != block.columns(); ++i) {
+                sub_block.get_by_position(i).column =
+                        block.get_by_position(i).column->cut(offset, rows);
+            }
+            offset += rows;
+
+            RETURN_IF_ERROR(_write_one_block(state, sub_block));
+        }
+        return Status::OK();
+    }
+
+    return _write_one_block(state, block);
+}
+
 template <bool is_binary_format>
 Status VMysqlResultWriter<is_binary_format>::close(Status) {
     COUNTER_SET(_sent_rows_counter, _written_rows);
diff --git a/be/src/vec/sink/vmysql_result_writer.h 
b/be/src/vec/sink/vmysql_result_writer.h
index 1b165ecb748..b89b8cf1b90 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -66,6 +66,8 @@ private:
     int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const 
DataTypePtr& type,
                       MysqlRowBuffer<is_binary_format>& buffer, int scale = 
-1);
 
+    Status _write_one_block(RuntimeState* state, Block& block);
+
     BufferControlBlock* _sinker = nullptr;
 
     const VExprContextSPtrs& _output_vexpr_ctxs;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to