This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 57446b70f43 fix local exchanger low mem mode
57446b70f43 is described below

commit 57446b70f4328e9f195959da70d46f7db91612d5
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Wed Dec 11 10:24:35 2024 +0800

    fix local exchanger low mem mode
---
 be/src/pipeline/dependency.h                                  |  6 ++----
 .../pipeline/local_exchange/local_exchange_sink_operator.cpp  |  1 +
 be/src/pipeline/local_exchange/local_exchanger.cpp            | 10 ++++++++++
 be/src/pipeline/local_exchange/local_exchanger.h              | 11 ++++++-----
 4 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 7ce81a23dbd..0f1539cadf2 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -847,8 +847,7 @@ public:
     }
 
     virtual void set_low_memory_mode() {
-        _buffer_mem_limit =
-                std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 
* 1024 * 1024);
+        _buffer_mem_limit = 
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
     }
 };
 
@@ -896,8 +895,7 @@ struct LocalMergeExchangeSharedState : public 
LocalExchangeSharedState {
     }
 
     void set_low_memory_mode() override {
-        _buffer_mem_limit =
-                std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 
* 1024 * 1024);
+        _buffer_mem_limit = 
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
         DCHECK(!_queues_mem_usage.empty());
         _each_queue_limit =
                 std::max<int64_t>(64 * 1024, _buffer_mem_limit / 
_queues_mem_usage.size());
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index e01231aae7c..968812e594e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -147,6 +147,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
     if (state->get_query_ctx()->low_memory_mode()) {
         local_state._shared_state->set_low_memory_mode();
+        local_state._exchanger->set_low_memory_mode();
     }
 
     RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, 
local_state));
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index fccf3600c93..ae7e3e2ab7d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -352,6 +352,16 @@ void 
ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) {
         // do nothing
     }
 }
+
+void ExchangerBase::set_low_memory_mode() {
+    _free_block_limit = 0;
+
+    vectorized::Block block;
+    while (_free_blocks.try_dequeue(block)) {
+        // do nothing
+    }
+}
+
 void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& 
local_state) {
     BlockWrapperSPtr next_block;
     vectorized::Block block;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index f518e2649f8..c583348a3b0 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -72,6 +72,8 @@ public:
 
     virtual std::string data_queue_debug_string(int i) = 0;
 
+    void set_low_memory_mode();
+
 protected:
     friend struct LocalExchangeSharedState;
     friend struct BlockWrapper;
@@ -83,7 +85,7 @@ protected:
     const int _num_partitions;
     const int _num_senders;
     const int _num_sources;
-    const int _free_block_limit = 0;
+    int _free_block_limit = 0;
     moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
 };
 
@@ -181,10 +183,9 @@ struct BlockWrapper {
         if (ref_count.fetch_sub(1) == 1) {
             DCHECK_GT(allocated_bytes, 0);
             shared_state->sub_total_mem_usage(allocated_bytes, channel_id);
-            if (shared_state->exchanger->_free_block_limit == 0 ||
-                shared_state->exchanger->_free_blocks.size_approx() <
-                        shared_state->exchanger->_free_block_limit *
-                                shared_state->exchanger->_num_sources) {
+            if (shared_state->exchanger->_free_blocks.size_approx() <
+                shared_state->exchanger->_free_block_limit *
+                        shared_state->exchanger->_num_sources) {
                 data_block.clear_column_data();
                 
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to