This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 10c5c336d8e [branch-2.1](arrow-flight-sql) Add config arrow_flight_result_sink_buffer_size_rows (#38223) 10c5c336d8e is described below commit 10c5c336d8e9e9436bae5a65735173ed58fe16e1 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jul 24 15:15:39 2024 +0800 [branch-2.1](arrow-flight-sql) Add config arrow_flight_result_sink_buffer_size_rows (#38223) pick #38221 --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 3 +++ be/src/exec/data_sink.cpp | 22 ++++++++++++++++------ be/src/pipeline/exec/result_sink_operator.cpp | 9 ++++++++- be/src/pipeline/exec/result_sink_operator.h | 1 + 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 863d69338bc..76ce00097b0 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -617,6 +617,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); +DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); + // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 9050701261c..447473e4fdd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -675,6 +675,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent); // result buffer cancelled time (unit: second) DECLARE_mInt32(result_buffer_cancelled_interval_time); +// arrow flight result sink buffer rows size, default 4096 * 8 +DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); + // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 054021439c5..dc651080298 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -78,10 +78,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } + int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE; + if (!thrift_sink.result_sink.__isset.type || + thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } + // TODO: figure out good buffer size based on size of output row - sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs, - thrift_sink.result_sink, - vectorized::RESULT_SINK_BUFFER_SIZE)); + sink->reset(new doris::vectorized::VResultSink( + row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows)); break; } case TDataSinkType::RESULT_FILE_SINK: { @@ -233,10 +238,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } + int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE; + if (!thrift_sink.result_sink.__isset.type || + thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } + // TODO: figure out good buffer size based on size of output row - sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs, - thrift_sink.result_sink, - vectorized::RESULT_SINK_BUFFER_SIZE)); + sink->reset(new doris::vectorized::VResultSink( + row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows)); break; } case TDataSinkType::RESULT_FILE_SINK: { diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 624b9ca192d..1aa7f37c1fe 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -19,6 +19,7 @@ #include <memory> +#include "common/config.h" #include "common/object_pool.h" #include "exec/rowid_fetcher.h" #include "pipeline/exec/operator.h" @@ -64,8 +65,9 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); // create sender + auto& p = _parent->cast<ResultSinkOperatorX>(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, + state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true, state->execution_timeout())); ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this()); return Status::OK(); @@ -118,6 +120,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r } else { _sink_type = sink.type; } + if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + _result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } else { + _result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE; + } _fetch_option = sink.fetch_option; _name = "ResultSink"; } diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index aed9961a6d6..a3f8b8f9882 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -82,6 +82,7 @@ private: Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block); TResultSinkType::type _sink_type; + int _result_sink_buffer_size_rows; // set file options when sink type is FILE std::unique_ptr<vectorized::ResultFileOptions> _file_opts = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org