This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 57446b70f43 fix local exchanger low mem mode 57446b70f43 is described below commit 57446b70f4328e9f195959da70d46f7db91612d5 Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Wed Dec 11 10:24:35 2024 +0800 fix local exchanger low mem mode --- be/src/pipeline/dependency.h | 6 ++---- .../pipeline/local_exchange/local_exchange_sink_operator.cpp | 1 + be/src/pipeline/local_exchange/local_exchanger.cpp | 10 ++++++++++ be/src/pipeline/local_exchange/local_exchanger.h | 11 ++++++----- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 7ce81a23dbd..0f1539cadf2 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -847,8 +847,7 @@ public: } virtual void set_low_memory_mode() { - _buffer_mem_limit = - std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024); + _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024); } }; @@ -896,8 +895,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState { } void set_low_memory_mode() override { - _buffer_mem_limit = - std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024); + _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024); DCHECK(!_queues_mem_usage.empty()); _each_queue_limit = std::max<int64_t>(64 * 1024, _buffer_mem_limit / _queues_mem_usage.size()); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index e01231aae7c..968812e594e 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -147,6 +147,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (state->get_query_ctx()->low_memory_mode()) { local_state._shared_state->set_low_memory_mode(); + local_state._exchanger->set_low_memory_mode(); } RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state)); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index fccf3600c93..ae7e3e2ab7d 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -352,6 +352,16 @@ void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) { // do nothing } } + +void ExchangerBase::set_low_memory_mode() { + _free_block_limit = 0; + + vectorized::Block block; + while (_free_blocks.try_dequeue(block)) { + // do nothing + } +} + void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& local_state) { BlockWrapperSPtr next_block; vectorized::Block block; diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index f518e2649f8..c583348a3b0 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -72,6 +72,8 @@ public: virtual std::string data_queue_debug_string(int i) = 0; + void set_low_memory_mode(); + protected: friend struct LocalExchangeSharedState; friend struct BlockWrapper; @@ -83,7 +85,7 @@ protected: const int _num_partitions; const int _num_senders; const int _num_sources; - const int _free_block_limit = 0; + int _free_block_limit = 0; moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks; }; @@ -181,10 +183,9 @@ struct BlockWrapper { if (ref_count.fetch_sub(1) == 1) { DCHECK_GT(allocated_bytes, 0); shared_state->sub_total_mem_usage(allocated_bytes, channel_id); - if (shared_state->exchanger->_free_block_limit == 0 || - shared_state->exchanger->_free_blocks.size_approx() < - shared_state->exchanger->_free_block_limit * - shared_state->exchanger->_num_sources) { + if (shared_state->exchanger->_free_blocks.size_approx() < + shared_state->exchanger->_free_block_limit * + shared_state->exchanger->_num_sources) { data_block.clear_column_data(); shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org