This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4bf202db04292cd4c3972f73d167fd4ad7717188 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Sat Mar 16 09:04:03 2024 +0800 [pipelineX](exchange) Make exchange buffer size configurable (#32201) --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 ++++-- be/src/pipeline/exec/exchange_sink_buffer.h | 1 - be/src/pipeline/exec/scan_operator.cpp | 1 + 5 files changed, 7 insertions(+), 3 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 63ed49eca24..dc31d2c621e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -264,6 +264,7 @@ DEFINE_mInt32(doris_max_scan_key_num, "48"); DEFINE_mInt32(max_pushdown_conditions_per_column, "1024"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); +DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64"); DEFINE_mInt64(column_dictionary_key_ratio_threshold, "0"); DEFINE_mInt64(column_dictionary_key_size_threshold, "0"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a0a20450d5d..abb833f42e8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -306,6 +306,7 @@ DECLARE_mInt32(doris_max_scan_key_num); DECLARE_mInt32(max_pushdown_conditions_per_column); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); +DECLARE_mInt32(exchg_buffer_queue_capacity_factor); DECLARE_mInt64(column_dictionary_key_ratio_threshold); DECLARE_mInt64(column_dictionary_key_size_threshold); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index ed7f18bfcb7..2b97551d8fb 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -111,7 +111,8 @@ void ExchangeSinkBuffer<Parent>::close() { template <typename Parent> bool ExchangeSinkBuffer<Parent>::can_write() const { - size_t max_package_size = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); + size_t max_package_size = + config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size(); size_t total_package_size = 0; for (auto& [_, q] : _instance_to_package_queue) { total_package_size += q.size(); @@ -168,7 +169,8 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) { std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>(); _instance_to_broadcast_package_queue[low_id] = std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>>(); - _queue_capacity = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); + _queue_capacity = + config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 0afa59bf731..8c0375499c3 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -270,7 +270,6 @@ private: int64_t get_sum_rpc_time(); std::atomic<int> _total_queue_size = 0; - static constexpr int QUEUE_CAPACITY_FACTOR = 64; std::shared_ptr<Dependency> _queue_dependency; std::shared_ptr<Dependency> _finish_dependency; std::atomic<bool> _should_stop {false}; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index c10e7777bdb..8870ba619cb 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1413,6 +1413,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) { if (_scanner_ctx) { _scanner_ctx->stop_scanners(state); } + std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners); COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org