This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3119a0867a0 [opt](arrow-flight-sql) Add config `arrow_flight_result_sink_buffer_size_rows` (#38221) 3119a0867a0 is described below commit 3119a0867a0bc9ec00f816e14e24a6706a324472 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Jul 30 11:50:56 2024 +0800 [opt](arrow-flight-sql) Add config `arrow_flight_result_sink_buffer_size_rows` (#38221) Support configuring the Arrow Flight result sink buffer size (in rows) via a config parameter; the default is 4096 * 8. When returning large amounts of data, we want to return larger batches at a time. --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 3 +++ be/src/pipeline/exec/result_sink_operator.cpp | 15 +++++++++++---- be/src/pipeline/exec/result_sink_operator.h | 1 + 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f66a7dd17c5..f984621ec85 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -634,6 +634,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); +DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); + // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index fd38924f47e..fcfce74e7be 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -692,6 +692,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent); // result buffer cancelled time (unit: second) DECLARE_mInt32(result_buffer_cancelled_interval_time); +// arrow flight result sink buffer rows size, default 4096 * 8 +DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); + // the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 4ca4d3d421c..73d0bea8f99 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -20,6 +20,7 @@ #include <memory> #include <utility> +#include "common/config.h" #include "common/object_pool.h" #include "exec/rowid_fetcher.h" #include "pipeline/exec/operator.h" @@ -48,9 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) if (state->query_options().enable_parallel_result_sink) { _sender = _parent->cast<ResultSinkOperatorX>()._sender; } else { + auto& p = _parent->cast<ResultSinkOperatorX>(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(), - state->batch_size())); + fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, + state->execution_timeout(), state->batch_size())); } _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); return Status::OK(); @@ -107,6 +109,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r } else { _sink_type = sink.type; } + if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + _result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } else { + _result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE; + } _fetch_option = sink.fetch_option; _name = "ResultSink"; } @@ -126,8 +133,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) { if (state->query_options().enable_parallel_result_sink) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(), - state->batch_size())); + state->query_id(), _result_sink_buffer_size_rows, &_sender, + state->execution_timeout(), 
state->batch_size())); } return Status::OK(); } diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 7ec7d43ec2b..06b961b2a31 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -152,6 +152,7 @@ private: Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block); TResultSinkType::type _sink_type; + int _result_sink_buffer_size_rows; // set file options when sink type is FILE std::unique_ptr<ResultFileOptions> _file_opts = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org