This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new f08a3bb4dc6 Use DorisVector to avoid memory usage from not being traced (#41557)
f08a3bb4dc6 is described below

commit f08a3bb4dc6d50c64426b34256d8c49092552bbd
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Oct 9 17:54:50 2024 +0800

    Use DorisVector to avoid memory usage from not being traced (#41557)
    
    ## Proposed changes
    
    Also, merge the agg hash table in the thread of the pipeline task.
    
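    A plain std::vector allocates through the default allocator, so the
    memory it grabs never reaches the query's MemTracker. DorisVector
    (see be/src/vec/common/custom_allocator.h in the diff below) is a
    vector type backed by a tracking allocator, which makes the same
    buffers visible to memory accounting. A rough sketch of the pattern,
    with illustrative names (only DorisVector and custom_allocator.h come
    from this commit; the tracker and alias below are stand-ins):

        #include <atomic>
        #include <cstddef>
        #include <vector>

        // Stand-in for a MemTracker: counts bytes handed out by the allocator.
        std::atomic<std::ptrdiff_t> g_tracked_bytes {0};

        template <typename T>
        struct TrackingAllocator {
            using value_type = T;
            TrackingAllocator() = default;
            template <typename U>
            TrackingAllocator(const TrackingAllocator<U>&) {}

            T* allocate(std::size_t n) {
                g_tracked_bytes += static_cast<std::ptrdiff_t>(n * sizeof(T));
                return static_cast<T*>(::operator new(n * sizeof(T)));
            }
            void deallocate(T* p, std::size_t n) {
                g_tracked_bytes -= static_cast<std::ptrdiff_t>(n * sizeof(T));
                ::operator delete(p);
            }
        };

        template <typename T, typename U>
        bool operator==(const TrackingAllocator<T>&, const TrackingAllocator<U>&) {
            return true;
        }
        template <typename T, typename U>
        bool operator!=(const TrackingAllocator<T>&, const TrackingAllocator<U>&) {
            return false;
        }

        // A DorisVector-style alias: every reserve/resize is now accounted for.
        template <typename T>
        using TrackedVector = std::vector<T, TrackingAllocator<T>>;

    Switching a member such as `visited` from std::vector<uint8_t> to
    DorisVector<uint8_t> (process_hash_table_probe.h below) makes its
    buffer show up in the tracked total instead of escaping accounting.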
---
 be/src/pipeline/common/agg_utils.h                 |  22 +++-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  63 ++++++-----
 be/src/pipeline/exec/aggregation_sink_operator.h   |   4 +-
 .../pipeline/exec/aggregation_source_operator.cpp  | 121 +++++++++++++--------
 be/src/pipeline/exec/aggregation_source_operator.h |   6 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  10 +-
 .../pipeline/exec/join/process_hash_table_probe.h  |   3 +-
 .../exec/join/process_hash_table_probe_impl.h      |   3 +-
 be/src/pipeline/exec/operator.h                    |   2 -
 .../exec/partitioned_aggregation_sink_operator.cpp |  18 +--
 .../exec/partitioned_aggregation_sink_operator.h   |   5 +-
 .../partitioned_aggregation_source_operator.cpp    | 114 ++++++++++---------
 .../exec/partitioned_aggregation_source_operator.h |   8 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 110 ++++++++++++-------
 .../exec/partitioned_hash_join_probe_operator.h    |  10 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  21 +++-
 .../exec/partitioned_hash_join_sink_operator.h     |   3 +
 be/src/pipeline/pipeline_task.cpp                  |   1 -
 .../workload_group/workload_group_manager.cpp      |  13 +++
 be/src/vec/columns/column.h                        |  29 ++++-
 be/src/vec/columns/column_const.h                  |   5 +-
 be/src/vec/columns/column_decimal.cpp              |   9 +-
 be/src/vec/columns/column_decimal.h                |   9 +-
 be/src/vec/columns/column_nullable.cpp             |   4 +-
 be/src/vec/columns/column_nullable.h               |   6 +-
 be/src/vec/columns/column_object.h                 |   9 +-
 be/src/vec/columns/column_string.cpp               |  11 +-
 be/src/vec/columns/column_string.h                 |   9 +-
 be/src/vec/columns/column_vector.cpp               |   9 +-
 be/src/vec/columns/column_vector.h                 |   9 +-
 be/src/vec/common/custom_allocator.h               |   2 +-
 be/src/vec/common/hash_table/hash_map_context.h    |  32 +++---
 be/src/vec/common/hash_table/join_hash_table.h     |  11 +-
 33 files changed, 419 insertions(+), 272 deletions(-)

diff --git a/be/src/pipeline/common/agg_utils.h b/be/src/pipeline/common/agg_utils.h
index d67ebb9fdf7..6b18e42b3b3 100644
--- a/be/src/pipeline/common/agg_utils.h
+++ b/be/src/pipeline/common/agg_utils.h
@@ -210,7 +210,7 @@ public:
         }
 
         *reinterpret_cast<KeyType*>(_current_keys) = key;
-        auto aggregate_data = _current_agg_data;
+        auto* aggregate_data = _current_agg_data;
         ++_total_count;
         ++_index_in_sub_container;
         _current_agg_data += _size_of_aggregate_states;
@@ -287,6 +287,26 @@ public:
 
     [[nodiscard]] uint32_t total_count() const { return _total_count; }
 
+    size_t estimate_memory(size_t rows) const {
+        bool need_to_expand = false;
+        if (_total_count == 0) {
+            need_to_expand = true;
+        } else if ((_index_in_sub_container + rows) > SUB_CONTAINER_CAPACITY) {
+            need_to_expand = true;
+            rows -= (SUB_CONTAINER_CAPACITY - _index_in_sub_container);
+        }
+
+        if (!need_to_expand) {
+            return 0;
+        }
+
+        size_t count = (rows + SUB_CONTAINER_CAPACITY - 1) / SUB_CONTAINER_CAPACITY;
+        size_t size = _size_of_key * SUB_CONTAINER_CAPACITY;
+        size += _size_of_aggregate_states * SUB_CONTAINER_CAPACITY;
+        size *= count;
+        return size;
+    }
+
     void init_once() {
         if (_inited) {
             return;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 0107c5ec4fb..d9fc1ee9417 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -77,8 +77,8 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);
 
-    _container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage", TUnit::BYTES);
-    _arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage", TUnit::BYTES);
+    _memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
+    _memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);
 
     return Status::OK();
 }
@@ -230,36 +230,35 @@ size_t AggSinkLocalState::_memory_usage() const {
 }
 
 void AggSinkLocalState::_update_memusage_with_serialized_key() {
-    std::visit(
-            vectorized::Overload {
-                    [&](std::monostate& arg) -> void {
-                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
-                    },
-                    [&](auto& agg_method) -> void {
-                        auto& data = *agg_method.hash_table;
-                        auto arena_memory_usage =
-                                _agg_arena_pool->size() +
-                                Base::_shared_state->aggregate_data_container->memory_usage() -
-                                Base::_shared_state->mem_usage_record.used_in_arena;
-                        Base::_mem_tracker->consume(arena_memory_usage);
-                        Base::_mem_tracker->consume(
-                                data.get_buffer_size_in_bytes() -
-                                Base::_shared_state->mem_usage_record.used_in_state);
-                        _serialize_key_arena_memory_usage->add(arena_memory_usage);
-                        COUNTER_SET(_container_memory_usage,
-                                    Base::_shared_state->aggregate_data_container->memory_usage());
-                        COUNTER_SET(_arena_memory_usage,
-                                    static_cast<int64_t>(_agg_arena_pool->size()));
-                        COUNTER_UPDATE(_hash_table_memory_usage,
-                                       data.get_buffer_size_in_bytes() -
-                                               Base::_shared_state->mem_usage_record.used_in_state);
-                        Base::_shared_state->mem_usage_record.used_in_state =
-                                data.get_buffer_size_in_bytes();
-                        Base::_shared_state->mem_usage_record.used_in_arena =
-                                _agg_arena_pool->size() +
-                                Base::_shared_state->aggregate_data_container->memory_usage();
-                    }},
-            _agg_data->method_variant);
+    std::visit(vectorized::Overload {
+                       [&](std::monostate& arg) -> void {
+                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
+                       },
+                       [&](auto& agg_method) -> void {
+                           auto& data = *agg_method.hash_table;
+                           auto arena_memory_usage =
+                                   _agg_arena_pool->size() +
+                                   Base::_shared_state->aggregate_data_container->memory_usage() -
+                                   Base::_shared_state->mem_usage_record.used_in_arena;
+                           Base::_mem_tracker->consume(arena_memory_usage);
+                           Base::_mem_tracker->consume(
+                                   data.get_buffer_size_in_bytes() -
+                                   Base::_shared_state->mem_usage_record.used_in_state);
+                           _serialize_key_arena_memory_usage->add(arena_memory_usage);
+                           COUNTER_SET(
+                                   _memory_usage_container,
+                                   Base::_shared_state->aggregate_data_container->memory_usage());
+                           COUNTER_SET(_memory_usage_arena,
+                                       static_cast<int64_t>(_agg_arena_pool->size()));
+                           COUNTER_SET(_hash_table_memory_usage,
+                                       int64_t(data.get_buffer_size_in_bytes()));
+                           Base::_shared_state->mem_usage_record.used_in_state =
+                                   data.get_buffer_size_in_bytes();
+                           Base::_shared_state->mem_usage_record.used_in_arena =
+                                   _agg_arena_pool->size() +
+                                   Base::_shared_state->aggregate_data_container->memory_usage();
+                       }},
+               _agg_data->method_variant);
 }
 
Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2fa2d18a7e6..66aa6bd88b5 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -112,8 +112,8 @@ protected:
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
-    RuntimeProfile::Counter* _container_memory_usage = nullptr;
-    RuntimeProfile::Counter* _arena_memory_usage = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
 
     bool _should_limit_output = false;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 2d760092f57..0cf68924a1d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -53,6 +53,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     _hash_table_size_counter =
             ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize", TUnit::UNIT, 1);
 
+    _memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
+    _memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);
+
     auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._without_key) {
         if (p._needs_finalize) {
@@ -582,6 +585,21 @@ template Status AggSourceOperatorX::merge_with_serialized_key_helper<true>(
 template Status AggSourceOperatorX::merge_with_serialized_key_helper<false>(
         RuntimeState* state, vectorized::Block* block);
 
+size_t AggSourceOperatorX::get_estimated_memory_size_for_merging(RuntimeState* state,
+                                                                 size_t rows) const {
+    auto& local_state = get_local_state(state);
+    size_t size = std::visit(
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> size_t {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
+                        return 0;
+                    },
+                    [&](auto& agg_method) { return agg_method.hash_table->estimate_memory(rows); }},
+            local_state._shared_state->agg_data->method_variant);
+    size += local_state._shared_state->aggregate_data_container->estimate_memory(rows);
+    return size;
+}
+
 size_t AggLocalState::_get_hash_table_size() {
     return std::visit(
             vectorized::Overload {[&](std::monostate& arg) -> size_t {
@@ -596,54 +614,61 @@ size_t AggLocalState::_get_hash_table_size() {
void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
                                             vectorized::ColumnRawPtrs& key_columns,
                                              size_t num_rows) {
-    std::visit(vectorized::Overload {
-                       [&](std::monostate& arg) -> void {
-                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
-                       },
-                       [&](auto& agg_method) -> void {
-                           SCOPED_TIMER(_hash_table_compute_timer);
-                           using HashMethodType = std::decay_t<decltype(agg_method)>;
-                           using AggState = typename HashMethodType::State;
-                           AggState state(key_columns);
-                           agg_method.init_serialized_keys(key_columns, num_rows);
-
-                           auto creator = [this](const auto& ctor, auto& key, auto& origin) {
-                               HashMethodType::try_presis_key_and_origin(
-                                       key, origin, *_shared_state->agg_arena_pool);
-                               auto mapped =
-                                       Base::_shared_state->aggregate_data_container->append_data(
-                                               origin);
-                               auto st = _create_agg_status(mapped);
-                               if (!st) {
-                                   throw Exception(st.code(), st.to_string());
-                               }
-                               ctor(key, mapped);
-                           };
-
-                           auto creator_for_null_key = [&](auto& mapped) {
-                               mapped = _shared_state->agg_arena_pool->aligned_alloc(
-                                       _shared_state->total_size_of_aggregate_states,
-                                       _shared_state->align_aggregate_states);
-                               auto st = _create_agg_status(mapped);
-                               if (!st) {
-                                   throw Exception(st.code(), st.to_string());
-                               }
-                           };
-
-                           SCOPED_TIMER(_hash_table_emplace_timer);
-                           for (size_t i = 0; i < num_rows; ++i) {
-                               places[i] = agg_method.lazy_emplace(state, i, creator,
-                                                                   creator_for_null_key);
-                           }
-
-                           COUNTER_UPDATE(_hash_table_input_counter, num_rows);
-                           COUNTER_SET(_hash_table_memory_usage,
-                                       static_cast<int64_t>(
-                                               agg_method.hash_table->get_buffer_size_in_bytes()));
-                           COUNTER_SET(_hash_table_size_counter,
-                                       static_cast<int64_t>(agg_method.hash_table->size()));
-                       }},
-               _shared_state->agg_data->method_variant);
+    std::visit(
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> void {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
+                    },
+                    [&](auto& agg_method) -> void {
+                        SCOPED_TIMER(_hash_table_compute_timer);
+                        using HashMethodType = std::decay_t<decltype(agg_method)>;
+                        using AggState = typename HashMethodType::State;
+                        AggState state(key_columns);
+                        agg_method.init_serialized_keys(key_columns, num_rows);
+
+                        auto creator = [this](const auto& ctor, auto& key, auto& origin) {
+                            HashMethodType::try_presis_key_and_origin(
+                                    key, origin, *_shared_state->agg_arena_pool);
+                            auto mapped =
+                                    Base::_shared_state->aggregate_data_container->append_data(
+                                            origin);
+                            auto st = _create_agg_status(mapped);
+                            if (!st) {
+                                throw Exception(st.code(), st.to_string());
+                            }
+                            ctor(key, mapped);
+                        };
+
+                        auto creator_for_null_key = [&](auto& mapped) {
+                            mapped = _shared_state->agg_arena_pool->aligned_alloc(
+                                    _shared_state->total_size_of_aggregate_states,
+                                    _shared_state->align_aggregate_states);
+                            auto st = _create_agg_status(mapped);
+                            if (!st) {
+                                throw Exception(st.code(), st.to_string());
+                            }
+                        };
+
+                        SCOPED_TIMER(_hash_table_emplace_timer);
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            places[i] = agg_method.lazy_emplace(state, i, creator,
+                                                                creator_for_null_key);
+                        }
+
+                        COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+                        COUNTER_SET(_hash_table_memory_usage,
+                                    static_cast<int64_t>(
+                                            agg_method.hash_table->get_buffer_size_in_bytes()));
+                        COUNTER_SET(_hash_table_size_counter,
+                                    static_cast<int64_t>(agg_method.hash_table->size()));
+                        COUNTER_SET(
+                                _memory_usage_container,
+                                static_cast<int64_t>(
+                                        _shared_state->aggregate_data_container->memory_usage()));
+                        COUNTER_SET(_memory_usage_arena,
+                                    static_cast<int64_t>(_shared_state->agg_arena_pool->size()));
+                    }},
+            _shared_state->agg_data->method_variant);
 }
 
 void AggLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places,
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 98ddd6a2142..09b03a5aab1 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -86,6 +86,8 @@ protected:
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _merge_timer = nullptr;
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
 
     using vectorized_get_result =
             std::function<Status(RuntimeState* state, vectorized::Block* block, bool* eos)>;
@@ -102,7 +104,7 @@ public:
     using Base = OperatorX<AggLocalState>;
     AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                        const DescriptorTbl& descs);
-    ~AggSourceOperatorX() = default;
+    ~AggSourceOperatorX() override = default;
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
 
@@ -111,6 +113,8 @@ public:
     template <bool limit>
     Status merge_with_serialized_key_helper(RuntimeState* state, vectorized::Block* block);
 
+    size_t get_estimated_memory_size_for_merging(RuntimeState* state, size_t rows) const;
+
 private:
     friend class AggLocalState;
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 02e538c4ab3..a29c87bdc4c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -122,7 +122,11 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
     size_t size_to_reserve = 0;
 
     if (!_build_side_mutable_block.empty()) {
-        size_to_reserve += _build_side_mutable_block.allocated_bytes();
+        const auto bytes = _build_side_mutable_block.bytes();
+        const auto allocated_bytes = _build_side_mutable_block.allocated_bytes();
+        if (allocated_bytes != 0 && ((bytes * 100) / allocated_bytes) >= 85) {
+            size_to_reserve += bytes;
+        }
     }
 
     const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
@@ -277,6 +281,10 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     if (UNLIKELY(rows == 0)) {
         return Status::OK();
     }
+
+    LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " << block.columns()
+              << ", bytes/allocated_bytes: " << block.bytes() << "/" << block.allocated_bytes();
+
     COUNTER_UPDATE(_build_rows_counter, rows);
     block.replace_if_overflow();
 
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 965d62192b2..620438677b8 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -23,6 +23,7 @@
 #include "vec/columns/column.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/arena.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 namespace vectorized {
@@ -73,7 +74,7 @@ struct ProcessHashTableProbe {
     // each matching join column need to be processed by other join conjunct. so the struct of mutable block
     // and output block may be different
     // The output result is determined by the other join conjunct result and same_to_prev struct
-    Status do_other_join_conjuncts(vectorized::Block* output_block, std::vector<uint8_t>& visited,
+    Status do_other_join_conjuncts(vectorized::Block* output_block, DorisVector<uint8_t>& visited,
                                    bool has_null_in_build_side);
 
     template <bool with_other_conjuncts>
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 653cc8ab447..f1fea52ca89 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -26,6 +26,7 @@
 #include "util/simd/bits.h"
 #include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::pipeline {
@@ -493,7 +494,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
 
 template <int JoinOpType>
Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* output_block,
-                                                                  std::vector<uint8_t>& visited,
+                                                                  DorisVector<uint8_t>& visited,
                                                                   bool has_null_in_build_side) {
     // dispose the other join conjunct exec
     auto row_count = output_block->rows();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b7fbd1b8cea..b8e3c5b3079 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -196,8 +196,6 @@ public:
     Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
                         vectorized::Block* block, size_t column_to_keep);
 
-    void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }
-
     int64_t& estimate_memory_usage() { return _estimate_memory_usage; }
 
     void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 0bed1d9e13d..60f691de47f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -107,10 +107,12 @@ void PartitionedAggSinkLocalState::_init_counters() {
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime");
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);
-    _container_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ContainerMemoryUsage", TUnit::BYTES, 1);
-    _arena_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ArenaMemoryUsage", TUnit::BYTES, 1);
+    _memory_usage_container =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageContainer", TUnit::BYTES, 1);
+    _memory_usage_arena =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageArena", TUnit::BYTES, 1);
+    _memory_usage_reserved =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
     COUNTER_SET(_max_row_size_counter, (int64_t)0);
 
     _spill_serialize_hash_table_timer =
@@ -137,8 +139,8 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
     UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
     UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
     UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
-    UPDATE_PROFILE(_container_memory_usage, "ContainerMemoryUsage");
-    UPDATE_PROFILE(_arena_memory_usage, "ArenaMemoryUsage");
+    UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer");
+    UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena");
 
     update_max_min_rows_counter();
 }
@@ -255,7 +257,9 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
 size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
     auto& local_state = get_local_state(state);
     auto* runtime_state = local_state._runtime_state.get();
-    return _agg_sink_operator->get_reserve_mem_size(runtime_state);
+    auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state);
+    COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
+    return size;
 }
 
 Status PartitionedAggSinkLocalState::revoke_memory(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 22001b752a2..3c6b46f908e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -303,8 +303,9 @@ public:
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
-    RuntimeProfile::Counter* _container_memory_usage = nullptr;
-    RuntimeProfile::Counter* _arena_memory_usage = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;
 
     RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 8b281a88684..c6a6c09f01b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -27,6 +27,7 @@
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -74,6 +75,10 @@ void PartitionedAggLocalState::_init_counters() {
             ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount", TUnit::UNIT, 1);
     _hash_table_memory_usage =
             ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", TUnit::BYTES, 1);
+
+    _memory_usage_container =
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageContainer", TUnit::BYTES, 1);
+    _memory_usage_arena = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageArena", TUnit::BYTES, 1);
 }
 
 #define UPDATE_PROFILE(counter, name)                           \
@@ -92,6 +97,8 @@ void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) {
     UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime");
     UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize");
     UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage");
+    UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer");
+    UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena");
 }
 
 Status PartitionedAggLocalState::close(RuntimeState* state) {
@@ -140,18 +147,37 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
-    if (local_state._shared_state->is_spilled) {
-        local_state._status = local_state.initiate_merge_spill_partition_agg_data(state);
-        RETURN_IF_ERROR(local_state._status);
-
-        /// When `_is_merging` is true means we are reading spilled data and merging the data into hash table.
-        if (local_state._is_merging) {
+    if (local_state._shared_state->is_spilled &&
+        local_state._need_to_merge_data_for_current_partition) {
+        if (local_state._blocks.empty() && !local_state._current_partition_eos) {
+            bool has_recovering_data = false;
+            RETURN_IF_ERROR(local_state.recover_blocks_from_disk(state, has_recovering_data));
+            *eos = !has_recovering_data;
             return Status::OK();
+        } else if (!local_state._blocks.empty()) {
+            size_t merged_rows = 0;
+            while (!local_state._blocks.empty()) {
+                auto block = std::move(local_state._blocks.front());
+                merged_rows += block.rows();
+                local_state._blocks.erase(local_state._blocks.begin());
+                RETURN_IF_ERROR(_agg_source_operator->merge_with_serialized_key_helper<false>(
+                        local_state._runtime_state.get(), &block));
+            }
+            local_state._estimate_memory_usage +=
+                    _agg_source_operator->get_estimated_memory_size_for_merging(
+                            local_state._runtime_state.get(), merged_rows);
+
+            if (!local_state._current_partition_eos) {
+                return Status::OK();
+            }
         }
+
+        local_state._need_to_merge_data_for_current_partition = false;
     }
 
     // not spilled in sink or current partition still has data
     auto* runtime_state = local_state._runtime_state.get();
+    local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once();
    local_state._status = _agg_source_operator->get_block(runtime_state, block, eos);
     RETURN_IF_ERROR(local_state._status);
     if (local_state._runtime_state) {
@@ -162,6 +188,9 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     if (*eos) {
         if (local_state._shared_state->is_spilled &&
             !local_state._shared_state->spill_partitions.empty()) {
+            local_state._current_partition_eos = false;
+            local_state._need_to_merge_data_for_current_partition = true;
+            RETURN_IF_ERROR(local_state._shared_state->in_mem_shared_state->reset_hash_table());
             *eos = false;
         }
     }
@@ -195,53 +224,34 @@ Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
     return source_local_state->open(state);
 }
 
-Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(RuntimeState* state) {
-    DCHECK(!_is_merging);
-    Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-    if (Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator !=
-                Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() ||
-        _shared_state->spill_partitions.empty()) {
+Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, bool& has_data) {
+    const auto query_id = state->query_id();
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    if (_shared_state->spill_partitions.empty()) {
+        _shared_state->close();
+        has_data = false;
         return Status::OK();
     }
 
-    _is_merging = true;
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
-               << ", task id: " << _state->task_id() << " merge spilled agg data";
-
-    RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
-    _spill_dependency->Dependency::block();
-
-    auto query_id = state->query_id();
-
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
+    has_data = true;
     auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         MonotonicStopWatch execution_timer;
         execution_timer.start();
-        size_t read_size = 0;
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
                    LOG(WARNING) << "query " << print_id(query_id) << " agg node "
-                                 << _parent->node_id()
-                                 << " merge spilled agg data error: " << _status;
+                                 << _parent->node_id() << " recover agg data error: " << _status;
                 }
                 _shared_state->close();
-            } else {
-                VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
-                           << ", task id: " << _state->task_id()
-                           << " merge spilled agg data finish, time used: "
-                           << (execution_timer.elapsed_time() / (1000L * 1000 * 1000))
-                           << "s, read size: " << read_size << ", "
-                           << _shared_state->spill_partitions.size() << " partitions left";
            }
-            Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-            _is_merging = false;
             _spill_dependency->Dependency::set_ready();
         }};
         bool has_agg_data = false;
-        auto& parent = Base::_parent->template cast<Parent>();
+        size_t accumulated_blocks_size = 0;
         while (!state->is_cancelled() && !has_agg_data &&
                !_shared_state->spill_partitions.empty()) {
            for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) {
@@ -267,28 +277,23 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
                     if (!block.empty()) {
                         has_agg_data = true;
-                        read_size += block.bytes();
-                        _status = parent._agg_source_operator
-                                          ->merge_with_serialized_key_helper<false>(
-                                                  _runtime_state.get(), &block);
-                        RETURN_IF_ERROR(_status);
+                        accumulated_blocks_size += block.allocated_bytes();
+                        _blocks.emplace_back(std::move(block));
+
+                        if (accumulated_blocks_size >=
+                            vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                            break;
+                        }
                     }
                 }
-                (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-
-                if (!has_agg_data) {
-                    VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
-                               << _parent->node_id() << ", task id: " << _state->task_id()
-                               << " merge spilled agg data finish, time used: "
-                               << execution_timer.elapsed_time() << ", empty partition "
-                               << read_size << ", " << _shared_state->spill_partitions.size()
-                               << " partitions left";
+
+                _current_partition_eos = eos;
+
+                if (_current_partition_eos) {
+                    (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+                    _shared_state->spill_partitions.pop_front();
                 }
             }
-            _shared_state->spill_partitions.pop_front();
-        }
-        if (_shared_state->spill_partitions.empty()) {
-            _shared_state->close();
         }
         return _status;
     };
@@ -313,6 +318,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_agg_source submit_func failed");
     });
+    _spill_dependency->block();
    return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
            std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(),
                                             exception_catch_func));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 05f9ff6eff0..53e683ada20 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -41,7 +41,7 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
-    Status initiate_merge_spill_partition_agg_data(RuntimeState* state);
+    Status recover_blocks_from_disk(RuntimeState* state, bool& has_data);
     Status setup_in_memory_agg_op(RuntimeState* state);
 
     void update_profile(RuntimeProfile* child_profile);
@@ -57,9 +57,10 @@ protected:
     std::unique_ptr<std::promise<Status>> _spill_merge_promise;
     std::future<Status> _spill_merge_future;
     bool _current_partition_eos = true;
-    bool _is_merging = false;
+    bool _need_to_merge_data_for_current_partition = true;
 
     std::shared_ptr<Dependency> _spill_dependency;
+    std::vector<vectorized::Block> _blocks;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _get_results_timer = nullptr;
@@ -75,6 +76,9 @@ protected:
     RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 };
 class AggSourceOperatorX;
class PartitionedAggSourceOperatorX : public OperatorX<PartitionedAggLocalState> {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 81dccb5eca1..70312d86646 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -20,6 +20,10 @@
 #include <gen_cpp/Metrics_types.h>
 #include <glog/logging.h>
 
+#include <algorithm>
+#include <utility>
+
+#include "common/status.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -63,6 +67,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _spill_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", 1);
     _recovery_probe_blocks = ADD_COUNTER(profile(), "SpillRecoveryProbeBlocks", TUnit::UNIT);
     _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryProbeTime", 1);
+    _memory_usage_reserved =
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::UNIT, 1);
 
     _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBlocksBytes", TUnit::BYTES, 1);
 
@@ -294,29 +300,21 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState* state,
-                                                                           uint32_t partition_index,
-                                                                           bool& has_data) {
+Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state,
+                                                                          uint32_t partition_index,
+                                                                          bool& has_data) {
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
               << ", task id: " << state->task_id() << ", partition: " << partition_index
-               << " recovery_build_blocks_from_disk";
+               << " recover_build_blocks_from_disk";
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
         return Status::OK();
     }
 
-    RETURN_IF_ERROR(spilled_stream->spill_eof());
    spilled_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
                                      _spill_read_bytes, _spill_read_wait_io_timer);
 
-    auto& mutable_block = _shared_state->partitioned_build_blocks[partition_index];
-    if (!mutable_block) {
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
-        spilled_stream.reset();
-        return Status::OK();
-    }
-
     std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
             _shared_state->shared_from_this();
 
@@ -325,8 +323,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, &mutable_block,
-                      shared_state_holder, submit_timer, partition_index] {
+    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, shared_state_holder,
+                      submit_timer, partition_index] {
         auto shared_state_sptr = shared_state_holder.lock();
         if (!shared_state_sptr || state->is_cancelled()) {
             LOG(INFO) << "query: " << print_id(query_id)
@@ -368,25 +366,31 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                 break;
             }
 
-            if (mutable_block->empty()) {
-                *mutable_block = std::move(block);
+            if (!_recovered_build_block) {
+                _recovered_build_block = vectorized::MutableBlock::create_unique(std::move(block));
             } else {
-                DCHECK_EQ(mutable_block->columns(), block.columns());
-                st = mutable_block->merge(std::move(block));
+                DCHECK_EQ(_recovered_build_block->columns(), block.columns());
+                st = _recovered_build_block->merge(std::move(block));
                 if (!st.ok()) {
                     _spill_status_ok = false;
                     _spill_status = std::move(st);
                     break;
                 }
             }
+
+            if (_recovered_build_block->allocated_bytes() >=
+                vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                break;
+            }
         }
 
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
-        shared_state_sptr->spilled_streams[partition_index].reset();
-        const size_t rows = mutable_block ? mutable_block->rows() : 0;
-        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
-                   << ", task id: " << state->task_id() << ", partition: " << partition_index
-                   << ", recovery build data done, rows: " << rows;
+        if (eos) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
+            shared_state_sptr->spilled_streams[partition_index].reset();
+            VLOG_DEBUG << "query: " << print_id(state->query_id())
+                       << ", node: " << _parent->node_id() << ", task id: " << state->task_id()
+                       << ", partition: " << partition_index;
+        }
     };
 
     auto exception_catch_func = [read_func, query_id, this]() {
@@ -433,7 +437,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                                                          exception_catch_func);
    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
               << ", task id: " << state->task_id() << ", partition: " << partition_index
-               << " recovery_build_blocks_from_disk submit func";
+               << " recover_build_blocks_from_disk submit func";
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
@@ -459,9 +463,9 @@ std::string PartitionedHashJoinProbeLocalState::debug_string(int indentation_lev
     return fmt::to_string(debug_string_buffer);
 }
 
-Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state,
-                                                                           uint32_t partition_index,
-                                                                           bool& has_data) {
+Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(RuntimeState* state,
+                                                                          uint32_t partition_index,
+                                                                          bool& has_data) {
     auto& spilled_stream = _probe_spilling_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -485,18 +489,24 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             st = Status::Error<INTERNAL_ERROR>(
                     "fault_inject partitioned_hash_join_probe recover_probe_blocks failed");
         });
-        if (st.ok()) {
+
+        size_t read_size = 0;
+        while (!eos && !_state->is_cancelled() && st.ok()) {
             st = spilled_stream->read_next_block_sync(&block, &eos);
-        }
-        if (!st.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(st);
-        } else {
-            COUNTER_UPDATE(_recovery_probe_rows, block.rows());
-            COUNTER_UPDATE(_recovery_probe_blocks, 1);
-            blocks.emplace_back(std::move(block));
-        }
+            if (!st.ok()) {
+                _spill_status_ok = false;
+                _spill_status = std::move(st);
+            } else {
+                COUNTER_UPDATE(_recovery_probe_rows, block.rows());
+                COUNTER_UPDATE(_recovery_probe_blocks, 1);
+                read_size += block.allocated_bytes();
+                blocks.emplace_back(std::move(block));
+            }
 
+            if (read_size >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                break;
+            }
+        }
         if (eos) {
             VLOG_DEBUG << "query: " << print_id(query_id)
                       << ", recovery probe data done: " << spilled_stream->get_spill_dir();
@@ -739,15 +749,27 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
 
     const auto partition_index = local_state._partition_cursor;
     auto& probe_blocks = local_state._probe_blocks[partition_index];
+
+    if (local_state._recovered_build_block && !local_state._recovered_build_block->empty()) {
+        local_state._estimate_memory_usage += local_state._recovered_build_block->allocated_bytes();
+        auto& mutable_block = local_state._shared_state->partitioned_build_blocks[partition_index];
+        if (!mutable_block) {
+            mutable_block = std::move(local_state._recovered_build_block);
+        } else {
+            RETURN_IF_ERROR(mutable_block->merge(local_state._recovered_build_block->to_block()));
+            local_state._recovered_build_block.reset();
+        }
+    }
+
     if (local_state._need_to_setup_internal_operators) {
-        *eos = false;
         bool has_data = false;
-        RETURN_IF_ERROR(local_state.recovery_build_blocks_from_disk(
+        RETURN_IF_ERROR(local_state.recover_build_blocks_from_disk(
                 state, local_state._partition_cursor, has_data));
         if (has_data) {
             return Status::OK();
         }
 
+        *eos = false;
         RETURN_IF_ERROR(local_state.finish_spilling(partition_index));
         RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
         local_state._need_to_setup_internal_operators = false;
@@ -763,7 +785,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
             *eos = false;
             bool has_data = false;
             RETURN_IF_ERROR(
-                    local_state.recovery_probe_blocks_from_disk(state, partition_index, has_data));
+                    local_state.recover_probe_blocks_from_disk(state, partition_index, has_data));
             if (!has_data) {
                 vectorized::Block block;
                 RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true));
@@ -916,6 +938,12 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         }
     });
 #endif
+
+    Defer defer([&]() {
+        COUNTER_SET(local_state._memory_usage_reserved,
+                    int64_t(local_state.estimate_memory_usage()));
+    });
+
     if (need_more_input_data(state)) {
        RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(),
                                                         &local_state._child_eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 1368d25986f..8ee518e45a1 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -50,10 +50,10 @@ public:
     Status spill_probe_blocks(RuntimeState* state,
                              const std::shared_ptr<SpillContext>& spill_context = nullptr);
 
-    Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
-                                           bool& has_data);
-    Status recovery_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
-                                           bool& has_data);
+    Status recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
+                                          bool& has_data);
+    Status recover_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
+                                          bool& has_data);
 
     Status finish_spilling(uint32_t partition_index);
 
@@ -81,6 +81,7 @@ private:
     std::atomic<bool> _spill_status_ok {true};
 
     std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
+    std::unique_ptr<vectorized::MutableBlock> _recovered_build_block;
     std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
 
     std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;
@@ -142,6 +143,7 @@ private:
     RuntimeProfile::Counter* _probe_rows_counter = nullptr;
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 };
 
 class PartitionedHashJoinProbeOperatorX final
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f4f03cdb52f..e9ad27079ca 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -55,6 +55,8 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionShuffleTime", 1);
     _spill_build_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", 1);
     _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1);
+    _memory_usage_reserved =
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
 
     return Status::OK();
 }
@@ -126,6 +128,8 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
                     _shared_state->inner_runtime_state.get());
         }
     }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
     return size_to_reserve;
 }
 
@@ -381,6 +385,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
                              COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
                           }
                       });
+        RETURN_IF_ERROR(_finish_spilling());
         _dependency->set_ready_to_read();
 
         if (spill_context) {
@@ -390,6 +395,15 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     return Status::OK();
 }
 
+Status PartitionedHashJoinSinkLocalState::_finish_spilling() {
+    for (auto& stream : _shared_state->spilled_streams) {
+        if (stream) {
+            RETURN_IF_ERROR(stream->spill_eof());
+        }
+    }
+    return Status::OK();
+}
+
 Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
                                                           vectorized::Block* in_block,
                                                           size_t begin, size_t end) {
@@ -471,6 +485,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
                                  COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
                               }
                           });
+            _spill_status = _finish_spilling();
             _dependency->set_ready_to_read();
         }
 
@@ -583,7 +598,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                       << node_id() << " sink eos, set_ready_to_read"
                       << ", task id: " << state->task_id() << ", need spill: " << need_to_spill;
 
-            if (!need_to_spill) {
+            if (need_to_spill) {
+                return revoke_memory(state, nullptr);
+            } else {
                if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
                     RETURN_IF_ERROR(_setup_internal_operator(state));
                 }
@@ -602,8 +619,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                           << ", task id: " << state->task_id() << ", nonspill build usage: "
                           << _inner_sink_operator->get_memory_usage(
                                      local_state._shared_state->inner_runtime_state.get());
-            } else {
-                return revoke_memory(state, nullptr);
            }
 
            std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index a2d75cf2f9b..ae0d4839443 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -61,6 +61,8 @@ protected:
     Status _revoke_unpartitioned_block(RuntimeState* state,
                                       const std::shared_ptr<SpillContext>& spill_context);
 
+    Status _finish_spilling();
+
     friend class PartitionedHashJoinSinkOperatorX;
 
     std::atomic_int _spilling_streams_count {0};
@@ -80,6 +82,7 @@ protected:
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
     RuntimeProfile::Counter* _spill_build_timer = nullptr;
     RuntimeProfile::Counter* _in_mem_rows_counter = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 };
 
 class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 400903add18..a31508e8940 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -383,7 +383,6 @@ Status PipelineTask::execute(bool* eos) {
                     std::max(sink_reserve_size, _state->minimum_operator_memory_required_bytes());
             reserve_size = _root->get_reserve_mem_size(_state) + sink_reserve_size;
             _root->reset_reserve_mem_size(_state);
-            DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);
 
             auto workload_group = _state->get_query_ctx()->workload_group();
             if (workload_group && reserve_size > 0) {
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 947476faeb1..768502b5747 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -561,6 +561,19 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
                         query_id, memory_usage, query_ctx->get_mem_limit()));
             }
         } else {
+            if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
+                LOG(INFO) << "query: " << query_id
+                          << ", process limit not exceeded now, resume this query"
+                          << ", process memory info: "
+                          << GlobalMemoryArbitrator::process_memory_used_details_str()
+                          << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
+                query_ctx->set_memory_sufficient(true);
+                return true;
+            }
+
+            LOG(INFO) << "query: " << query_id << ", process limit exceeded, info: "
+                      << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
             query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
                     "The query({}) reserved memory failed because process limit exceeded, and "
                     "there is no cache now. And could not find task to spill. Maybe you should set "
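
The new branch distinguishes two outcomes for a query whose reservation failed and for which no spillable task exists: if the process-wide hard limit is no longer exceeded, the query is resumed rather than cancelled. A condensed sketch of that decision (simplified; the real code also logs process and workload-group memory details):

    // Hypothetical condensation of the branch added to handle_single_query().
    if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
        query_ctx->set_memory_sufficient(true); // pressure has passed: resume
        return true;
    }
    query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
            "reserved memory failed because process limit exceeded"));
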
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index ec4b8585db3..86c66985c14 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -37,6 +37,7 @@
 #include "olap/olap_common.h"
 #include "runtime/define_primitive_type.h"
 #include "vec/common/cow.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/pod_array_fwd.h"
 #include "vec/common/string_ref.h"
 #include "vec/common/typeid_cast.h"
@@ -336,14 +337,23 @@ public:
         return 0;
     }
 
-    virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                               size_t max_row_byte_size) const {
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const {
+        serialize_vec(keys.data(), num_rows, max_row_byte_size);
+    }
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map) const {
+        serialize_vec_with_null_map(keys.data(), num_rows, null_map);
+    }
+
+    virtual void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Method serialize_vec is not supported for " + get_name());
         __builtin_unreachable();
     }
 
-    virtual void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    virtual void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                              const uint8_t* null_map) const {
         throw doris::Exception(
                 ErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -351,15 +361,24 @@ public:
         __builtin_unreachable();
     }
 
+    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+        deserialize_vec(keys.data(), num_rows);
+    }
+
+    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+                                       const uint8_t* null_map) {
+        deserialize_vec_with_null_map(keys.data(), num_rows, null_map);
+    }
+
     // This function deserializes group-by keys into column in the vectorized way.
-    virtual void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    virtual void deserialize_vec(StringRef* keys, const size_t num_rows) {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Method deserialize_vec is not supported for " + get_name());
         __builtin_unreachable();
     }
 
     // Used in ColumnNullable::deserialize_vec
-    virtual void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    virtual void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                                const uint8_t* null_map) {
         throw doris::Exception(
                 ErrorCode::NOT_IMPLEMENTED_ERROR,
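
The IColumn changes above all follow one pattern: the std::vector<StringRef> overloads become non-virtual conveniences that forward to new virtual overloads taking a raw StringRef*, so the virtual interface no longer fixes the container type and callers can pass data held in either std::vector or DorisVector. A reduced sketch of the pattern (IColumnLike is an illustrative name, not the Doris class):

    struct IColumnLike {
        // Non-virtual convenience: accepts any contiguous container via data().
        void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
                           size_t max_row_byte_size) const {
            serialize_vec(keys.data(), num_rows, max_row_byte_size);
        }
        // Virtual hook that concrete columns override.
        virtual void serialize_vec(StringRef* keys, size_t num_rows,
                                   size_t max_row_byte_size) const = 0;
        virtual ~IColumnLike() = default;
    };

One consequence of this shape is that a derived override of the StringRef* version hides the base-class convenience overload under unqualified lookup, which is presumably why call sites such as MethodSerialized below now invoke serialize_vec(input_keys.data(), ...) directly.
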
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index 0d1b16161eb..5498ee25a6e 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -176,8 +176,7 @@ public:
 
     size_t get_max_row_byte_size() const override { return data->get_max_row_byte_size(); }
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override {
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override {
         data->serialize_vec(keys, num_rows, max_row_byte_size);
     }
 
@@ -196,7 +195,7 @@ public:
         get_data_column_ptr()->update_crc_with_value(start, end, hash, nullptr);
     }
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override {
         data->serialize_vec_with_null_map(keys, num_rows, null_map);
     }
diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp
index 2e5fc5e136a..9af67dac5ec 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -74,7 +74,7 @@ size_t ColumnDecimal<T>::get_max_row_byte_size() const {
 }
 
 template <typename T>
-void ColumnDecimal<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnDecimal<T>::serialize_vec(StringRef* keys, size_t num_rows,
                                     size_t max_row_byte_size) const {
     for (size_t i = 0; i < num_rows; ++i) {
         memcpy_fixed<T>(const_cast<char*>(keys[i].data + keys[i].size), (char*)&data[i]);
@@ -83,7 +83,7 @@ void ColumnDecimal<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_ro
 }
 
 template <typename T>
-void ColumnDecimal<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnDecimal<T>::serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                                    const UInt8* null_map) const {
     DCHECK(null_map != nullptr);
     const bool has_null = simd::contain_byte(null_map, num_rows, 1);
@@ -111,7 +111,7 @@ void ColumnDecimal<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
 }
 
 template <typename T>
-void ColumnDecimal<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+void ColumnDecimal<T>::deserialize_vec(StringRef* keys, const size_t num_rows) {
     for (size_t i = 0; i < num_rows; ++i) {
         keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
         keys[i].size -= sizeof(T);
@@ -119,8 +119,7 @@ void ColumnDecimal<T>::deserialize_vec(std::vector<StringRef>& keys, const size_
 }
 
 template <typename T>
-void ColumnDecimal<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
-                                                     const size_t num_rows,
+void ColumnDecimal<T>::deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                                      const uint8_t* null_map) {
     for (size_t i = 0; i < num_rows; ++i) {
         if (null_map[i] == 0) {
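
For fixed-width columns such as decimals and numeric vectors, serialize_vec treats keys[i].size as a write cursor: each value is copied to keys[i].data + keys[i].size and the size then advances by sizeof(T), so successive key columns append their bytes into the same preallocated row key. A standalone illustration of that contract (simplified types, not the Doris API):

    #include <cstddef>
    #include <cstring>

    struct Ref { const char* data; size_t size; };

    template <typename T>
    void append_fixed(const T* column, Ref* keys, size_t num_rows) {
        for (size_t i = 0; i < num_rows; ++i) {
            // Write at the current end of row i's key, then advance the cursor.
            std::memcpy(const_cast<char*>(keys[i].data + keys[i].size), &column[i], sizeof(T));
            keys[i].size += sizeof(T);
        }
    }
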
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index 0927cb88e15..30561005e04 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -165,15 +165,14 @@ public:
 
     size_t get_max_row_byte_size() const override;
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override;
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override;
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override;
 
-    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override;
+    void deserialize_vec(StringRef* keys, const size_t num_rows) override;
 
-    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                        const uint8_t* null_map) override;
 
     void update_hash_with_value(size_t n, SipHash& hash) const override;
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index dbee5a2025a..38ca32ce291 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -246,13 +246,13 @@ size_t ColumnNullable::get_max_row_byte_size() const {
     return flag_size + get_nested_column().get_max_row_byte_size();
 }
 
-void ColumnNullable::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnNullable::serialize_vec(StringRef* keys, size_t num_rows,
                                    size_t max_row_byte_size) const {
     const auto& arr = get_null_map_data();
     get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data());
 }
 
-void ColumnNullable::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+void ColumnNullable::deserialize_vec(StringRef* keys, const size_t num_rows) {
     auto& arr = get_null_map_data();
     const size_t old_size = arr.size();
     arr.resize(old_size + num_rows);
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 7772b6e80ad..c39036c2e3a 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -180,10 +180,10 @@ public:
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
     size_t get_max_row_byte_size() const override;
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override;
 
-    void deserialize_vec(std::vector<StringRef>& keys, size_t num_rows) override;
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override;
+
+    void deserialize_vec(StringRef* keys, size_t num_rows) override;
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) override;
 
diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h
index 96a27e44e92..6e3c754fc32 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -518,24 +518,23 @@ public:
                                "get_max_row_byte_size" + 
std::string(get_family_name()));
     }
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override {
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "serialize_vec" + std::string(get_family_name()));
     }
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "serialize_vec_with_null_map" + std::string(get_family_name()));
     }
 
-    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override {
+    void deserialize_vec(StringRef* keys, const size_t num_rows) override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "deserialize_vec" + std::string(get_family_name()));
     }
 
-    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                        const uint8_t* null_map) override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "deserialize_vec_with_null_map" + std::string(get_family_name()));
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index d8fd42e36c7..a6cf116b6db 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -383,8 +383,7 @@ size_t ColumnStr<T>::get_max_row_byte_size() const {
 }
 
 template <typename T>
-void ColumnStr<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                                 size_t max_row_byte_size) const {
+void ColumnStr<T>::serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const {
     for (size_t i = 0; i < num_rows; ++i) {
         uint32_t offset(offset_at(i));
         uint32_t string_size(size_at(i));
@@ -397,7 +396,7 @@ void ColumnStr<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
 }
 
 template <typename T>
-void ColumnStr<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnStr<T>::serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                                const UInt8* null_map) const {
     DCHECK(null_map != nullptr);
 
@@ -438,7 +437,7 @@ void ColumnStr<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, siz
 }
 
 template <typename T>
-void ColumnStr<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+void ColumnStr<T>::deserialize_vec(StringRef* keys, const size_t num_rows) {
     for (size_t i = 0; i != num_rows; ++i) {
         auto original_ptr = keys[i].data;
         keys[i].data = deserialize_and_insert_from_arena(original_ptr);
@@ -447,8 +446,8 @@ void ColumnStr<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t nu
 }
 
 template <typename T>
-void ColumnStr<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
-                                                 const size_t num_rows, const uint8_t* null_map) {
+void ColumnStr<T>::deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
+                                                 const uint8_t* null_map) {
     for (size_t i = 0; i != num_rows; ++i) {
         if (null_map[i] == 0) {
             auto original_ptr = keys[i].data;
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index b441b81613b..9f3771bbe96 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -387,17 +387,16 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
-    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override;
+    void deserialize_vec(StringRef* keys, const size_t num_rows) override;
 
     size_t get_max_row_byte_size() const override;
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override;
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override;
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override;
 
-    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                        const uint8_t* null_map) override;
 
     void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index 300152b5784..4382fa62851 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -63,7 +63,7 @@ size_t ColumnVector<T>::get_max_row_byte_size() const {
 }
 
 template <typename T>
-void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnVector<T>::serialize_vec(StringRef* keys, size_t num_rows,
                                     size_t max_row_byte_size) const {
     for (size_t i = 0; i < num_rows; ++i) {
         memcpy_fixed<T>(const_cast<char*>(keys[i].data + keys[i].size), (char*)&data[i]);
@@ -72,7 +72,7 @@ void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_row
 }
 
 template <typename T>
-void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnVector<T>::serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                                   const UInt8* null_map) const {
     DCHECK(null_map != nullptr);
 
@@ -102,7 +102,7 @@ void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
 }
 
 template <typename T>
-void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+void ColumnVector<T>::deserialize_vec(StringRef* keys, const size_t num_rows) {
     for (size_t i = 0; i != num_rows; ++i) {
         keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
         keys[i].size -= sizeof(T);
@@ -110,8 +110,7 @@ void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t
 }
 
 template <typename T>
-void ColumnVector<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
-                                                    const size_t num_rows,
+void ColumnVector<T>::deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                                     const uint8_t* null_map) {
     for (size_t i = 0; i < num_rows; ++i) {
         if (null_map[i] == 0) {
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index 015595797b9..a6e329b9522 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -247,17 +247,16 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
-    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override;
+    void deserialize_vec(StringRef* keys, const size_t num_rows) override;
 
-    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                        const uint8_t* null_map) override;
 
     size_t get_max_row_byte_size() const override;
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override;
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override;
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override;
 
     void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
diff --git a/be/src/vec/common/custom_allocator.h b/be/src/vec/common/custom_allocator.h
index eee800a059d..6361a60689c 100644
--- a/be/src/vec/common/custom_allocator.h
+++ b/be/src/vec/common/custom_allocator.h
@@ -20,7 +20,7 @@
 #include "vec/common/allocator.h"
 #include "vec/common/allocator_fwd.h"
 
-template <class T, typename MemoryAllocator = Allocator<true>>
+template <class T, typename MemoryAllocator = Allocator<false>>
 class CustomStdAllocator;
 
 template <typename T>
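
custom_allocator.h is where DorisVector and CustomStdAllocator live; routing containers through an allocator that reports to the engine's memory tracker is what makes their usage visible, which is the point of the std::vector-to-DorisVector replacements below. A self-contained sketch of the mechanism (TrackedAlloc, TrackedVector, and g_tracked_bytes are illustrative stand-ins, not the actual Doris classes):

    #include <atomic>
    #include <cstddef>
    #include <new>
    #include <vector>

    std::atomic<size_t> g_tracked_bytes{0}; // stand-in for a per-query memory tracker

    template <typename T>
    struct TrackedAlloc {
        using value_type = T;
        TrackedAlloc() = default;
        template <typename U>
        TrackedAlloc(const TrackedAlloc<U>&) {}
        T* allocate(std::size_t n) {
            g_tracked_bytes += n * sizeof(T); // consume() on a real tracker
            return static_cast<T*>(::operator new(n * sizeof(T)));
        }
        void deallocate(T* p, std::size_t n) noexcept {
            g_tracked_bytes -= n * sizeof(T); // release() on a real tracker
            ::operator delete(p);
        }
    };
    template <typename T, typename U>
    bool operator==(const TrackedAlloc<T>&, const TrackedAlloc<U>&) { return true; }
    template <typename T, typename U>
    bool operator!=(const TrackedAlloc<T>&, const TrackedAlloc<U>&) { return false; }

    template <typename T>
    using TrackedVector = std::vector<T, TrackedAlloc<T>>; // analogous to DorisVector<T>

With this shape, every growth of the hash-table side arrays below (hash values, bucket numbers, stored keys) shows up in the tracker, so reserve and spill decisions can account for it.
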
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 0df0c8997f0..2d0b46150b1 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -56,10 +56,10 @@ struct MethodBaseInner {
     bool inited_iterator = false;
     Key* keys = nullptr;
     Arena arena;
-    std::vector<size_t> hash_values;
+    DorisVector<size_t> hash_values;
 
     // use in join case
-    std::vector<uint32_t> bucket_nums;
+    DorisVector<uint32_t> bucket_nums;
 
     MethodBaseInner() { hash_table.reset(new HashMap()); }
     virtual ~MethodBaseInner() = default;
@@ -198,10 +198,10 @@ struct MethodSerialized : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, typename Base::Mapped>;
     using Base::try_presis_key;
     // need keep until the hash probe end.
-    std::vector<StringRef> build_stored_keys;
+    DorisVector<StringRef> build_stored_keys;
     Arena build_arena;
     // refresh each time probe
-    std::vector<StringRef> stored_keys;
+    DorisVector<StringRef> stored_keys;
 
     StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
                                                 const ColumnRawPtrs& key_columns, Arena& pool) {
@@ -216,7 +216,7 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 
     void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows,
-                                   std::vector<StringRef>& input_keys, Arena& input_arena) {
+                                   DorisVector<StringRef>& input_keys, Arena& input_arena) {
         input_arena.clear();
         input_keys.resize(num_rows);
 
@@ -243,14 +243,18 @@ struct MethodSerialized : public MethodBase<TData> {
             }
 
             for (const auto& column : key_columns) {
-                column->serialize_vec(input_keys, num_rows, max_one_row_byte_size);
+                column->serialize_vec(input_keys.data(), num_rows, max_one_row_byte_size);
             }
         }
         Base::keys = input_keys.data();
     }
 
     size_t serialized_keys_size(bool is_build) const override {
-        return is_build ? build_arena.size() : Base::arena.size();
+        if (is_build) {
+            return build_stored_keys.size() * sizeof(StringRef) + build_arena.size();
+        } else {
+            return stored_keys.size() * sizeof(StringRef) + Base::arena.size();
+        }
     }
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
@@ -286,9 +290,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
             ColumnsHashing::HashMethodString<typename Base::Value, typename Base::Mapped, true>;
 
     // need keep until the hash probe end.
-    std::vector<StringRef> _build_stored_keys;
+    DorisVector<StringRef> _build_stored_keys;
     // refresh each time probe
-    std::vector<StringRef> _stored_keys;
+    DorisVector<StringRef> _stored_keys;
 
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
@@ -296,13 +300,13 @@ struct MethodStringNoCache : public MethodBase<TData> {
     }
 
     void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows,
-                                   std::vector<StringRef>& stored_keys) {
+                                   DorisVector<StringRef>& stored_keys) {
         const IColumn& column = *key_columns[0];
         const auto& nested_column =
                 column.is_nullable()
                         ? assert_cast<const ColumnNullable&>(column).get_nested_column()
                         : column;
-        auto serialized_str = [](const auto& column_string, std::vector<StringRef>& stored_keys) {
+        auto serialized_str = [](const auto& column_string, DorisVector<StringRef>& stored_keys) {
             const auto& offsets = column_string.get_offsets();
             const auto* chars = column_string.get_chars().data();
             stored_keys.resize(column_string.size());
@@ -388,16 +392,16 @@ struct MethodKeysFixed : public MethodBase<TData> {
                                                       has_nullable_keys>;
 
     // need keep until the hash probe end. use only in join
-    std::vector<Key> build_stored_keys;
+    DorisVector<Key> build_stored_keys;
     // refresh each time probe hash table
-    std::vector<Key> stored_keys;
+    DorisVector<Key> stored_keys;
     Sizes key_sizes;
 
     MethodKeysFixed(Sizes key_sizes_) : key_sizes(std::move(key_sizes_)) {}
 
     template <typename T>
     void pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns,
-                     const ColumnRawPtrs& nullmap_columns, std::vector<T>& result) {
+                     const ColumnRawPtrs& nullmap_columns, DorisVector<T>& result) {
         size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
         // set size to 0 at first, then use resize to call default constructor on index included from [0, row_numbers) to reset all memory
         result.clear();
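
Worth noting about serialized_keys_size() above: it now counts the StringRef array itself, not just the arena that holds the serialized bytes. The difference is material; assuming a 64-bit build where sizeof(StringRef) is 16 bytes (a pointer plus a length), a build side of 10 million rows carries 160 MB in build_stored_keys alone, which the old accounting ignored.
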
diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h
index 317987541cd..d95512e275c 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -22,6 +22,7 @@
 #include <limits>
 
 #include "vec/columns/column_filter_helper.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -69,7 +70,7 @@ public:
 
     size_t size() const { return next.size(); }
 
-    std::vector<uint8_t>& get_visited() { return visited; }
+    DorisVector<uint8_t>& get_visited() { return visited; }
 
     template <int JoinOpType, bool with_other_conjuncts>
     void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums,
@@ -255,7 +256,7 @@ public:
 
     bool has_null_key() { return _has_null_key; }
 
-    void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map) const {
+    void pre_build_idxs(DorisVector<uint32>& buckets, const uint8_t* null_map) const {
         if (null_map) {
             for (unsigned int& bucket : buckets) {
                 bucket = bucket == bucket_size ? bucket_size : first[bucket];
@@ -454,13 +455,13 @@ private:
     }
 
     const Key* __restrict build_keys;
-    std::vector<uint8_t> visited;
+    DorisVector<uint8_t> visited;
 
     uint32_t bucket_size = 1;
     int max_batch_size = 4064;
 
-    std::vector<uint32_t> first = {0};
-    std::vector<uint32_t> next = {0};
+    DorisVector<uint32_t> first = {0};
+    DorisVector<uint32_t> next = {0};
 
     // use in iter hash map
     mutable uint32_t iter_idx = 1;
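
The same accounting motive applies to the join hash table: visited, first, and next scale with the number of build rows and buckets, so defining them as DorisVector makes the dominant structures visible to the tracker. As a rough order of magnitude, with 4-byte entries the next array alone is about 400 MB for 100 million build rows, memory that the previous std::vector definitions kept out of reserve and spill decisions.
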

