This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3119a0867a0 [opt](arrow-flight-sql) Add config 
`arrow_flight_result_sink_buffer_size_rows` (#38221)
3119a0867a0 is described below

commit 3119a0867a0bc9ec00f816e14e24a6706a324472
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Jul 30 11:50:56 2024 +0800

    [opt](arrow-flight-sql) Add config 
`arrow_flight_result_sink_buffer_size_rows` (#38221)
    
    Support configuring the Arrow Flight result sink buffer size (in rows)
    through a config parameter; the default is 4096 * 8 (32768).
    
    When there is a large amount of data, we want to return a larger batch at a time.
---
 be/src/common/config.cpp                      |  2 ++
 be/src/common/config.h                        |  3 +++
 be/src/pipeline/exec/result_sink_operator.cpp | 15 +++++++++++----
 be/src/pipeline/exec/result_sink_operator.h   |  1 +
 4 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f66a7dd17c5..f984621ec85 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -634,6 +634,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
 // result buffer cancelled time (unit: second)
 DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
 
+DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
+
 // the increased frequency of priority for remaining tasks in 
BlockingPriorityQueue
 DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fd38924f47e..fcfce74e7be 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -692,6 +692,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
 // result buffer cancelled time (unit: second)
 DECLARE_mInt32(result_buffer_cancelled_interval_time);
 
+// Arrow Flight result sink buffer size, in rows; default 4096 * 8 (32768)
+DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
+
 // the increased frequency of priority for remaining tasks in 
BlockingPriorityQueue
 DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
 
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 4ca4d3d421c..73d0bea8f99 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <utility>
 
+#include "common/config.h"
 #include "common/object_pool.h"
 #include "exec/rowid_fetcher.h"
 #include "pipeline/exec/operator.h"
@@ -48,9 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     if (state->query_options().enable_parallel_result_sink) {
         _sender = _parent->cast<ResultSinkOperatorX>()._sender;
     } else {
+        auto& p = _parent->cast<ResultSinkOperatorX>();
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+                fragment_instance_id, p._result_sink_buffer_size_rows, 
&_sender,
+                state->execution_timeout(), state->batch_size()));
     }
     _sender->set_dependency(fragment_instance_id, 
_dependency->shared_from_this());
     return Status::OK();
@@ -107,6 +109,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, 
const RowDescriptor& r
     } else {
         _sink_type = sink.type;
     }
+    if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+        _result_sink_buffer_size_rows = 
config::arrow_flight_result_sink_buffer_size_rows;
+    } else {
+        _result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE;
+    }
     _fetch_option = sink.fetch_option;
     _name = "ResultSink";
 }
@@ -126,8 +133,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
 
     if (state->query_options().enable_parallel_result_sink) {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+                state->query_id(), _result_sink_buffer_size_rows, &_sender,
+                state->execution_timeout(), state->batch_size()));
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 7ec7d43ec2b..06b961b2a31 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -152,6 +152,7 @@ private:
 
     Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* 
final_block);
     TResultSinkType::type _sink_type;
+    int _result_sink_buffer_size_rows;
     // set file options when sink type is FILE
     std::unique_ptr<ResultFileOptions> _file_opts = nullptr;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to