This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 10c5c336d8e [branch-2.1](arrow-flight-sql) Add config arrow_flight_result_sink_buffer_size_rows (#38223)
10c5c336d8e is described below

commit 10c5c336d8e9e9436bae5a65735173ed58fe16e1
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Jul 24 15:15:39 2024 +0800

    [branch-2.1](arrow-flight-sql) Add config arrow_flight_result_sink_buffer_size_rows (#38223)
    
    pick #38221
---
 be/src/common/config.cpp                      |  2 ++
 be/src/common/config.h                        |  3 +++
 be/src/exec/data_sink.cpp                     | 22 ++++++++++++++++------
 be/src/pipeline/exec/result_sink_operator.cpp |  9 ++++++++-
 be/src/pipeline/exec/result_sink_operator.h   |  1 +
 5 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 863d69338bc..76ce00097b0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -617,6 +617,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
 // result buffer cancelled time (unit: second)
 DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
 
+DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
+
 // the increased frequency of priority for remaining tasks in 
BlockingPriorityQueue
 DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9050701261c..447473e4fdd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -675,6 +675,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
 // result buffer cancelled time (unit: second)
 DECLARE_mInt32(result_buffer_cancelled_interval_time);
 
+// arrow flight result sink buffer rows size, default 4096 * 8
+DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
+
 // the increased frequency of priority for remaining tasks in 
BlockingPriorityQueue
 DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
 
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 054021439c5..dc651080298 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -78,10 +78,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
 
+        int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
+        if (!thrift_sink.result_sink.__isset.type ||
+            thrift_sink.result_sink.type == 
TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+            result_sink_buffer_size_rows = 
config::arrow_flight_result_sink_buffer_size_rows;
+        }
+
         // TODO: figure out good buffer size based on size of output row
-        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                       thrift_sink.result_sink,
-                                                       
vectorized::RESULT_SINK_BUFFER_SIZE));
+        sink->reset(new doris::vectorized::VResultSink(
+                row_desc, output_exprs, thrift_sink.result_sink, 
result_sink_buffer_size_rows));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -233,10 +238,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
 
+        int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
+        if (!thrift_sink.result_sink.__isset.type ||
+            thrift_sink.result_sink.type == 
TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+            result_sink_buffer_size_rows = 
config::arrow_flight_result_sink_buffer_size_rows;
+        }
+
         // TODO: figure out good buffer size based on size of output row
-        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                       thrift_sink.result_sink,
-                                                       
vectorized::RESULT_SINK_BUFFER_SIZE));
+        sink->reset(new doris::vectorized::VResultSink(
+                row_desc, output_exprs, thrift_sink.result_sink, 
result_sink_buffer_size_rows));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 624b9ca192d..1aa7f37c1fe 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -19,6 +19,7 @@
 
 #include <memory>
 
+#include "common/config.h"
 #include "common/object_pool.h"
 #include "exec/rowid_fetcher.h"
 #include "pipeline/exec/operator.h"
@@ -64,8 +65,9 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", 
TUnit::UNIT, 1);
 
     // create sender
+    auto& p = _parent->cast<ResultSinkOperatorX>();
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-            state->fragment_instance_id(), 
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
+            state->fragment_instance_id(), p._result_sink_buffer_size_rows, 
&_sender, true,
             state->execution_timeout()));
     
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
     return Status::OK();
@@ -118,6 +120,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
     } else {
         _sink_type = sink.type;
     }
+    if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+        _result_sink_buffer_size_rows = 
config::arrow_flight_result_sink_buffer_size_rows;
+    } else {
+        _result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
+    }
     _fetch_option = sink.fetch_option;
     _name = "ResultSink";
 }
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index aed9961a6d6..a3f8b8f9882 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -82,6 +82,7 @@ private:
 
     Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* 
final_block);
     TResultSinkType::type _sink_type;
+    int _result_sink_buffer_size_rows;
     // set file options when sink type is FILE
     std::unique_ptr<vectorized::ResultFileOptions> _file_opts = nullptr;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to