This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new e5fd336badf Use DorisVector in memtable; improve low memory mode (#46929)
e5fd336badf is described below

commit e5fd336badf5d689b24157d92cd9ca1b39f4d718
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Tue Jan 14 15:38:08 2025 +0800

    Use DorisVector in memtable; improve low memory mode (#46929)
---
 be/src/cloud/cloud_delta_writer.cpp                |  2 +-
 be/src/cloud/cloud_delta_writer.h                  |  2 +-
 be/src/cloud/cloud_tablets_channel.cpp             |  2 +-
 be/src/olap/delta_writer.cpp                       |  2 +-
 be/src/olap/delta_writer.h                         |  4 +--
 be/src/olap/delta_writer_v2.cpp                    |  2 +-
 be/src/olap/delta_writer_v2.h                      |  2 +-
 be/src/olap/memtable.cpp                           | 12 ++++----
 be/src/olap/memtable.h                             | 10 +++---
 be/src/olap/memtable_writer.cpp                    |  2 +-
 be/src/olap/memtable_writer.h                      |  2 +-
 be/src/olap/rowset/segment_creator.cpp             |  4 ++-
 .../rowset/segment_v2/inverted_index_reader.cpp    |  9 +++---
 be/src/pipeline/dependency.h                       |  4 +--
 be/src/pipeline/exec/data_queue.cpp                | 14 +++++++--
 be/src/pipeline/exec/data_queue.h                  | 11 +++++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  7 ++---
 be/src/pipeline/exec/exchange_sink_operator.h      | 10 ++++++
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  5 +++
 be/src/pipeline/exec/operator.h                    |  2 ++
 .../exec/partitioned_hash_join_probe_operator.cpp  |  2 +-
 be/src/pipeline/exec/scan_operator.h               |  8 +++--
 .../exec/streaming_aggregation_operator.cpp        | 14 ++++-----
 .../pipeline/exec/streaming_aggregation_operator.h |  7 +++--
 be/src/pipeline/exec/union_sink_operator.cpp       |  2 +-
 be/src/pipeline/exec/union_sink_operator.h         |  5 +++
 be/src/pipeline/exec/union_source_operator.cpp     |  5 +--
 be/src/pipeline/exec/union_source_operator.h       |  7 +++++
 .../local_exchange_sink_operator.cpp               |  5 +--
 .../local_exchange/local_exchange_sink_operator.h  |  7 +++++
 be/src/pipeline/local_exchange/local_exchanger.cpp |  9 ------
 be/src/pipeline/local_exchange/local_exchanger.h   | 12 ++++++--
 be/src/pipeline/pipeline_task.cpp                  |  9 ++++--
 be/src/runtime/memory/memory_profile.cpp           | 12 ++------
 be/src/runtime/query_context.h                     |  8 +++++
 be/src/runtime/runtime_state.cpp                   |  1 -
 be/src/runtime/runtime_state.h                     |  4 ---
 be/src/runtime/tablets_channel.cpp                 |  6 ++--
 be/src/runtime/tablets_channel.h                   |  5 +--
 .../workload_group/workload_group_manager.cpp      | 36 ++++++++++++++++------
 be/src/vec/core/block.cpp                          | 10 ++++++
 be/src/vec/exec/scan/scanner_context.cpp           |  6 +---
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  2 +-
 be/src/vec/sink/vrow_distribution.h                |  6 ++--
 be/src/vec/sink/writer/async_result_writer.cpp     |  5 +++
 be/src/vec/sink/writer/async_result_writer.h       |  2 ++
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  2 +-
 47 files changed, 192 insertions(+), 113 deletions(-)

diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp
index 7beaeb3e086..a1c5e08f2c8 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -61,7 +61,7 @@ Status 
CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
 }
 
 Status CloudDeltaWriter::write(const vectorized::Block* block,
-                               const std::vector<uint32_t>& row_idxs) {
+                               const DorisVector<uint32_t>& row_idxs) {
     if (row_idxs.empty()) [[unlikely]] {
         return Status::OK();
     }
diff --git a/be/src/cloud/cloud_delta_writer.h b/be/src/cloud/cloud_delta_writer.h
index 4558b04acd1..e97442d137f 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -32,7 +32,7 @@ public:
                      const UniqueId& load_id);
     ~CloudDeltaWriter() override;
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs) override;
+    Status write(const vectorized::Block* block, const DorisVector<uint32_t>& 
row_idxs) override;
 
     Status close() override;
 
diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp
index 85b8e3ea33a..63e47b69d06 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -55,7 +55,7 @@ Status CloudTabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& reques
         return Status::OK();
     }
 
-    std::unordered_map<int64_t, std::vector<uint32_t>> tablet_to_rowidxs;
+    std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
     _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
 
     std::unordered_set<int64_t> partition_ids;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 88277775f96..6718f877af9 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -118,7 +118,7 @@ Status BaseDeltaWriter::init() {
     return Status::OK();
 }
 
-Status DeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs) {
+Status DeltaWriter::write(const vectorized::Block* block, const 
DorisVector<uint32_t>& row_idxs) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 20f693d5e4d..550960f3f1e 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -64,7 +64,7 @@ public:
 
     virtual ~BaseDeltaWriter();
 
-    virtual Status write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs) = 0;
+    virtual Status write(const vectorized::Block* block, const 
DorisVector<uint32_t>& row_idxs) = 0;
 
     // flush the last memtable to flush queue, must call it before 
build_rowset()
     virtual Status close() = 0;
@@ -123,7 +123,7 @@ public:
 
     ~DeltaWriter() override;
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs) override;
+    Status write(const vectorized::Block* block, const DorisVector<uint32_t>& 
row_idxs) override;
 
     Status close() override;
 
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index a6fb0154489..e298e1aea39 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -139,7 +139,7 @@ Status DeltaWriterV2::init() {
     return Status::OK();
 }
 
-Status DeltaWriterV2::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs) {
+Status DeltaWriterV2::write(const vectorized::Block* block, const 
DorisVector<uint32_t>& row_idxs) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index f9c2800a68f..b813ec228a0 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -71,7 +71,7 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
+    Status write(const vectorized::Block* block, const DorisVector<uint32_t>& 
row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 62d9ecf44a9..8593f1ef482 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -182,7 +182,7 @@ int RowInBlockComparator::operator()(const RowInBlock* 
left, const RowInBlock* r
 }
 
 Status MemTable::insert(const vectorized::Block* input_block,
-                        const std::vector<uint32_t>& row_idxs) {
+                        const DorisVector<uint32_t>& row_idxs) {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
             _query_thread_context.query_mem_tracker->write_tracker());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
@@ -282,7 +282,7 @@ void 
MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo
 }
 Status MemTable::_put_into_output(vectorized::Block& in_block) {
     SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
-    std::vector<uint32_t> row_pos_vec;
+    DorisVector<uint32_t> row_pos_vec;
     DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
     row_pos_vec.reserve(in_block.rows());
     for (int i = 0; i < _row_in_blocks.size(); i++) {
@@ -343,7 +343,7 @@ Status MemTable::_sort_by_cluster_keys() {
     auto clone_block = in_block.clone_without_columns();
     _output_mutable_block = 
vectorized::MutableBlock::build_mutable_block(&clone_block);
 
-    std::vector<RowInBlock*> row_in_blocks;
+    DorisVector<RowInBlock*> row_in_blocks;
     std::unique_ptr<int, std::function<void(int*)>> 
row_in_blocks_deleter((int*)0x01, [&](int*) {
         std::for_each(row_in_blocks.begin(), row_in_blocks.end(),
                       std::default_delete<RowInBlock>());
@@ -378,7 +378,7 @@ Status MemTable::_sort_by_cluster_keys() {
 
     in_block = mutable_block.to_block();
     SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
-    std::vector<uint32_t> row_pos_vec;
+    DorisVector<uint32_t> row_pos_vec;
     DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
     row_pos_vec.reserve(in_block.rows());
     for (int i = 0; i < row_in_blocks.size(); i++) {
@@ -392,7 +392,7 @@ Status MemTable::_sort_by_cluster_keys() {
                                           row_pos_vec.data() + 
in_block.rows(), &column_offset);
 }
 
-void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& 
tie,
+void MemTable::_sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& 
tie,
                                 std::function<int(const RowInBlock*, const 
RowInBlock*)> cmp) {
     auto iter = tie.iter();
     while (iter.next()) {
@@ -464,7 +464,7 @@ void MemTable::_aggregate() {
             vectorized::MutableBlock::build_mutable_block(&in_block);
     _vec_row_comparator->set_block(&mutable_block);
     auto& block_data = in_block.get_columns_with_type_and_name();
-    std::vector<RowInBlock*> temp_row_in_blocks;
+    DorisVector<RowInBlock*> temp_row_in_blocks;
     temp_row_in_blocks.reserve(_last_sorted_pos);
     RowInBlock* prev_row = nullptr;
     int row_pos = -1;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 09591df2745..f4b1de45272 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -183,7 +183,7 @@ public:
     size_t memory_usage() const { return _mem_tracker->consumption(); }
     size_t get_flush_reserve_memory_size() const;
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    Status insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
+    Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& 
row_idxs);
 
     void shrink_memtable_by_agg();
 
@@ -252,7 +252,7 @@ private:
     //return number of same keys
     size_t _sort();
     Status _sort_by_cluster_keys();
-    void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
+    void _sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie,
                           std::function<int(const RowInBlock*, const 
RowInBlock*)> cmp);
     template <bool is_final>
     void _finalize_one_row(RowInBlock* row, const 
vectorized::ColumnsWithTypeAndName& block_data,
@@ -263,10 +263,10 @@ private:
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
-    std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
-    std::vector<size_t> _offsets_of_aggregate_states;
+    DorisVector<vectorized::AggregateFunctionPtr> _agg_functions;
+    DorisVector<size_t> _offsets_of_aggregate_states;
     size_t _total_size_of_aggregate_states;
-    std::vector<RowInBlock*> _row_in_blocks;
+    DorisVector<RowInBlock*> _row_in_blocks;
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 88532646b66..6d63890eede 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -86,7 +86,7 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> 
rowset_writer,
 }
 
 Status MemTableWriter::write(const vectorized::Block* block,
-                             const std::vector<uint32_t>& row_idxs) {
+                             const DorisVector<uint32_t>& row_idxs) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 924c3735310..c8c72b54e3f 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -69,7 +69,7 @@ public:
                 std::shared_ptr<PartialUpdateInfo> partial_update_info,
                 std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = 
false);
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
+    Status write(const vectorized::Block* block, const DorisVector<uint32_t>& 
row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index e0eb7534123..1073e32395e 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -37,6 +37,7 @@
 #include "olap/rowset/segment_v2/vertical_segment_writer.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "util/pretty_printer.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
@@ -254,7 +255,8 @@ Status SegmentFlusher::_flush_segment_writer(
     segstat.key_bounds = key_bounds;
     LOG(INFO) << "tablet_id:" << _context.tablet_id
               << ", flushing rowset_dir: " << _context.tablet_path
-              << ", rowset_id:" << _context.rowset_id << ", data size:" << 
segstat.data_size
+              << ", rowset_id:" << _context.rowset_id
+              << ", data size:" << 
PrettyPrinter::print_bytes(segstat.data_size)
               << ", index size:" << segstat.index_size;
 
     writer.reset();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 44c038aec9c..27dcb32dd99 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -1158,11 +1158,10 @@ Status InvertedIndexIterator::read_from_inverted_index(
             RETURN_IF_ERROR(
                     try_read_from_inverted_index(column_name, query_value, 
query_type, &hit_count));
             if (hit_count > segment_num_rows * query_bkd_limit_percent / 100) {
-                return Status::
-                        Error<ErrorCode::INVERTED_INDEX_BYPASS>(
-                                "hit count: {}, bkd inverted reached limit {}% 
, segment num "
-                                "rows:{}", // add blackspace after % to avoid 
log4j format bug
-                                hit_count, query_bkd_limit_percent, 
segment_num_rows);
+                return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>(
+                        "hit count: {}, bkd inverted reached limit {}% , 
segment num "
+                        "rows:{}", // add blackspace after % to avoid log4j 
format bug
+                        hit_count, query_bkd_limit_percent, segment_num_rows);
             }
         }
     }
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 9ce85bcb7b5..e010ad7e6cd 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -779,7 +779,7 @@ public:
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<RuntimeProfile::Counter*> mem_counters;
     std::atomic<int64_t> mem_usage = 0;
-    size_t _buffer_mem_limit = config::local_exchange_buffer_mem_limit;
+    std::atomic<size_t> _buffer_mem_limit = 
config::local_exchange_buffer_mem_limit;
     // We need to make sure to add mem_usage first and then enqueue, otherwise 
sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
     virtual void create_dependencies(int local_exchange_id) {
@@ -906,7 +906,7 @@ struct LocalMergeExchangeSharedState : public 
LocalExchangeSharedState {
 
 private:
     std::vector<std::atomic_int64_t> _queues_mem_usage;
-    int64_t _each_queue_limit;
+    std::atomic_int64_t _each_queue_limit;
 };
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 85354ece76a..97a3293004f 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -68,8 +68,18 @@ std::unique_ptr<vectorized::Block> 
DataQueue::get_free_block(int child_idx) {
 
 void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int 
child_idx) {
     DCHECK(block->rows() == 0);
-    std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
-    _free_blocks[child_idx].emplace_back(std::move(block));
+    if (!_is_low_memory_mode) {
+        std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
+        _free_blocks[child_idx].emplace_back(std::move(block));
+    }
+}
+
+void DataQueue::clear_free_blocks() {
+    for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
+        std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
+        std::deque<std::unique_ptr<vectorized::Block>> tmp_queue;
+        _free_blocks[child_idx].swap(tmp_queue);
+    }
 }
 
 //check which queue have data, and save the idx in _flag_queue_idx,
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index aabe0ba8797..1548c34addf 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -48,6 +48,8 @@ public:
 
     void push_free_block(std::unique_ptr<vectorized::Block> output_block, int 
child_idx = 0);
 
+    void clear_free_blocks();
+
     void set_finish(int child_idx = 0);
     void set_canceled(int child_idx = 0); // should set before finish
     bool is_finish(int child_idx = 0);
@@ -74,7 +76,11 @@ public:
 
     void set_max_blocks_in_sub_queue(int64_t max_blocks) { 
_max_blocks_in_sub_queue = max_blocks; }
 
-    void set_low_memory_mode() { _max_blocks_in_sub_queue = 1; }
+    void set_low_memory_mode() {
+        _is_low_memory_mode = true;
+        _max_blocks_in_sub_queue = 1;
+        clear_free_blocks();
+    }
 
 private:
     std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
@@ -99,7 +105,8 @@ private:
     // only used by streaming agg source operator
     bool _data_exhausted = false;
 
-    int64_t _max_blocks_in_sub_queue = 1;
+    std::atomic_bool _is_low_memory_mode = false;
+    std::atomic_int64_t _max_blocks_in_sub_queue = 1;
 
     //this only use to record the queue[0] for profile
     int64_t _max_bytes_in_queue = 0;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 3ac73813021..573f4aa840e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -356,8 +356,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
     }
 
     // When `local_state.only_local_exchange` the `sink_buffer` is nullptr.
-    if (state->get_query_ctx()->low_memory_mode() && local_state._sink_buffer 
!= nullptr) {
-        local_state._sink_buffer->set_low_memory_mode();
+    if (state->get_query_ctx()->low_memory_mode()) {
+        set_low_memory_mode(state);
     }
 
     if (_part_type == TPartitionType::UNPARTITIONED || 
local_state.channels.size() == 1) {
@@ -398,9 +398,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                         block_holder->reset_block();
                     }
 
-                    if (state->get_query_ctx()->low_memory_mode()) {
-                        
local_state._broadcast_pb_mem_limiter->set_low_memory_mode();
-                    }
                     
local_state._broadcast_pb_mem_limiter->acquire(*block_holder);
 
                     size_t idx = 0;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 3d6eeb4b39e..d7030ae242a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -199,6 +199,16 @@ public:
 
     DataDistribution required_data_distribution() const override;
     bool is_serial_operator() const override { return true; }
+    void set_low_memory_mode(RuntimeState* state) override {
+        auto& local_state = get_local_state(state);
+        // When `local_state.only_local_exchange` the `sink_buffer` is nullptr.
+        if (local_state._sink_buffer) {
+            local_state._sink_buffer->set_low_memory_mode();
+        }
+        if (local_state._broadcast_pb_mem_limiter) {
+            local_state._broadcast_pb_mem_limiter->set_low_memory_mode();
+        }
+    }
 
     // For a normal shuffle scenario, if the concurrency is n,
     // there can be up to n * n RPCs in the current fragment.
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 4b55ec09efa..da5986d09ee 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -66,6 +66,11 @@ public:
         return local_state.sink(state, in_block, eos);
     }
 
+    void set_low_memory_mode(RuntimeState* state) override {
+        auto& local_state = get_local_state(state);
+        local_state._writer->clear_free_blocks();
+    }
+
 private:
     friend class OlapTableSinkV2LocalState;
     template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 84327f8b5a6..7d1236b85b1 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -118,6 +118,8 @@ public:
         return Status::OK();
     }
 
+    virtual void set_low_memory_mode(RuntimeState* state) {}
+
     [[nodiscard]] virtual bool require_data_distribution() const { return 
false; }
     OperatorPtr child() { return _child; }
     [[nodiscard]] bool followed_by_shuffled_operator() const {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 0588afcca0a..120c6bcbd06 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -870,7 +870,7 @@ bool 
PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
                                      static_cast<int64_t>(
                                              
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM));
         } else {
-            return vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+            return revocable_size > 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
         }
     }
     return false;
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index d099ccdd12f..d6cd2806db5 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include <cstdint>
 #include <string>
 
@@ -29,6 +27,7 @@
 #include "pipeline/dependency.h"
 #include "runtime/descriptors.h"
 #include "runtime/types.h"
+#include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vectorized_fn_call.h"
 #include "vec/exprs/vin_predicate.h"
@@ -388,6 +387,11 @@ public:
         return {ExchangeType::BUCKET_HASH_SHUFFLE};
     }
 
+    void set_low_memory_mode(RuntimeState* state) override {
+        auto& local_state = get_local_state(state);
+        local_state._scanner_ctx->clear_free_blocks();
+    }
+
     int64_t get_push_down_count() const { return _push_down_count; }
     using OperatorX<LocalStateType>::node_id;
     using OperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index e0729db9da6..81769663425 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -497,14 +497,11 @@ Status StreamingAggLocalState::_init_hash_method(const 
vectorized::VExprContextS
     return Status::OK();
 }
 
-void StreamingAggLocalState::set_low_memory_mode() {
-    auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
-    p._spill_streaming_agg_mem_limit = 1024 * 1024;
-}
-Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block,
+Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, 
vectorized::Block* input_block,
                                           vectorized::Block* output_block) {
-    if (state()->get_query_ctx()->low_memory_mode()) {
-        set_low_memory_mode();
+    if (state->get_query_ctx()->low_memory_mode()) {
+        auto& p = Base::_parent->template cast<StreamingAggOperatorX>();
+        p.set_low_memory_mode(state);
     }
     RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
 
@@ -1288,7 +1285,8 @@ Status StreamingAggOperatorX::push(RuntimeState* state, 
vectorized::Block* in_bl
 
     local_state._input_num_rows += in_block->rows();
     if (in_block->rows() > 0) {
-        RETURN_IF_ERROR(local_state.do_pre_agg(in_block, 
local_state._pre_aggregated_block.get()));
+        RETURN_IF_ERROR(
+                local_state.do_pre_agg(state, in_block, 
local_state._pre_aggregated_block.get()));
     }
     in_block->clear_column_data(_child->row_desc().num_materialized_slots());
     return Status::OK();
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 7d85d092d17..3ee52eeb6ec 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -45,9 +45,9 @@ public:
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
-    Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* 
output_block);
+    Status do_pre_agg(RuntimeState* state, vectorized::Block* input_block,
+                      vectorized::Block* output_block);
     void make_nullable_output_key(vectorized::Block* block);
-    void set_low_memory_mode();
 
 private:
     friend class StreamingAggOperatorX;
@@ -209,6 +209,9 @@ public:
     Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) 
const override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
     bool need_more_input_data(RuntimeState* state) const override;
+    void set_low_memory_mode(RuntimeState* state) override {
+        _spill_streaming_agg_mem_limit = 1024 * 1024;
+    }
 
 private:
     friend class StreamingAggLocalState;
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index 39ca43a9d1d..7a0766e0f92 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -92,7 +92,7 @@ Status UnionSinkOperatorX::open(RuntimeState* state) {
 Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos) {
     auto& local_state = get_local_state(state);
     if (state->get_query_ctx()->low_memory_mode()) {
-        local_state._shared_state->data_queue.set_low_memory_mode();
+        set_low_memory_mode(state);
     }
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index 3a8880622cb..9f9cdb58433 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -95,6 +95,11 @@ public:
         return _followed_by_shuffled_operator;
     }
 
+    void set_low_memory_mode(RuntimeState* state) override {
+        auto& local_state = get_local_state(state);
+        local_state._shared_state->data_queue.set_low_memory_mode();
+    }
+
     bool is_shuffled_operator() const override { return 
_followed_by_shuffled_operator; }
 
 private:
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 470914967ae..6412569bc4b 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -131,10 +131,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block* b
         }
         block->swap(*output_block);
         
output_block->clear_column_data(_row_descriptor.num_materialized_slots());
-        if (!state->get_query_ctx()->low_memory_mode()) {
-            
local_state._shared_state->data_queue.push_free_block(std::move(output_block),
-                                                                  child_idx);
-        }
+        
local_state._shared_state->data_queue.push_free_block(std::move(output_block), 
child_idx);
     }
     local_state.reached_limit(block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index 0ee66c3da74..e3a119354c1 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -102,6 +102,13 @@ public:
         return _followed_by_shuffled_operator;
     }
 
+    void set_low_memory_mode(RuntimeState* state) override {
+        auto& local_state = get_local_state(state);
+        if (local_state._shared_state) {
+            local_state._shared_state->data_queue.set_low_memory_mode();
+        }
+    }
+
     bool is_shuffled_operator() const override { return 
_followed_by_shuffled_operator; }
 
 private:
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 084d33e456f..a1c535c487b 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -146,10 +146,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
     if (state->get_query_ctx()->low_memory_mode()) {
-        local_state._shared_state->set_low_memory_mode(state);
-        local_state._exchanger->set_low_memory_mode();
+        set_low_memory_mode(state);
     }
-
     RETURN_IF_ERROR(local_state._exchanger->sink(
             state, in_block, eos,
             {local_state._compute_hash_value_timer, 
local_state._distribute_timer, nullptr},
@@ -162,5 +160,4 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
     return Status::OK();
 }
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index c067f023c8d..160d6b363cf 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -108,6 +108,13 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
+    void set_low_memory_mode(RuntimeState* state) override {
+        auto& local_state = get_local_state(state);
+        SCOPED_TIMER(local_state.exec_time_counter());
+        local_state._shared_state->set_low_memory_mode(state);
+        local_state._exchanger->set_low_memory_mode();
+    }
+
 private:
     friend class LocalExchangeSinkLocalState;
     friend class ShuffleExchanger;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index b31dbd7a308..6e30297e2b2 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -453,15 +453,6 @@ void ExchangerBase::finalize() {
     }
 }
 
-void ExchangerBase::set_low_memory_mode() {
-    _free_block_limit = 0;
-
-    vectorized::Block block;
-    while (_free_blocks.try_dequeue(block)) {
-        // do nothing
-    }
-}
-
 void LocalMergeSortExchanger::finalize() {
     BlockWrapperSPtr next_block;
     vectorized::Block block;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 3a4bccf1f48..5684b418ff2 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -23,8 +23,11 @@
 namespace doris {
 #include "common/compile_check_begin.h"
 namespace vectorized {
+template <typename T>
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks);
+
 class PartitionerBase;
-}
+} // namespace vectorized
 namespace pipeline {
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
@@ -91,7 +94,10 @@ public:
 
     virtual std::string data_queue_debug_string(int i) = 0;
 
-    void set_low_memory_mode();
+    void set_low_memory_mode() {
+        _free_block_limit = 0;
+        clear_blocks(_free_blocks);
+    }
 
 protected:
     friend struct LocalExchangeSharedState;
@@ -104,7 +110,7 @@ protected:
     const int _num_partitions;
     const int _num_senders;
     const int _num_sources;
-    int _free_block_limit = 0;
+    std::atomic_int _free_block_limit = 0;
     moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
 };
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index ed40731bfd1..e09da6ab742 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -403,12 +403,17 @@ Status PipelineTask::execute(bool* eos) {
             *eos = _pending_eos;
         } else {
             SCOPED_TIMER(_get_block_timer);
+            if (_state->get_query_ctx()->low_memory_mode()) {
+                _sink->set_low_memory_mode(_state);
+                _root->set_low_memory_mode(_state);
+            }
             DEFER_RELEASE_RESERVED();
             _get_block_counter->update(1);
             const auto reserve_size = _root->get_reserve_mem_size(_state);
             _root->reset_reserve_mem_size(_state);
 
-            if (workload_group && _state->enable_reserve_memory() && 
reserve_size > 0) {
+            if (workload_group && 
_state->get_query_ctx()->enable_reserve_memory() &&
+                reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
 
                 COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -441,7 +446,7 @@ Status PipelineTask::execute(bool* eos) {
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
             auto workload_group = _state->get_query_ctx()->workload_group();
-            if (_state->enable_reserve_memory() && workload_group &&
+            if (_state->get_query_ctx()->enable_reserve_memory() && 
workload_group &&
                 !(wake_up_early() || _dry_run)) {
                 const auto sink_reserve_size = 
_sink->get_reserve_mem_size(_state, *eos);
                 status = sink_reserve_size != 0
diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp
index 2bf6a175e51..ab874964c99 100644
--- a/be/src/runtime/memory/memory_profile.cpp
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -153,8 +153,7 @@ void MemoryProfile::refresh_memory_overview_profile() {
     ExecEnv::GetInstance()->rowsets_no_cache_mem_tracker()->set_consumption(
             MetadataAdder<RowsetMeta>::get_all_rowsets_size());
     ExecEnv::GetInstance()->segments_no_cache_mem_tracker()->set_consumption(
-            
MetadataAdder<segment_v2::Segment>::get_all_segments_estimate_size() -
-            SegmentLoader::instance()->cache_mem_usage());
+            MetadataAdder<segment_v2::Segment>::get_all_segments_size());
 
     // 4 refresh tracked memory counter
     std::unordered_map<MemTrackerLimiter::Type, int64_t> type_mem_sum = {
@@ -334,21 +333,16 @@ int64_t MemoryProfile::other_current_usage() {
 }
 
 std::string MemoryProfile::process_memory_detail_str() const {
-    return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}",
+    return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}\n, {}\n, 
{}\n",
                        GlobalMemoryArbitrator::process_mem_log_str(),
                        print_memory_overview_profile(), 
print_global_memory_profile(),
+                       print_metadata_memory_profile(), 
print_cache_memory_profile(),
                        print_top_memory_tasks_profile());
 }
 
 void MemoryProfile::print_log_process_usage() {
     if (_enable_print_log_process_usage) {
         _enable_print_log_process_usage = false;
-        LOG(WARNING) << "Process Memory Summary: " + 
GlobalMemoryArbitrator::process_mem_log_str();
-        LOG(WARNING) << "\n" << print_memory_overview_profile();
-        LOG(WARNING) << "\n" << print_global_memory_profile();
-        LOG(WARNING) << "\n" << print_metadata_memory_profile();
-        LOG(WARNING) << "\n" << print_cache_memory_profile();
-        LOG(WARNING) << "\n" << print_top_memory_tasks_profile();
         LOG(WARNING) << process_memory_detail_str();
     }
 }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a50d6041d35..954369e29fa 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -309,6 +309,13 @@ public:
 
     bool low_memory_mode() { return _low_memory_mode; }
 
+    void disable_reserve_memory() { _enable_reserve_memory = false; }
+
+    bool enable_reserve_memory() const {
+        return _query_options.__isset.enable_reserve_memory &&
+               _query_options.enable_reserve_memory && _enable_reserve_memory;
+    }
+
     void update_paused_reason(const Status& st) {
         std::lock_guard l(_paused_mutex);
         if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -391,6 +398,7 @@ private:
     MonotonicStopWatch _paused_timer;
     std::atomic<int64_t> _paused_period_secs = 0;
     std::atomic<bool> _low_memory_mode = false;
+    std::atomic<bool> _enable_reserve_memory = true;
     int64_t _user_set_mem_limit = 0;
     std::atomic<int64_t> _adjusted_mem_limit = 0;
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 2cbc45dbd94..c4def4c0513 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -540,5 +540,4 @@ std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::build_pipeline_profil
     }
     return _pipeline_id_to_profile;
 }
-
 } // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 084f6522229..6c2d0918d8b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -555,10 +555,6 @@ public:
         return _query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill;
     }
 
-    bool enable_reserve_memory() const {
-        return _query_options.__isset.enable_reserve_memory && 
_query_options.enable_reserve_memory;
-    }
-
     int64_t spill_min_revocable_mem() const {
         if (_query_options.__isset.min_revocable_mem) {
             return std::max(_query_options.min_revocable_mem, (int64_t)1);
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 4d458cd440f..eea6bdb5c5a 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -563,7 +563,7 @@ std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
 
 Status BaseTabletsChannel::_write_block_data(
         const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
-        std::unordered_map<int64_t, std::vector<uint32_t>>& tablet_to_rowidxs,
+        std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
         PTabletWriterAddBlockResult* response) {
     vectorized::Block send_data;
     RETURN_IF_ERROR(send_data.deserialize(request.block()));
@@ -639,7 +639,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
         return Status::OK();
     }
 
-    std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row 
index */>
+    std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row 
index */>
             tablet_to_rowidxs;
     _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
 
@@ -657,7 +657,7 @@ bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
 
 void BaseTabletsChannel::_build_tablet_to_rowidxs(
         const PTabletWriterAddBlockRequest& request,
-        std::unordered_map<int64_t, std::vector<uint32_t>>* tablet_to_rowidxs) 
{
+        std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) 
{
     // just add a coarse-grained read lock here rather than each time when 
visiting _broken_tablets
     // tests show that a relatively coarse-grained read lock here performs 
better under multicore scenario
     // see: https://github.com/apache/doris/pull/28552
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 87fbf9d06aa..55fe96df750 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -34,6 +34,7 @@
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 #include "util/uid_util.h"
+#include "vec/common/custom_allocator.h"
 
 namespace google::protobuf {
 template <typename Element>
@@ -121,7 +122,7 @@ public:
 
 protected:
     Status _write_block_data(const PTabletWriterAddBlockRequest& request, 
int64_t cur_seq,
-                             std::unordered_map<int64_t, 
std::vector<uint32_t>>& tablet_to_rowidxs,
+                             std::unordered_map<int64_t, 
DorisVector<uint32_t>>& tablet_to_rowidxs,
                              PTabletWriterAddBlockResult* response);
 
     Status _get_current_seq(int64_t& cur_seq, const 
PTabletWriterAddBlockRequest& request);
@@ -136,7 +137,7 @@ protected:
                            int64_t tablet_id, Status error) const;
     void _build_tablet_to_rowidxs(
             const PTabletWriterAddBlockRequest& request,
-            std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> 
/* row index */>*
+            std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> 
/* row index */>*
                     tablet_to_rowidxs);
     virtual void _init_profile(RuntimeProfile* profile);
 
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 7eb31b28400..962db6df333 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -688,10 +688,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                 query_ctx->set_memory_sufficient(true);
                 return true;
             } else if (time_in_queue >= 
config::spill_in_paused_queue_timeout_ms) {
-                // Use MEM_LIMIT_EXCEEDED so that FE could parse the error 
code and do try logic
+                // if cannot find any memory to release, then let the query 
continue to run as far as possible
+                // or cancelled by gc if memory is really not enough.
                 auto msg1 = fmt::format(
-                        "Query {} failed beause query limit is exceeded, but 
could "
-                        "not find memory that could release or spill to disk. 
Query memory usage: "
+                        "Query {} memory limit is exceeded, but could "
+                        "not find memory that could release or spill to disk, 
disable reserve "
+                        "memory and resume it. Query memory usage: "
                         "{}, limit: {}, reserved "
                         "size: {}, try to reserve: {}, wg info: {}.",
                         query_id, PrettyPrinter::print_bytes(memory_usage),
@@ -702,7 +704,9 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                                          doris::ProcessProfile::instance()
                                                  ->memory_profile()
                                                  
->process_memory_detail_str());
-                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
+                query_ctx->disable_reserve_memory();
+                query_ctx->set_memory_sufficient(true);
+                return true;
             } else {
                 return false;
             }
@@ -713,9 +717,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                 query_ctx->set_memory_sufficient(true);
                 return true;
             } else if (time_in_queue > 
config::spill_in_paused_queue_timeout_ms) {
+                // if cannot find any memory to release, then let the query 
continue to run as far as possible
+                // or cancelled by gc if memory is really not enough.
                 auto msg1 = fmt::format(
-                        "Query {} failed because workload group memory is 
exceeded"
-                        ", and there is no cache now. And could not find task 
to spill. "
+                        "Query {} workload group memory is exceeded"
+                        ", and there is no cache now. And could not find task 
to spill, disable "
+                        "reserve memory and resume it. "
                         "Query memory usage: {}, limit: {}, reserved "
                         "size: {}, try to reserve: {}, wg info: {}."
                         " Maybe you should set the workload group's limit to a 
lower value.",
@@ -727,7 +734,9 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                                          doris::ProcessProfile::instance()
                                                  ->memory_profile()
                                                  
->process_memory_detail_str());
-                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
+                query_ctx->disable_reserve_memory();
+                query_ctx->set_memory_sufficient(true);
+                return true;
             } else {
                 return false;
             }
@@ -745,9 +754,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                 query_ctx->set_memory_sufficient(true);
                 return true;
             } else if (time_in_queue > 
config::spill_in_paused_queue_timeout_ms) {
+                // if cannot find any memory to release, then let the query 
continue to run as far as possible
+                // or cancelled by gc if memory is really not enough.
                 auto msg1 = fmt::format(
-                        "Query {} failed because process memory is exceeded"
-                        ", and there is no cache now. And could not find task 
to spill. "
+                        "Query {} process memory is exceeded"
+                        ", and there is no cache now. And could not find task 
to spill, disable "
+                        "reserve memory and resume it. "
                         "Query memory usage: {}, limit: {}, reserved "
                         "size: {}, try to reserve: {}, wg info: {}."
                         " Maybe you should set the workload group's limit to a 
lower value.",
@@ -759,7 +771,8 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                                          doris::ProcessProfile::instance()
                                                  ->memory_profile()
                                                  
->process_memory_detail_str());
-                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
+                query_ctx->disable_reserve_memory();
+                query_ctx->set_memory_sufficient(true);
             } else {
                 return false;
             }
@@ -831,6 +844,9 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         if (!query_ctx) {
             continue;
         }
+        if (is_low_watermark) {
+            query_ctx->set_low_memory_mode();
+        }
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft 
limit
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 29442c8ecac..8584e5df3cd 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -60,6 +60,16 @@ enum CompressionTypePB : int;
 } // namespace doris::segment_v2
 
 namespace doris::vectorized {
+template <typename T>
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks) {
+    T block;
+    while (blocks.try_dequeue(block)) {
+        // do nothing
+    }
+}
+
+template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&);
+template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&);
 
 Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {
     initialize_index_by_name();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 5f0aeaae67b..c617ff27b1f 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -262,11 +262,7 @@ void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
 }
 
 void ScannerContext::clear_free_blocks() {
-    vectorized::BlockUPtr block;
-    while (_free_blocks.try_dequeue(block)) {
-        // do nothing
-    }
-    block.reset();
+    clear_blocks(_free_blocks);
 }
 
 Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 8a2387cc69e..530d9371059 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -303,7 +303,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                 if (first_read) {
                     free_block = ctx->get_free_block(first_read);
                 } else {
-                    if (state->enable_reserve_memory()) {
+                    if (state->get_query_ctx()->enable_reserve_memory()) {
                         size_t block_avg_bytes = 
scanner->get_block_avg_bytes();
                         auto st = 
thread_context()->try_reserve_memory(block_avg_bytes);
                         if (!st.ok()) {
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 6248a28dba5..83e1c491c06 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -48,9 +48,9 @@ class VNodeChannel;
 // <row_idx, partition_id, tablet_id>
 class RowPartTabletIds {
 public:
-    std::vector<uint32_t> row_ids;
-    std::vector<int64_t> partition_ids;
-    std::vector<int64_t> tablet_ids;
+    DorisVector<uint32_t> row_ids;
+    DorisVector<int64_t> partition_ids;
+    DorisVector<int64_t> tablet_ids;
 
     std::string debug_string() const {
         std::string value;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index ed4f71677f2..28756cfe78f 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -235,5 +235,10 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
     return b;
 }
 
+template <typename T>
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks);
+void AsyncResultWriter::clear_free_blocks() {
+    clear_blocks(_free_blocks);
+}
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 2a90dd2dbd0..a1265655c58 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -72,6 +72,8 @@ public:
 
     Status get_writer_status() { return _writer_status.status(); }
 
+    void clear_free_blocks();
+
 protected:
     Status _projection_block(Block& input_block, Block* output_block);
     const VExprContextSPtrs& _vec_output_expr_ctxs;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 46a3974bba8..b6b6b623698 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -89,7 +89,7 @@ class DeltaWriterV2Map;
 struct Rows {
     int64_t partition_id;
     int64_t index_id;
-    std::vector<uint32_t> row_idxes;
+    DorisVector<uint32_t> row_idxes;
 };
 
 using RowsForTablet = std::unordered_map<int64_t, Rows>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to