This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ca5bf9842abb014c089b4d212932aa09fd2d24ce
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Aug 24 21:49:42 2024 +0800

    [feat]reserve and spill
---
 be/src/clucene                                     |   2 +-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  19 +++
 be/src/pipeline/exec/aggregation_sink_operator.h   |   6 +
 .../pipeline/exec/aggregation_source_operator.cpp  |  43 ++---
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  10 ++
 be/src/pipeline/exec/analytic_sink_operator.h      |   4 +
 be/src/pipeline/exec/analytic_source_operator.cpp  |   4 +-
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |   4 +-
 be/src/pipeline/exec/datagen_operator.cpp          |   4 +-
 .../distinct_streaming_aggregation_operator.cpp    |   3 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |   3 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  39 +++++
 be/src/pipeline/exec/hashjoin_build_sink.h         |   6 +
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  14 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   2 +
 .../exec/multi_cast_data_stream_source.cpp         |   4 +-
 .../exec/nested_loop_join_probe_operator.cpp       |   8 +-
 be/src/pipeline/exec/operator.cpp                  |  22 ++-
 be/src/pipeline/exec/operator.h                    |  79 ++++++++-
 .../pipeline/exec/partition_sort_sink_operator.cpp |  13 ++
 .../pipeline/exec/partition_sort_sink_operator.h   |   2 +
 .../exec/partition_sort_source_operator.cpp        |  14 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  24 ++-
 .../exec/partitioned_aggregation_sink_operator.h   |   5 +
 .../exec/partitioned_hash_join_probe_operator.cpp  |  22 +--
 .../exec/partitioned_hash_join_probe_operator.h    |   3 +
 .../exec/partitioned_hash_join_sink_operator.cpp   |  90 ++++++++---
 .../exec/partitioned_hash_join_sink_operator.h     |   4 +
 be/src/pipeline/exec/repeat_operator.cpp           |   9 +-
 be/src/pipeline/exec/schema_scan_operator.cpp      |   4 +-
 be/src/pipeline/exec/select_operator.h             |   3 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |   7 +
 be/src/pipeline/exec/set_probe_sink_operator.h     |   4 +
 be/src/pipeline/exec/set_sink_operator.cpp         |  27 ++++
 be/src/pipeline/exec/set_sink_operator.h           |   2 +
 be/src/pipeline/exec/set_source_operator.cpp       |   5 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |   2 +
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  10 +-
 .../exec/streaming_aggregation_operator.cpp        |   6 +-
 be/src/pipeline/exec/table_function_operator.cpp   |   3 +-
 be/src/pipeline/exec/union_source_operator.cpp     |   3 +
 be/src/pipeline/pipeline_fragment_context.cpp      |  36 +++++
 be/src/pipeline/pipeline_fragment_context.h        |   4 +
 be/src/pipeline/pipeline_task.cpp                  |  81 +++++++++-
 be/src/pipeline/pipeline_task.h                    |  26 ++-
 be/src/pipeline/task_scheduler.cpp                 | 177 ++++++++++++++++++++-
 be/src/pipeline/task_scheduler.h                   |  17 +-
 be/src/runtime/query_context.cpp                   |  68 +++++++-
 be/src/runtime/query_context.h                     |  26 ++-
 be/src/runtime/runtime_state.h                     |   9 +-
 be/src/vec/common/hash_table/hash_table.h          |  12 ++
 be/src/vec/common/hash_table/ph_hash_map.h         |   9 ++
 be/src/vec/common/hash_table/string_hash_table.h   |  26 +++
 be/src/vec/exprs/vectorized_fn_call.cpp            |  18 +++
 be/src/vec/exprs/vectorized_fn_call.h              |   2 +
 be/src/vec/exprs/vexpr.cpp                         |  18 +++
 be/src/vec/exprs/vexpr.h                           |   2 +
 be/src/vec/exprs/vexpr_context.cpp                 |  46 +++++-
 be/src/vec/exprs/vexpr_context.h                   |  14 ++
 be/src/vec/exprs/vin_predicate.cpp                 |  20 +++
 be/src/vec/exprs/vin_predicate.h                   |   1 +
 be/src/vec/exprs/vslot_ref.h                       |   2 +
 62 files changed, 1011 insertions(+), 141 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index c5d02a7e411..fdbf2204031 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit c5d02a7e41194b02444be6d684e3aeb4ff1b5595
+Subproject commit fdbf2204031128b2bd8505fc73c06403b7c1a815
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 260a599a947..51734bbd5ee 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "runtime/primitive_type.h"
+#include "runtime/thread_context.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 
@@ -176,6 +177,8 @@ Status AggSinkLocalState::_create_agg_status(vectorized::AggregateDataPtr data)
 Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
     SCOPED_TIMER(_build_timer);
+    _memory_usage_last_executing = 0;
+    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
         
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
                 block,
@@ -187,6 +190,8 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
 }
 
 Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) 
{
+    _memory_usage_last_executing = 0;
+    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _merge_with_serialized_key_helper<true, false>(block);
     } else {
@@ -394,6 +399,9 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
 Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
     SCOPED_TIMER(_merge_timer);
     DCHECK(_agg_data->without_key != nullptr);
+
+    _memory_usage_last_executing = 0;
+    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
         if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
             int col_id = AggSharedState::get_slot_column_id(
@@ -431,6 +439,8 @@ void AggSinkLocalState::_update_memusage_without_key() {
 }
 
 Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* 
block) {
+    _memory_usage_last_executing = 0;
+    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _execute_with_serialized_key_helper<true>(block);
     } else {
@@ -708,6 +718,10 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs&
     return Status::OK();
 }
 
+size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const {
+    return _memory_usage();
+}
+
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
                                    const DescriptorTbl& descs, bool 
require_bucket_distribution)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
@@ -864,6 +878,11 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) {
     return Status::OK();
 }
 
+size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    return local_state.get_reserve_mem_size(state);
+}
+
 Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_close_timer);
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 97440de3f09..129aea5eb76 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -96,6 +96,8 @@ protected:
     Status _create_agg_status(vectorized::AggregateDataPtr data);
     size_t _memory_usage() const;
 
+    size_t get_reserve_mem_size(RuntimeState* state) const;
+
     RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr;
@@ -123,6 +125,8 @@ protected:
     std::unique_ptr<vectorized::Arena> _agg_profile_arena;
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
+
+    size_t _memory_usage_last_executing = 0;
 };
 
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
@@ -163,6 +167,8 @@ public:
 
     Status reset_hash_table(RuntimeState* state);
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
     using DataSinkOperatorX<AggSinkLocalState>::node_id;
     using DataSinkOperatorX<AggSinkLocalState>::operator_id;
     using DataSinkOperatorX<AggSinkLocalState>::get_local_state;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index a5f40a431c5..c1cb187f3e6 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -22,17 +22,13 @@
 
 #include "common/exception.h"
 #include "pipeline/exec/operator.h"
+#include "runtime/thread_context.h"
 #include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vexpr_fwd.h"
 
 namespace doris::pipeline {
 
-AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
-        : Base(state, parent),
-          _get_results_timer(nullptr),
-          _serialize_result_timer(nullptr),
-          _hash_table_iterate_timer(nullptr),
-          _insert_keys_to_column_timer(nullptr),
-          _serialize_data_timer(nullptr) {}
+AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {}
 
 Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -53,23 +49,27 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._without_key) {
         if (p._needs_finalize) {
-            _executor.get_result = 
std::bind<Status>(&AggLocalState::_get_without_key_result, this,
-                                                     std::placeholders::_1, 
std::placeholders::_2,
-                                                     std::placeholders::_3);
+            _executor.get_result = [this](RuntimeState* state, 
vectorized::Block* block,
+                                          bool* eos) {
+                return _get_without_key_result(state, block, eos);
+            };
         } else {
-            _executor.get_result = 
std::bind<Status>(&AggLocalState::_serialize_without_key, this,
-                                                     std::placeholders::_1, 
std::placeholders::_2,
-                                                     std::placeholders::_3);
+            _executor.get_result = [this](RuntimeState* state, 
vectorized::Block* block,
+                                          bool* eos) {
+                return _serialize_without_key(state, block, eos);
+            };
         }
     } else {
         if (p._needs_finalize) {
-            _executor.get_result = std::bind<Status>(
-                    &AggLocalState::_get_with_serialized_key_result, this, 
std::placeholders::_1,
-                    std::placeholders::_2, std::placeholders::_3);
+            _executor.get_result = [this](RuntimeState* state, 
vectorized::Block* block,
+                                          bool* eos) {
+                return _get_with_serialized_key_result(state, block, eos);
+            };
         } else {
-            _executor.get_result = std::bind<Status>(
-                    &AggLocalState::_serialize_with_serialized_key_result, 
this,
-                    std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3);
+            _executor.get_result = [this](RuntimeState* state, 
vectorized::Block* block,
+                                          bool* eos) {
+                return _serialize_with_serialized_key_result(state, block, 
eos);
+            };
         }
     }
 
@@ -440,11 +440,11 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage);
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
     local_state.make_nullable_output_key(block);
     // dispose the having clause, should not be execute in prestreaming agg
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                           block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     local_state.do_agg_limit(block, eos);
     return Status::OK();
 }
@@ -482,6 +482,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
 template <bool limit>
 Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* 
block) {
     SCOPED_TIMER(_merge_timer);
+    ScopedMemTracker scoped_tracker(_estimate_memory_usage);
 
     size_t key_size = Base::_shared_state->probe_expr_ctxs.size();
     vectorized::ColumnRawPtrs key_columns(key_size);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 85d7773bdbd..6e3aeec283e 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -21,6 +21,7 @@
 #include <string>
 
 #include "pipeline/exec/operator.h"
+#include "runtime/runtime_state.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris::pipeline {
@@ -263,6 +264,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
+
+    local_state._reserve_mem_size = 0;
+    ScopedMemTracker mem_tracker(local_state._reserve_mem_size);
+
     local_state._shared_state->input_eos = eos;
     if (local_state._shared_state->input_eos && input_block->rows() == 0) {
         local_state._dependency->set_ready_to_read();
@@ -325,6 +330,11 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
     return Status::OK();
 }
 
+size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    return local_state._reserve_mem_size;
+}
+
 Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
                                                    const 
vectorized::VExprContextSPtr& expr,
                                                    vectorized::IColumn* 
dst_column, size_t length) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index b8615717198..a82a1e41044 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -61,6 +61,8 @@ private:
     RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
+
+    size_t _reserve_mem_size = 0;
 };
 
 class AnalyticSinkOperatorX final : public 
DataSinkOperatorX<AnalyticSinkLocalState> {
@@ -93,6 +95,8 @@ public:
         return !_partition_by_eq_expr_ctxs.empty() && 
_order_by_eq_expr_ctxs.empty();
     }
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
 private:
     Status _insert_range_column(vectorized::Block* block, const 
vectorized::VExprContextSPtr& expr,
                                 vectorized::IColumn* dst_column, size_t 
length);
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index b521a9b583f..2661314592d 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -512,6 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
                                           bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
     if (local_state._shared_state->input_eos &&
         (local_state._output_block_index == 
local_state._shared_state->input_blocks.size() ||
          local_state._shared_state->input_total_rows == 0)) {
@@ -539,8 +540,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
         }
     }
     RETURN_IF_ERROR(local_state.output_current_block(block));
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                           block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     local_state.reached_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 5aa27b51c45..6c6a28029e2 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -42,6 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
                                     bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
     local_state.add_num_rows_returned(block->rows());
     int64_t num_rows_returned = local_state.num_rows_returned();
     bool assert_res = false;
@@ -116,8 +117,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
     }
     COUNTER_SET(local_state.rows_returned_counter(), 
local_state.num_rows_returned());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                           block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 93b3d058154..03fe9db37bd 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -68,9 +68,9 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
     Status res = local_state._table_func->get_next(state, block, eos);
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                           block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     local_state.reached_limit(block, eos);
     return res;
 }
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 70b73225f06..29975be396a 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -456,8 +456,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
     local_state._make_nullable_output_key(block);
     if (!_is_streaming_preagg) {
         // dispose the having clause, should not be execute in prestreaming agg
-        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                               
block->columns()));
+        RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
block, block->columns()));
     }
     local_state.add_num_rows_returned(block->rows());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index cf2055ec47b..9db0bca0c43 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -148,8 +148,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
         return Status::OK();
     }
     auto status = local_state.stream_recvr->get_next(block, eos);
-    
RETURN_IF_ERROR(doris::vectorized::VExprContext::filter_block(local_state.conjuncts(),
 block,
-                                                                  
block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, 
block->columns()));
     // In vsortrunmerger, it will set eos=true, and block not empty
     // so that eos==true, could not make sure that block not have valid data
     if (!*eos || block->rows() > 0) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8f7b176a979..589af5e1f42 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -110,6 +110,37 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
+size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
+    if (!_should_build_hash_table) {
+        return 0;
+    }
+    size_t size_to_reserve = 0;
+
+    if (!_build_side_mutable_block.empty()) {
+        size_to_reserve += _build_side_mutable_block.allocated_bytes();
+
+        // estimating for serialized key
+        for (auto id : _build_col_ids) {
+            size_to_reserve += 
_build_side_mutable_block.get_column_by_position(id)->byte_size();
+        }
+    }
+
+    const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
+    size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
+
+    size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first
+    size_to_reserve += rows * sizeof(uint32_t);        // JoinHashTable::next
+
+    auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
+    if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN ||
+        p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) {
+        size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
+    }
+    size_to_reserve += _evaluate_mem_usage;
+
+    return size_to_reserve;
+}
+
 Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
     auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
@@ -170,6 +201,7 @@ Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block,
                                                  
vectorized::VExprContextSPtrs& exprs,
                                                  RuntimeProfile::Counter& 
expr_call_timer,
                                                  std::vector<int>& 
res_col_ids) {
+    auto origin_size = block.allocated_bytes();
     for (size_t i = 0; i < exprs.size(); ++i) {
         int result_col_id = -1;
         // execute build column
@@ -183,6 +215,8 @@ Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block,
                 
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
         res_col_ids[i] = result_col_id;
     }
+
+    _evaluate_mem_usage = block.allocated_bytes() - origin_size;
     return Status::OK();
 }
 
@@ -621,4 +655,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     return Status::OK();
 }
 
+size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    return local_state.get_reserve_mem_size(state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index cf677833fb5..cb626a81cb1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -54,6 +54,8 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
+
 protected:
     void _hash_table_init(RuntimeState* state);
     void _set_build_ignore_flag(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
@@ -77,6 +79,8 @@ protected:
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
 
+    size_t _evaluate_mem_usage = 0;
+
     size_t _build_side_rows = 0;
 
     vectorized::MutableBlock _build_side_mutable_block;
@@ -122,6 +126,8 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
     bool should_dry_run(RuntimeState* state) override {
         return _is_broadcast_join && !state->get_sink_local_state()
                                               
->cast<HashJoinBuildSinkLocalState>()
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index f91e1eaa2a1..44d51c2416b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -347,6 +347,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
         return st;
     }
 
+    local_state._estimate_memory_usage += temp_block.allocated_bytes();
     RETURN_IF_ERROR(
             local_state.filter_data_and_build_output(state, output_block, eos, 
&temp_block));
     // Here make _join_block release the columns' ptr
@@ -445,8 +446,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
     }
     {
         SCOPED_TIMER(_join_filter_timer);
-        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
temp_block,
-                                                               
temp_block->columns()));
+        RETURN_IF_ERROR(filter_block(_conjuncts, temp_block, 
temp_block->columns()));
     }
 
     RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
@@ -487,8 +487,12 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
     auto& local_state = get_local_state(state);
     local_state.prepare_for_next();
     local_state._probe_eos = eos;
-    if (input_block->rows() > 0) {
-        COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows());
+
+    const auto rows = input_block->rows();
+    size_t origin_size = input_block->allocated_bytes();
+
+    if (rows > 0) {
+        COUNTER_UPDATE(local_state._probe_rows_counter, rows);
         std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size());
         RETURN_IF_ERROR(_do_evaluate(*input_block, 
local_state._probe_expr_ctxs,
                                      *local_state._probe_expr_call_timer, 
res_col_ids));
@@ -499,6 +503,8 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
 
         RETURN_IF_ERROR(local_state._extract_join_column(*input_block, 
res_col_ids));
 
+        local_state._estimate_memory_usage += (input_block->allocated_bytes() 
- origin_size);
+
         if (&local_state._probe_block != input_block) {
             input_block->swap(local_state._probe_block);
         }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index d3bca8fa7cd..e03fcf9dbdb 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -116,6 +116,8 @@ private:
     std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
             std::make_unique<HashTableCtxVariants>();
 
+    ssize_t _estimated_mem_in_push = -1;
+
     RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
     RuntimeProfile::Counter* _probe_next_timer = nullptr;
     RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 1028bca7ce2..6dce8e60874 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -90,8 +90,8 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                                                                               
output_block, eos));
 
     if (!local_state._conjuncts.empty()) {
-        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
-                                                               
output_block->columns()));
+        RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
output_block,
+                                                 output_block->columns()));
     }
 
     if (!local_state._output_expr_contexts.empty() && output_block->rows() > 
0) {
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 9546ed8df56..bb53755edb2 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -466,6 +466,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
                                           bool eos) const {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
+    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
     local_state._cur_probe_row_visited_flags.resize(block->rows());
     std::fill(local_state._cur_probe_row_visited_flags.begin(),
               local_state._cur_probe_row_visited_flags.end(), 0);
@@ -492,10 +493,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
                                           bool* eos) const {
     auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
+        ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
         
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), 
block));
         *eos = local_state._shared_state->left_side_eos;
         local_state._need_more_input_data = 
!local_state._shared_state->left_side_eos;
     } else {
+        ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
         *eos = ((_match_all_build || _is_right_semi_anti)
                         ? local_state._output_null_idx_build_side ==
                                           
local_state._shared_state->build_blocks.size() &&
@@ -511,8 +514,8 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
             local_state.add_tuple_is_null_column(&tmp_block);
             {
                 SCOPED_TIMER(local_state._join_filter_timer);
-                RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
-                        local_state._conjuncts, &tmp_block, 
tmp_block.columns()));
+                
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, &tmp_block,
+                                                         tmp_block.columns()));
             }
             RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, 
false));
             local_state._reset_tuple_is_null_column();
@@ -528,6 +531,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
                                 state, join_op_variants);
             };
             SCOPED_TIMER(local_state._loop_join_timer);
+            ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
             RETURN_IF_ERROR(std::visit(
                     func, local_state._shared_state->join_op_variants,
                     vectorized::make_bool_variant(_match_all_build || 
_is_right_semi_anti),
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 4a93bac67fe..cf1e82f57dd 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -242,6 +242,14 @@ void PipelineXLocalStateBase::clear_origin_block() {
     
_origin_block.clear_column_data(_parent->intermediate_row_desc().num_materialized_slots());
 }
 
+// Filters `block` through vectorized::VExprContext::filter_block(). When the
+// filter fails its status is returned unchanged and the memory estimate is left
+// alone; when it succeeds, the memory usage reported by
+// VExprContext::get_memory_usage(expr_contexts) is added to
+// `_estimate_memory_usage` before returning OK.
+Status PipelineXLocalStateBase::filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
+                                             vectorized::Block* block, int column_to_keep) {
+    Status filter_status =
+            vectorized::VExprContext::filter_block(expr_contexts, block, column_to_keep);
+    if (!filter_status.ok()) {
+        return filter_status;
+    }
+
+    const auto expr_bytes = vectorized::VExprContext::get_memory_usage(expr_contexts);
+    _estimate_memory_usage += expr_bytes;
+    return Status::OK();
+}
+
 Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* 
origin_block,
                                      vectorized::Block* output_block) const {
     auto* local_state = state->get_local_state(operator_id());
@@ -254,11 +262,14 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
     vectorized::Block input_block = *origin_block;
 
     std::vector<int> result_column_ids;
+    size_t bytes_usage = 0;
     for (const auto& projections : local_state->_intermediate_projections) {
         result_column_ids.resize(projections.size());
         for (int i = 0; i < projections.size(); i++) {
             RETURN_IF_ERROR(projections[i]->execute(&input_block, 
&result_column_ids[i]));
         }
+
+        bytes_usage += input_block.allocated_bytes();
         input_block.shuffle_columns(result_column_ids);
     }
 
@@ -269,12 +280,14 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
                 auto& null_column = 
reinterpret_cast<vectorized::ColumnNullable&>(*to);
                 null_column.get_nested_column().insert_range_from(*from, 0, 
rows);
                 null_column.get_null_map_column().get_data().resize_fill(rows, 
0);
+                bytes_usage += null_column.allocated_bytes();
             } else {
                 to = make_nullable(from, false)->assume_mutable();
             }
         } else {
             if (_keep_origin || !from->is_exclusive()) {
                 to->insert_range_from(*from, 0, rows);
+                bytes_usage += from->allocated_bytes();
             } else {
                 to = from->assume_mutable();
             }
@@ -287,18 +300,24 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
                                                                        
*_output_row_descriptor);
     if (rows != 0) {
         auto& mutable_columns = mutable_block.mutable_columns();
+        const size_t origin_columns_count = input_block.columns();
         DCHECK(mutable_columns.size() == local_state->_projections.size());
         for (int i = 0; i < mutable_columns.size(); ++i) {
             auto result_column_id = -1;
             
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, 
&result_column_id));
             auto column_ptr = input_block.get_by_position(result_column_id)
                                       
.column->convert_to_full_column_if_const();
+            if (result_column_id >= origin_columns_count) {
+                bytes_usage += column_ptr->allocated_bytes();
+            }
             insert_column_datas(mutable_columns[i], column_ptr, rows);
         }
         DCHECK(mutable_block.rows() == rows);
         output_block->set_columns(std::move(mutable_columns));
     }
 
+    local_state->_estimate_memory_usage += bytes_usage;
+
     return Status::OK();
 }
 
@@ -322,6 +341,7 @@ Status 
OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
         if (UNLIKELY(!status.ok())) {
             return status;
         }
+
         return do_projections(state, &local_state->_origin_block, block);
     }
     
local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption());
@@ -365,7 +385,7 @@ std::string 
DataSinkOperatorXBase::debug_string(RuntimeState* state, int indenta
 
 Status DataSinkOperatorXBase::init(const TDataSink& tsink) {
     std::string op_name = "UNKNOWN_SINK";
-    std::map<int, const char*>::const_iterator it = 
_TDataSinkType_VALUES_TO_NAMES.find(tsink.type);
+    auto it = _TDataSinkType_VALUES_TO_NAMES.find(tsink.type);
 
     if (it != _TDataSinkType_VALUES_TO_NAMES.end()) {
         op_name = it->second;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 48f8a2d1836..14cd56f5751 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -20,6 +20,7 @@
 #include <fmt/format.h>
 #include <glog/logging.h>
 
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <memory>
@@ -32,8 +33,10 @@
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/local_exchange/local_exchanger.h"
+#include "runtime/memory/mem_tracker.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
 #include "vec/runtime/vdata_stream_recvr.h"
@@ -178,11 +181,21 @@ public:
 
     std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return 
_query_statistics; }
 
+    // Filters `block` with `expr_contexts`; the out-of-line definition also adds
+    // the expression contexts' reported memory usage to `_estimate_memory_usage`.
+    Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
+                        vectorized::Block* block, int column_to_keep);
+
+    // Adds `usage` to the running memory estimate of this local state.
+    void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage 
+= usage; }
+
+    // Returns a mutable reference to the running estimate so that callers can
+    // accumulate into it directly (ScopedMemTracker takes it as a `size_t&`).
+    size_t& estimate_memory_usage() { return _estimate_memory_usage; }
+
+    // Clears the running estimate back to zero.
+    void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
+
 protected:
     friend class OperatorXBase;
 
     ObjectPool* _pool = nullptr;
     int64_t _num_rows_returned {0};
+    size_t _estimate_memory_usage {0};
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
@@ -215,6 +228,24 @@ protected:
     vectorized::Block _origin_block;
 };
 
+/// Scope guard that measures how much a private MemTracker's peak consumption
+/// grows while the guard is alive and adds that growth to a caller-owned counter.
+///
+/// Construction pushes the private tracker onto the current thread's
+/// `thread_mem_tracker_mgr` as a consumer tracker and records its peak
+/// consumption at that moment. Destruction pops the consumer tracker and adds
+/// `peak_consumption() - <recorded peak>` to `counter`.
+///
+/// `counter` is held by reference and must outlive the guard.
+/// NOTE(review): this assumes the guard is destroyed on the thread that created
+/// it and that guards nest strictly (the pop takes no argument) -- confirm
+/// against ThreadMemTrackerMgr.
+class ScopedMemTracker {
+public:
+    // `explicit`: a bare size_t lvalue must never silently convert into a guard.
+    explicit ScopedMemTracker(size_t& counter)
+            : _counter(counter), _mem_tracker("ScopedMemTracker") {
+        thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker);
+        _peak_usage = _mem_tracker.peak_consumption();
+    }
+
+    // Each guard pairs exactly one push with one pop and registers the address
+    // of its own tracker, so copying it would unbalance the tracker stack.
+    ScopedMemTracker(const ScopedMemTracker&) = delete;
+    ScopedMemTracker& operator=(const ScopedMemTracker&) = delete;
+
+    ~ScopedMemTracker() {
+        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+        _counter += (_mem_tracker.peak_consumption() - _peak_usage);
+    }
+
+private:
+    size_t& _counter;       // caller-owned accumulator updated on destruction
+    size_t _peak_usage = 0; // tracker peak recorded right after the push
+    MemTracker _mem_tracker;
+};
+
 template <typename SharedStateArg = FakeSharedState>
 class PipelineXLocalState : public PipelineXLocalStateBase {
 public:
@@ -386,7 +417,11 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-    Status open(RuntimeState* state) override { return Status::OK(); }
+    // Caches the spill dependency obtained from `state` in `_spill_dependency`.
+    // The null check is a DCHECK only, so it is not enforced in builds where
+    // DCHECK is compiled out.
+    Status open(RuntimeState* state) override {
+        _spill_dependency = state->get_spill_dependency();
+        DCHECK(_spill_dependency != nullptr);
+        return Status::OK();
+    }
 
     Status close(RuntimeState* state, Status exec_status) override;
 
@@ -414,6 +449,7 @@ public:
 
 protected:
     Dependency* _dependency = nullptr;
+    Dependency* _spill_dependency = nullptr;
     SharedStateType* _shared_state = nullptr;
 
 private:
@@ -424,13 +460,13 @@ private:
 class DataSinkOperatorXBase : public OperatorBase {
 public:
     DataSinkOperatorXBase(const int operator_id, const int node_id)
-            : OperatorBase(), _operator_id(operator_id), _node_id(node_id), 
_dests_id({1}) {}
+            : _operator_id(operator_id), _node_id(node_id), _dests_id({1}) {}
 
     DataSinkOperatorXBase(const int operator_id, const int node_id, const int 
dest_id)
-            : OperatorBase(), _operator_id(operator_id), _node_id(node_id), 
_dests_id({dest_id}) {}
+            : _operator_id(operator_id), _node_id(node_id), 
_dests_id({dest_id}) {}
 
     DataSinkOperatorXBase(const int operator_id, const int node_id, 
std::vector<int>& sources)
-            : OperatorBase(), _operator_id(operator_id), _node_id(node_id), 
_dests_id(sources) {}
+            : _operator_id(operator_id), _node_id(node_id), _dests_id(sources) 
{}
 
     ~DataSinkOperatorXBase() override = default;
 
@@ -511,6 +547,13 @@ public:
 
     [[nodiscard]] std::string get_name() const override { return _name; }
 
+    // Default hook: this base implementation asks for no memory to be reserved.
+    // PartitionSortSinkOperatorX and PartitionedAggSinkOperatorX override it.
+    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { 
return 0; }
+
+    // Default hook: always returns true and ignores all three arguments.
+    // NOTE(review): presumably "true" means the reservation succeeded -- confirm
+    // against the callers, which are not visible here.
+    [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, 
vectorized::Block* block,
+                                                  bool eos) {
+        return true;
+    }
+
     virtual bool should_dry_run(RuntimeState* state) { return false; }
 
 protected:
@@ -582,6 +625,11 @@ public:
         return Status::OK();
     }
 
+    // Forwards to Base::dependencies() unchanged; nothing is added at this level.
+    std::vector<Dependency*> dependencies() const override { return Base::dependencies(); }
+
     RuntimeProfile::Counter* _spill_counters = nullptr;
     RuntimeProfile::Counter* _spill_timer = nullptr;
     RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
@@ -739,6 +787,10 @@ public:
     void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = 
parallel_tasks; }
     int parallel_tasks() const { return _parallel_tasks; }
 
+    // Default hook: this base implementation reports nothing to reserve.
+    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { 
return 0; }
+
+    // Default hook: nothing to reset at this level.
+    virtual void reset_reserve_mem_size(RuntimeState* state) {}
+
 protected:
     template <typename Dependency>
     friend class PipelineXLocalState;
@@ -770,6 +822,7 @@ protected:
     int64_t _limit; // -1: no limit
 
     uint32_t _debug_point_count = 0;
+    std::atomic_uint32_t _bytes_per_row = 0;
 
     std::string _op_name;
     bool _ignore_data_distribution = false;
@@ -795,6 +848,24 @@ public:
     [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
         return state->get_local_state(operator_id())->template 
cast<LocalState>();
     }
+
+    // Returns the memory estimate accumulated in this operator's local state.
+    // For a non-source operator that has a child, the child's
+    // get_reserve_mem_size() is added on top.
+    size_t get_reserve_mem_size(RuntimeState* state) override {
+        const auto own_estimate = get_local_state(state).estimate_memory_usage();
+        if (is_source() || !_child_x) {
+            return own_estimate;
+        }
+        return own_estimate + _child_x->get_reserve_mem_size(state);
+    }
+
+    // Zeroes the memory estimate of this operator's local state and, for a
+    // non-source operator that has a child, asks the child to do the same.
+    void reset_reserve_mem_size(RuntimeState* state) override {
+        get_local_state(state).reset_estimate_memory_usage();
+
+        if (is_source() || !_child_x) {
+            return;
+        }
+        _child_x->reset_reserve_mem_size(state);
+    }
 };
 
 /**
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 94c51e160da..caa2ac8a134 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -244,6 +244,19 @@ Status 
PartitionSortSinkOperatorX::_split_block_by_partition(
     return Status::OK();
 }
 
+// Estimates the memory to reserve for one batch of `state->batch_size()` rows:
+// the hash table's own estimate_memory() for that many rows (0 while the method
+// variant is still std::monostate) plus one size_t hash value per row.
+size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+    auto& sink_state = get_local_state(state);
+    auto batch_rows = state->batch_size();
+
+    auto estimate_hash_table = vectorized::Overload {
+            [&](std::monostate&) -> size_t { return 0; },
+            [&](auto& method) -> size_t { return method.hash_table->estimate_memory(batch_rows); }};
+    const size_t hash_table_bytes =
+            std::visit(estimate_hash_table, sink_state._partitioned_data->method_variant);
+
+    const size_t hash_value_bytes = batch_rows * sizeof(size_t); // hash values
+    return hash_table_bytes + hash_value_bytes;
+}
+
 Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
         const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* 
input_block,
         PartitionSortSinkLocalState& local_state, bool eos) {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index e58ac5fea9e..8f660c7e094 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -269,6 +269,8 @@ public:
         return {ExchangeType::PASSTHROUGH};
     }
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
 private:
     friend class PartitionSortSinkLocalState;
     ObjectPool* _pool = nullptr;
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 2f94a652a89..48822650dca 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -42,18 +42,18 @@ Status 
PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     output_block->clear_column_data();
     {
         std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
-        if (local_state._shared_state->blocks_buffer.empty() == false) {
+        if (!local_state._shared_state->blocks_buffer.empty()) {
             
local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
             //if buffer have no data and sink not eos, block reading and wait 
for signal again
-            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
-                    local_state._conjuncts, output_block, 
output_block->columns()));
+            RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
output_block,
+                                                     output_block->columns()));
             if (local_state._shared_state->blocks_buffer.empty() &&
-                local_state._shared_state->sink_eos == false) {
+                !local_state._shared_state->sink_eos) {
                 // add this mutex to check, as in some case maybe is doing 
block(), and the sink is doing set eos.
                 // so have to hold mutex to set block(), avoid to sink have 
set eos and set ready, but here set block() by mistake
                 std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
-                if (local_state._shared_state->sink_eos == false) {
+                if (!local_state._shared_state->sink_eos) {
                     local_state._dependency->block();
                 }
             }
@@ -71,8 +71,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
     // if we move the _blocks_buffer output at last(behind 286 line),
     // it's maybe eos but not output all data: when _blocks_buffer.empty() and 
_can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
     RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
-                                                           
output_block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
output_block,
+                                             output_block->columns()));
     {
         std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 469716b7a22..314806529b7 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -32,7 +32,7 @@ 
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase
         : Base(parent, state) {
     _finish_dependency =
             std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
-                                         parent->get_name() + 
"_SPILL_DEPENDENCY", true);
+                                         parent->get_name() + 
"_FINISH_DEPENDENCY", true);
 }
 
 Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
@@ -228,9 +228,16 @@ Status 
PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
     return sink_local_state->open(state);
 }
 
+// Delegates to the wrapped in-memory agg sink operator, passing the local
+// state's own `_runtime_state` rather than the `state` argument.
+size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+    return _agg_sink_operator->get_reserve_mem_size(get_local_state(state)._runtime_state.get());
+}
+
 Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
-               << Base::_parent->node_id() << " revoke_memory"
+               << Base::_parent->node_id()
+               << " revoke_memory, size: " << 
_parent->revocable_mem_size(state)
                << ", eos: " << _eos;
     RETURN_IF_ERROR(Base::_shared_state->sink_status);
     if (!_shared_state->is_spilled) {
@@ -240,14 +247,14 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
 
     // TODO: spill thread may set_ready before the task::execute thread put 
the task to blocked state
     if (!_eos) {
-        Base::_dependency->Dependency::block();
+        Base::_spill_dependency->Dependency::block();
     }
     auto& parent = Base::_parent->template cast<Parent>();
     Status status;
     Defer defer {[&]() {
         if (!status.ok()) {
             if (!_eos) {
-                Base::_dependency->Dependency::set_ready();
+                Base::_spill_dependency->Dependency::set_ready();
             }
         }
     }};
@@ -262,6 +269,7 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
         return status;
     });
 
+    state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
             state, _shared_state->shared_from_this(),
             [this, &parent, state, query_id, submit_timer] {
@@ -285,16 +293,16 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                         _shared_state->close();
                     } else {
                         VLOG_DEBUG << "query " << print_id(query_id) << " agg 
node "
-                                   << Base::_parent->node_id() << " 
revoke_memory finish"
-                                   << ", eos: " << _eos;
+                                   << Base::_parent->node_id() << " 
revoke_memory finish, size: "
+                                   << _parent->revocable_mem_size(state) << ", 
eos: " << _eos;
                     }
 
                     if (_eos) {
                         Base::_dependency->set_ready_to_read();
                         _finish_dependency->set_ready();
-                    } else {
-                        Base::_dependency->Dependency::set_ready();
                     }
+                    Base::_spill_dependency->Dependency::set_ready();
+                    state->get_query_ctx()->decrease_revoking_tasks_count();
                 }};
                 auto* runtime_state = _runtime_state.get();
                 auto* agg_data = 
parent._agg_sink_operator->get_agg_data(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 259d7580877..1a94d9077be 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -16,7 +16,10 @@
 // under the License.
 
 #pragma once
+#include <memory>
+
 #include "aggregation_sink_operator.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vexpr.h"
@@ -322,6 +325,8 @@ public:
 
     Status revoke_memory(RuntimeState* state) override;
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
 private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 018d63a6dee..9a939c62287 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -17,6 +17,8 @@
 
 #include "partitioned_hash_join_probe_operator.h"
 
+#include <glog/logging.h>
+
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -142,6 +144,8 @@ void 
PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch
 
 Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(PipelineXSpillLocalState::open(state));
+    _spill_dependency = state->get_spill_dependency();
+    DCHECK(_spill_dependency != nullptr);
     return 
_parent->cast<PartitionedHashJoinProbeOperatorX>()._partitioner->clone(state,
                                                                                
   _partitioner);
 }
@@ -221,10 +225,10 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
-        _dependency->set_ready();
+        _spill_dependency->set_ready();
     };
 
-    _dependency->block();
+    _spill_dependency->block();
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
@@ -361,12 +365,12 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
-        _dependency->set_ready();
+        _spill_dependency->set_ready();
     };
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     has_data = true;
-    _dependency->block();
+    _spill_dependency->block();
     {
         auto* pipeline_task = state->get_task();
         if (pipeline_task) {
@@ -477,12 +481,12 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
-        _dependency->set_ready();
+        _spill_dependency->set_ready();
     };
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     DCHECK(spill_io_pool != nullptr);
-    _dependency->block();
+    _spill_dependency->block();
     has_data = true;
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_probe_blocks_submit_func",
                     {
@@ -655,6 +659,7 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_probe sink failed");
     });
+
     
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), 
&block, true));
     VLOG_DEBUG << "query: " << print_id(state->query_id())
                << ", internal build operator finished, node id: " << node_id()
@@ -777,11 +782,8 @@ Status 
PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
 
 bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
-    const auto revocable_size = revocable_mem_size(state);
-    if (PipelineTask::should_revoke_memory(state, revocable_size)) {
-        return true;
-    }
     if (local_state._shared_state->need_to_spill) {
+        const auto revocable_size = revocable_mem_size(state);
         const auto min_revocable_size = state->min_revocable_mem();
         return revocable_size > min_revocable_size;
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 8cccc9f8fae..f1b635208eb 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
@@ -88,6 +89,8 @@ private:
 
     bool _need_to_setup_internal_operators {true};
 
+    Dependency* _spill_dependency {nullptr};
+
     RuntimeProfile::Counter* _spill_and_partition_label = nullptr;
     RuntimeProfile::Counter* _partition_timer = nullptr;
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a7297be493f..8d565bda4dc 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -17,9 +17,14 @@
 
 #include "partitioned_hash_join_sink_operator.h"
 
+#include <glog/logging.h>
+
+#include <algorithm>
+
 #include "pipeline/exec/operator.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
+#include "util/runtime_profile.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -39,6 +44,8 @@ Status 
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _partition_shuffle_timer =
             ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime", 
"Spill", 1);
     _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), 
"SpillBuildTime", "Spill", 1);
+    _in_mem_rows_counter =
+            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "InMemRow", TUnit::UNIT, 
"Spill", 1);
 
     return Status::OK();
 }
@@ -76,9 +83,9 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     /// If no need to spill, all rows were sunk into the 
`_inner_sink_operator` without partitioned.
     if (!_shared_state->need_to_spill) {
         if (_shared_state->inner_shared_state) {
-            auto inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
             if (inner_sink_state_) {
-                auto inner_sink_state =
+                auto* inner_sink_state =
                         
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
                 return inner_sink_state->_build_side_mem_used;
             }
@@ -99,6 +106,20 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
+// Memory to reserve before the next sink call.
+//  - Spilling: one SpillStream::MIN_SPILL_WRITE_BATCH_MEM per partition.
+//  - Not spilling: whatever the inner sink operator asks for, evaluated against
+//    the inner runtime state, or 0 while that state does not exist.
+// The `state` argument itself is not consulted.
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
+    auto& sink_op = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+
+    if (_shared_state->need_to_spill) {
+        return sink_op._partition_count * vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    }
+    if (!_shared_state->inner_runtime_state) {
+        return 0;
+    }
+    return sink_op._inner_sink_operator->get_reserve_mem_size(
+            _shared_state->inner_runtime_state.get());
+}
+
 Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* 
state) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     _shared_state->inner_shared_state->hash_table_variants.reset();
@@ -139,7 +160,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                 std::unique_lock<std::mutex> lock(_spill_lock);
                 _spill_status = status;
                 _spill_status_ok = false;
-                _dependency->set_ready();
+                _spill_dependency->set_ready();
                 return false;
             }
             return true;
@@ -187,7 +208,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                         std::unique_lock<std::mutex> lock(_spill_lock);
                         _spill_status = st;
                         _spill_status_ok = false;
-                        _dependency->set_ready();
+                        _spill_dependency->set_ready();
                         return;
                     }
                     partitions_indexes[partition_idx].clear();
@@ -203,7 +224,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             }
         }
 
-        _dependency->set_ready();
+        _spill_dependency->set_ready();
     };
 
     auto exception_catch_func = [spill_func, this]() mutable {
@@ -216,7 +237,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             std::unique_lock<std::mutex> lock(_spill_lock);
             _spill_status = status;
             _spill_status_ok = false;
-            _dependency->set_ready();
+            _spill_dependency->set_ready();
         }
     };
 
@@ -225,7 +246,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
     auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
 
-    _dependency->block();
+    _spill_dependency->block();
     DBUG_EXECUTE_IF(
             
"fault_inject::partitioned_hash_join_sink::revoke_unpartitioned_block_submit_func",
 {
                 return Status::Error<INTERNAL_ERROR>(
@@ -239,6 +260,7 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
               << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
+    CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
 
     if (!_shared_state->need_to_spill) {
         profile()->add_info_string("Spilled", "true");
@@ -275,9 +297,10 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                     "fault_inject partitioned_hash_join_sink revoke_memory 
submit_func failed");
         });
 
+        state->get_query_ctx()->increase_revoking_tasks_count();
         auto spill_runnable = std::make_shared<SpillRunnable>(
                 state, _shared_state->shared_from_this(),
-                [this, query_id, spilling_stream, i, submit_timer] {
+                [this, state, query_id, spilling_stream, i, submit_timer] {
                     DBUG_EXECUTE_IF(
                             
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
                                 
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -294,12 +317,14 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                         return Status::OK();
                     }();
 
-                    if (!status.OK()) {
+                    if (!status.ok()) {
                         std::unique_lock<std::mutex> lock(_spill_lock);
-                        _dependency->set_ready();
+                        _spill_dependency->set_ready();
                         _spill_status_ok = false;
                         _spill_status = std::move(status);
                     }
+
+                    state->get_query_ctx()->decrease_revoking_tasks_count();
                 });
         if (st.ok()) {
             st = spill_io_pool->submit(std::move(spill_runnable));
@@ -314,10 +339,16 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     if (_spilling_streams_count > 0) {
         std::unique_lock<std::mutex> lock(_spill_lock);
         if (_spilling_streams_count > 0) {
-            _dependency->block();
+            _spill_dependency->block();
         } else if (_child_eos) {
             LOG(INFO) << "hash join sink " << _parent->node_id() << " 
set_ready_to_read"
                       << ", task id: " << state->task_id();
+            std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                          _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
+                              if (block) {
+                                  COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
+                              }
+                          });
             _dependency->set_ready_to_read();
         }
     }
@@ -384,10 +415,16 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (num == 1) {
         std::unique_lock<std::mutex> lock(_spill_lock);
-        _dependency->set_ready();
+        _spill_dependency->set_ready();
         if (_child_eos) {
             LOG(INFO) << "hash join sink " << _parent->node_id() << " 
set_ready_to_read"
                       << ", task id: " << state()->task_id();
+            std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                          _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
+                              if (block) {
+                                  COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
+                              }
+                          });
             _dependency->set_ready_to_read();
         }
     }
@@ -477,6 +514,7 @@ Status 
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
 Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                               bool eos) {
     auto& local_state = get_local_state(state);
+    CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr);
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     if (!local_state._spill_status_ok) {
@@ -492,7 +530,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
     if (rows == 0) {
         if (eos) {
             LOG(INFO) << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
-                      << ", task id: " << state->task_id();
+                      << ", task id: " << state->task_id() << ", need spil: " 
<< need_to_spill;
 
             if (!need_to_spill) {
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) 
{
@@ -506,6 +544,14 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
             }
+
+            
std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
+                          
local_state._shared_state->partitioned_build_blocks.end(),
+                          [&](auto& block) {
+                              if (block) {
+                                  
COUNTER_UPDATE(local_state._in_mem_rows_counter, block->rows());
+                              }
+                          });
             local_state._dependency->set_ready_to_read();
         }
         return Status::OK();
@@ -514,11 +560,6 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (need_to_spill) {
         RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
-
-        const auto revocable_size = revocable_mem_size(state);
-        if (revocable_size > state->min_revocable_mem()) {
-            return local_state.revoke_memory(state);
-        }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
             RETURN_IF_ERROR(_setup_internal_operator(state));
@@ -534,7 +575,13 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
 
     if (eos) {
         LOG(INFO) << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
-                  << ", task id: " << state->task_id();
+                  << ", task id: " << state->task_id() << ", need spil: " << 
need_to_spill;
+        
std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
+                      
local_state._shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+                          if (block) {
+                              COUNTER_UPDATE(local_state._in_mem_rows_counter, 
block->rows());
+                          }
+                      });
         local_state._dependency->set_ready_to_read();
     }
 
@@ -553,4 +600,9 @@ Status 
PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
     return local_state.revoke_memory(state);
 }
 
+// Returns the reserve size reported by the local state that belongs to `state`;
+// the operator itself adds nothing on top of that value.
+size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+    return get_local_state(state).get_reserve_mem_size(state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 1376964663f..b2c79967b97 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -45,6 +45,7 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
     Status revoke_memory(RuntimeState* state);
     size_t revocable_mem_size(RuntimeState* state) const;
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
 
 protected:
     PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
@@ -76,6 +77,7 @@ protected:
     RuntimeProfile::Counter* _partition_timer = nullptr;
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
     RuntimeProfile::Counter* _spill_build_timer = nullptr;
+    RuntimeProfile::Counter* _in_mem_rows_counter = nullptr;
 };
 
 class PartitionedHashJoinSinkOperatorX
@@ -102,6 +104,8 @@ public:
 
     Status revoke_memory(RuntimeState* state) override;
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
     DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index d355d99c2e3..3706a9262fb 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -171,6 +171,7 @@ Status RepeatOperatorX::push(RuntimeState* state, 
vectorized::Block* input_block
     auto& _expr_ctxs = local_state._expr_ctxs;
     DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
     if (input_block->rows() > 0) {
+        ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
         _intermediate_block = vectorized::Block::create_unique();
 
         for (auto& expr : _expr_ctxs) {
@@ -196,6 +197,9 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* outp
     auto& _child_eos = local_state._child_eos;
     auto& _intermediate_block = local_state._intermediate_block;
     RETURN_IF_CANCELLED(state);
+
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+
     DCHECK(_repeat_id_idx >= 0);
     for (const std::vector<int64_t>& v : _grouping_list) {
         DCHECK(_repeat_id_idx <= (int)v.size());
@@ -227,8 +231,9 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* outp
         }
         
_child_block.clear_column_data(_child->row_desc().num_materialized_slots());
     }
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
-                                                           
output_block->columns()));
+
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
output_block,
+                                             output_block->columns()));
     *eos = _child_eos && _child_block.rows() == 0;
     local_state.reached_limit(output_block, eos);
     COUNTER_SET(local_state._rows_returned_counter, 
local_state._num_rows_returned);
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index fcc1ed2bbb1..556cf2a4988 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -259,8 +259,8 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, 
vectorized::Block* bl
                         
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
                         src_block.rows());
             }
-            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
-                    local_state._conjuncts, block, 
_dest_tuple_desc->slots().size()));
+            RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
block,
+                                                     
_dest_tuple_desc->slots().size()));
             src_block.clear();
         }
     } while (block->rows() == 0 && !*eos);
diff --git a/be/src/pipeline/exec/select_operator.h 
b/be/src/pipeline/exec/select_operator.h
index 5370cd9e293..e5e57e57203 100644
--- a/be/src/pipeline/exec/select_operator.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -46,8 +46,7 @@ public:
         auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.exec_time_counter());
         RETURN_IF_CANCELLED(state);
-        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                               
block->columns()));
+        RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
block, block->columns()));
         local_state.reached_limit(block, eos);
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 955f956f60d..9d20456e093 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -68,6 +68,7 @@ Status 
SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
+    ScopedMemTracker mem_tracker(local_state._estimate_memory_usage);
 
     auto probe_rows = in_block->rows();
     if (probe_rows > 0) {
@@ -194,6 +195,12 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
     }
 }
 
+// Returns the `_estimate_memory_usage` value stored in the local state that
+// belongs to `state`.
+template <bool is_intersect>
+size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state) {
+    return get_local_state(state)._estimate_memory_usage;
+}
+
 template <bool is_intersect>
 void SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
         SetProbeSinkLocalState<is_intersect>& local_state) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index ab53f5358c2..a19479e281a 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -55,6 +55,8 @@ private:
     template <class HashTableContext, bool is_intersected>
     friend struct vectorized::HashTableProbe;
 
+    size_t _estimate_memory_usage = 0;
+
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
     vectorized::ColumnRawPtrs _probe_columns;
@@ -100,6 +102,8 @@ public:
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override { 
return nullptr; }
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
     Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& 
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 38667293d48..e96340741e5 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -206,6 +206,33 @@ Status SetSinkOperatorX<is_intersect>::init(const 
TPlanNode& tnode, RuntimeState
     return Status::OK();
 }
 
+// Estimates the memory to reserve for this sink as the sum of:
+//   1. the hash table's estimate for one batch of `state->batch_size()` rows
+//      (0 while the variant still holds std::monostate),
+//   2. the bytes already allocated by the local state's `_mutable_block`,
+//   3. each child expression root's estimate for one batch.
+template <bool is_intersect>
+size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state) {
+    auto& sink_state = get_local_state(state);
+
+    auto estimate_hash_table = [&](auto&& ctx) -> size_t {
+        if constexpr (!std::is_same_v<std::decay_t<decltype(ctx)>, std::monostate>) {
+            return ctx.hash_table->estimate_memory(state->batch_size());
+        } else {
+            return 0;
+        }
+    };
+    size_t total =
+            std::visit(estimate_hash_table, *sink_state._shared_state->hash_table_variants);
+
+    total += sink_state._mutable_block.allocated_bytes();
+    for (auto& child_expr : _child_exprs) {
+        total += child_expr->root()->estimate_memory(state->batch_size());
+    }
+    return total;
+}
+
+// Prepares the base operator first, then the child expressions against the
+// child's row descriptor. The first failing status is returned to the caller.
+template <bool is_intersect>
+Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()));
+    return Status::OK();
+}
+
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::open(RuntimeState* state) {
     RETURN_IF_ERROR(Base::open(state));
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 1c08eddc141..be600ff6fd5 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -95,6 +95,8 @@ public:
     }
     bool require_shuffled_data_distribution() const override { return true; }
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
 private:
     template <class HashTableContext, bool is_intersected>
     friend struct HashTableBuild;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 554a58caf14..6c3260ba850 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -74,6 +74,8 @@ Status 
SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+
     _create_mutable_cols(local_state, block);
     auto st = std::visit(
             [&](auto&& arg) -> Status {
@@ -88,8 +90,7 @@ Status 
SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
             },
             *local_state._shared_state->hash_table_variants);
     RETURN_IF_ERROR(st);
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                           block->columns()));
+    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     local_state.reached_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 02a99e183c8..7e657859ce9 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -53,6 +53,8 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+
     RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, 
eos));
     local_state.reached_limit(block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 4bf1ab04efb..c82404afb03 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -205,7 +205,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
 
     // TODO: spill thread may set_ready before the task::execute thread put 
the task to blocked state
     if (!_eos) {
-        Base::_dependency->Dependency::block();
+        Base::_spill_dependency->Dependency::block();
     }
     auto query_id = state->query_id();
 
@@ -236,8 +236,10 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
                 _dependency->set_ready_to_read();
                 _finish_dependency->set_ready();
             } else {
-                _dependency->Dependency::set_ready();
+                _spill_dependency->Dependency::set_ready();
             }
+
+            state->get_query_ctx()->decrease_revoking_tasks_count();
         }};
 
         _shared_state->sink_status =
@@ -288,15 +290,17 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
                 "revoke_memory submit_func failed");
     });
     if (status.ok()) {
+        state->get_query_ctx()->increase_revoking_tasks_count();
         status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
                 std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
                                                 exception_catch_func));
     }
     if (!status.ok()) {
         if (!_eos) {
-            Base::_dependency->Dependency::set_ready();
+            Base::_spill_dependency->Dependency::set_ready();
         }
     }
     return status;
 }
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index dfbe42c637e..38dfee5c46d 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1274,14 +1274,14 @@ Status StreamingAggLocalState::close(RuntimeState* 
state) {
 
 Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* 
block, bool* eos) const {
     auto& local_state = get_local_state(state);
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
     if (!local_state._pre_aggregated_block->empty()) {
         local_state._pre_aggregated_block->swap(*block);
     } else {
         RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, 
block, eos));
         local_state.make_nullable_output_key(block);
         // dispose the having clause, should not be execute in prestreaming agg
-        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
-                                                               
block->columns()));
+        RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
block, block->columns()));
     }
     local_state.reached_limit(block, eos);
 
@@ -1291,6 +1291,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, 
vectorized::Block* block
 Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* 
in_block,
                                    bool eos) const {
     auto& local_state = get_local_state(state);
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+
     local_state._input_num_rows += in_block->rows();
     if (in_block->rows() > 0) {
         RETURN_IF_ERROR(local_state.do_pre_agg(in_block, 
local_state._pre_aggregated_block.get()));
diff --git a/be/src/pipeline/exec/table_function_operator.cpp 
b/be/src/pipeline/exec/table_function_operator.cpp
index ff9dfe632fa..364670877f2 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -198,8 +198,7 @@ Status 
TableFunctionLocalState::get_expanded_block(RuntimeState* state,
     }
 
     // 3. eval conjuncts
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
-                                                           
output_block->columns()));
+    RETURN_IF_ERROR(filter_block(_conjuncts, output_block, 
output_block->columns()));
 
     *eos = _child_eos && _cur_child_offset == -1;
     return Status::OK();
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index ecaaf22922b..24d172f6708 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -140,6 +140,9 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* 
state, vectorized::Blo
     DCHECK_EQ(state->per_fragment_instance_idx(), 0);
     auto& local_state = 
state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
     DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
+
+    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+
     auto& _const_expr_list_idx = local_state._const_expr_list_idx;
     vectorized::MutableBlock mblock =
             vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, 
_row_descriptor);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8fb750b9e97..e082bb1980f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -22,6 +22,7 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <pthread.h>
 
+#include <algorithm>
 #include <cstdlib>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <fmt/format.h>
@@ -1818,6 +1819,41 @@ Status PipelineFragmentContext::send_report(bool done) {
             req, 
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
+// Sums the revocable memory of this fragment's tasks, counting only tasks whose
+// revocable size is above `_runtime_state->min_revocable_mem()`.
+//
+// If any task is currently running or revoking, `has_running_task` is set to true
+// and 0 is returned immediately. `has_running_task` is never reset to false here,
+// so the caller is responsible for its initial value.
+size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) const {
+    size_t revocable_size = 0;
+    for (const auto& task_instances : _tasks) {
+        for (const auto& task : task_instances) {
+            if (task->is_running() || task->is_revoking()) {
+                LOG_EVERY_N(INFO, 50) << "query: " << print_id(_query_id)
+                                      << " is running, task: " << (void*)task.get()
+                                      << ", task->is_revoking(): " << task->is_revoking() << ", "
+                                      << task->is_running();
+                has_running_task = true;
+                return 0;
+            }
+
+            // Ask the task once and reuse the answer: a second call may yield a
+            // different size than the one that was checked against the threshold.
+            const size_t task_revocable_size = task->get_revocable_size();
+            if (task_revocable_size > _runtime_state->min_revocable_mem()) {
+                revocable_size += task_revocable_size;
+            }
+        }
+    }
+    return revocable_size;
+}
+
+// Collects raw pointers to every task of this fragment whose revocable size is
+// above `_runtime_state->min_revocable_mem()`. The returned pointers are
+// non-owning; the tasks stay owned by `_tasks`.
+std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
+    std::vector<PipelineTask*> candidates;
+    for (const auto& instance_tasks : _tasks) {
+        for (const auto& task : instance_tasks) {
+            const size_t task_revocable_size = task->get_revocable_size();
+            if (!(task_revocable_size > _runtime_state->min_revocable_mem())) {
+                continue;
+            }
+            candidates.push_back(task.get());
+        }
+    }
+    return candidates;
+}
+
 std::string PipelineFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index f95eb03fb12..7ea39c7377b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -117,6 +117,10 @@ public:
 
     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
+    [[nodiscard]] size_t get_revocable_size(bool& has_running_task) const;
+
+    [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
+
     void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(_fragment_instance_ids.size());
         for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 4f362ac5042..5a987cba416 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -20,17 +20,19 @@
 #include <fmt/format.h>
 #include <gen_cpp/Metrics_types.h>
 #include <glog/logging.h>
-#include <stddef.h>
 
+#include <cstddef>
 #include <ostream>
 #include <vector>
 
 #include "common/status.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/pipeline.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/task_queue.h"
+#include "pipeline/task_scheduler.h"
 #include "runtime/descriptors.h"
 #include "runtime/query_context.h"
 #include "runtime/thread_context.h"
@@ -38,6 +40,7 @@
 #include "util/defer_op.h"
 #include "util/mem_info.h"
 #include "util/runtime_profile.h"
+#include "util/uid_util.h"
 
 namespace doris {
 class RuntimeState;
@@ -64,9 +67,14 @@ PipelineTask::PipelineTask(
           _sink(pipeline->sink_shared_pointer()),
           _le_state_map(std::move(le_state_map)),
           _task_idx(task_idx),
-          _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
+          _execution_dep(state->get_query_ctx()->get_execution_dependency()),
+          _memory_sufficient_dependency(
+                  state->get_query_ctx()->get_memory_sufficient_dependency()) {
     _pipeline_task_watcher.start();
 
+    _spill_dependency = Dependency::create_shared(-1, -1, 
"PipelineTaskSpillDependency", true);
+
+    _state->set_spill_dependency(_spill_dependency.get());
     auto shared_state = _sink->create_shared_state();
     if (shared_state) {
         _sink_shared_state = shared_state;
@@ -265,6 +273,12 @@ bool PipelineTask::_is_blocked() {
         }
     }
 
+    _blocked_dep = _spill_dependency->is_blocked_by(this);
+    if (_blocked_dep != nullptr) {
+        _blocked_dep->start_watcher();
+        return true;
+    }
+
     for (auto* op_dep : _write_dependencies) {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
@@ -272,6 +286,12 @@ bool PipelineTask::_is_blocked() {
             return true;
         }
     }
+
+    _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this);
+    if (_blocked_dep != nullptr) {
+        _blocked_dep->start_watcher();
+        return true;
+    }
     return false;
 }
 
@@ -312,6 +332,8 @@ Status PipelineTask::execute(bool* eos) {
         RETURN_IF_ERROR(_open());
     }
 
+    const auto query_id = _state->query_id();
+
     while (!_fragment_context->is_canceled()) {
         if (_is_blocked()) {
             return Status::OK();
@@ -332,24 +354,50 @@ Status PipelineTask::execute(bool* eos) {
         auto* block = _block.get();
 
         auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
-        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
+
+        bool is_wg_mem_low_water_mark = false;
+        bool is_wg_mem_high_water_mark = false;
+        if (should_revoke_memory(_state, sink_revocable_mem_size, 
is_wg_mem_low_water_mark,
+                                 is_wg_mem_high_water_mark)) {
             RETURN_IF_ERROR(_sink->revoke_memory(_state));
             continue;
         }
+
         *eos = _eos;
         DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
             Status status =
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task 
executing failed");
             return status;
         });
+
+        DEFER_RELEASE_RESERVED();
+
         // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
+        size_t reserve_size = 0;
+        bool has_enough_memory = true;
         if (_dry_run || _sink->is_finished(_state)) {
             *eos = true;
             _eos = true;
         } else {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
+            size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
+            reserve_size = _root->get_reserve_mem_size(_state) + 
sink_reserve_size;
+            _root->reset_reserve_mem_size(_state);
+            DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);
+
+            if (reserve_size > 0) {
+                auto st = thread_context()->try_reserve_memory(reserve_size);
+                if (!st.ok()) {
+                    LOG(INFO) << "query: " << print_id(query_id)
+                              << ", try to reserve: " << reserve_size
+                              << " failed: " << st.to_string()
+                              << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    has_enough_memory = false;
+                }
+            }
+
             RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
eos));
         }
 
@@ -373,13 +421,25 @@ Status PipelineTask::execute(bool* eos) {
                 return Status::OK();
             }
         }
+
+        if (!has_enough_memory) {
+            COUNTER_UPDATE(_yield_counts, 1);
+
+            LOG(INFO) << "query: " << print_id(query_id) << ", task: " << 
(void*)this
+                      << ", insufficient memory. reserve_size: " << 
reserve_size;
+            _memory_sufficient_dependency->block();
+            
_state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
+            break;
+        }
     }
 
     static_cast<void>(get_task_queue()->push_back(this));
     return Status::OK();
 }
 
-bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t 
revocable_mem_bytes) {
+bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t 
revocable_mem_bytes,
+                                        bool& is_wg_mem_low_water_mark,
+                                        bool& is_wg_mem_high_water_mark) {
     auto* query_ctx = state->get_query_ctx();
     auto wg = query_ctx->workload_group();
     if (!wg) {
@@ -395,8 +455,6 @@ bool PipelineTask::should_revoke_memory(RuntimeState* 
state, int64_t revocable_m
         }
     }
 
-    bool is_wg_mem_low_water_mark = false;
-    bool is_wg_mem_high_water_mark = false;
     wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
     if (is_wg_mem_high_water_mark) {
         if (revocable_mem_bytes > min_revocable_mem_bytes) {
@@ -507,6 +565,9 @@ std::string PipelineTask::debug_string() {
         }
     }
 
+    fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+                   _memory_sufficient_dependency->debug_string(i++));
+
     fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
     for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
         fmt::format_to(debug_string_buffer, "{}. {}\n", i,
@@ -527,6 +588,14 @@ std::string PipelineTask::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
+// Returns the sink's revocable memory size, or 0 while this task is running or has
+// already reached EOS.
+size_t PipelineTask::get_revocable_size() const {
+    if (_running || _eos) {
+        return 0;
+    }
+    return _sink->revocable_mem_size(_state);
+}
+
+// Forwards the revoke request to this task's sink operator and returns its status.
+Status PipelineTask::revoke_memory() {
+    auto revoke_status = _sink->revoke_memory(_state);
+    return revoke_status;
+}
+
 void PipelineTask::wake_up() {
     // call by dependency
     static_cast<void>(get_task_queue()->push_back(this));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dd2ead4b5dc..79da888cda6 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -17,8 +17,7 @@
 
 #pragma once
 
-#include <stdint.h>
-
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <vector>
@@ -141,6 +140,8 @@ public:
         std::unique_lock<std::mutex> lc(_dependency_lock);
         if (!_finalized) {
             _execution_dep->set_always_ready();
+            _memory_sufficient_dependency->set_always_ready();
+            _spill_dependency->set_always_ready();
             for (auto* dep : _filter_dependencies) {
                 dep->set_always_ready();
             }
@@ -180,10 +181,14 @@ public:
     /**
      * Return true if:
      * 1. `enable_force_spill` is true which forces this task to spill data.
-     * 2. Or memory consumption reaches the high water mark of current 
workload group (80% of memory limitation by default) and revocable_mem_bytes is 
bigger than min_revocable_mem_bytes.
-     * 3. Or memory consumption is higher than the low water mark of current 
workload group (50% of memory limitation by default) and 
`query_weighted_consumption >= query_weighted_limit` and revocable memory is 
big enough.
+     * 2. Or memory consumption reaches the high water mark of current 
workload group (80% of memory limitation by default) 
+        and revocable_mem_bytes is bigger than min_revocable_mem_bytes.
+     * 3. Or memory consumption is higher than the low water mark of current 
workload group (50% of memory limitation by default) 
+        and `query_weighted_consumption >= query_weighted_limit` and revocable 
memory is big enough.
      */
-    static bool should_revoke_memory(RuntimeState* state, int64_t 
revocable_mem_bytes);
+    static bool should_revoke_memory(RuntimeState* state, int64_t 
revocable_mem_bytes,
+                                     bool& is_wg_mem_low_water_mark,
+                                     bool& is_wg_mem_high_water_mark);
 
     void put_in_runnable_queue() {
         _schedule_time++;
@@ -193,7 +198,8 @@ public:
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
 
     bool is_running() { return _running.load(); }
-    void set_running(bool running) { _running = running; }
+    bool is_revoking() { return _spill_dependency->is_blocked_by(nullptr) != 
nullptr; }
+    bool set_running(bool running) { return _running.exchange(running); }
 
     bool is_exceed_debug_timeout() {
         if (_has_exceed_timeout) {
@@ -231,6 +237,9 @@ public:
         }
     }
 
+    [[nodiscard]] size_t get_revocable_size() const;
+    [[nodiscard]] Status revoke_memory();
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -306,11 +315,16 @@ private:
 
     Dependency* _execution_dep = nullptr;
 
+    Dependency* _memory_sufficient_dependency = nullptr;
+
+    std::shared_ptr<Dependency> _spill_dependency;
+
     std::atomic<bool> _finalized {false};
     std::mutex _dependency_lock;
 
     std::atomic<bool> _running {false};
     std::atomic<bool> _eos {false};
+    std::atomic<bool> _revoking {false};
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 8be30773ee1..f76affffb70 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -24,19 +24,25 @@
 #include <sched.h>
 
 // IWYU pragma: no_include <bits/chrono.h>
+#include <algorithm>
 #include <chrono> // IWYU pragma: keep
+#include <cstddef>
 #include <functional>
+#include <memory>
+#include <mutex>
 #include <ostream>
 #include <string>
 #include <thread>
 #include <utility>
 
 #include "common/logging.h"
+#include "common/status.h"
 #include "pipeline/pipeline_task.h"
 #include "pipeline/task_queue.h"
 #include "pipeline_fragment_context.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
+#include "runtime/thread_context.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/time.h"
@@ -53,16 +59,17 @@ TaskScheduler::~TaskScheduler() {
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
-                            .set_min_threads(cores)
-                            .set_max_threads(cores)
+                            .set_min_threads(cores + 1)
+                            .set_max_threads(cores + 1)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
     LOG_INFO("TaskScheduler set cores").tag("size", cores);
-    _markers.resize(cores, true);
     for (size_t i = 0; i < cores; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); 
}));
     }
+
+    RETURN_IF_ERROR(_fix_thread_pool->submit_func([this] { 
_paused_queries_handler(); }));
     return Status::OK();
 }
 
@@ -98,17 +105,19 @@ void _close_task(PipelineTask* task, Status exec_status) {
 }
 
 void TaskScheduler::_do_work(size_t index) {
-    while (_markers[index]) {
+    while (!_need_to_stop) {
         auto* task = _task_queue->take(index);
         if (!task) {
             continue;
         }
+
         if (task->is_running()) {
             static_cast<void>(_task_queue->push_back(task, index));
             continue;
         }
-        task->log_detail_if_need();
         task->set_running(true);
+
+        task->log_detail_if_need();
         task->set_task_queue(_task_queue.get());
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
@@ -187,15 +196,167 @@ void TaskScheduler::_do_work(size_t index) {
     }
 }
 
+// Registers the query that owns `task` in the paused-queries map (keyed by its workload
+// group) and wakes the paused-queries handler thread.
+void TaskScheduler::add_paused_task(PipelineTask* task) {
+    std::lock_guard<std::mutex> guard(_paused_queries_lock);
+    auto paused_query = task->runtime_state()->get_query_ctx()->shared_from_this();
+    DCHECK(paused_query != nullptr);
+    auto& queries_of_group = _paused_queries_list[paused_query->workload_group()];
+    const auto emplace_result = queries_of_group.emplace(std::move(paused_query));
+    // Only log the first time a query enters the paused set.
+    if (emplace_result.second) {
+        LOG(INFO) << "here insert one new paused query: "
+                  << print_id(emplace_result.first->get()->query_id());
+    }
+
+    _paused_queries_cv.notify_all();
+}
+
+/**
+ * Background loop that decides what to do with queries paused for lack of memory.
+ *
+ * Strategy 1: A revocable query should not have any running task (PipelineTask).
+ * Strategy 2: If the workload group is below the low water mark, make all paused queries
+ *             in this workload group runnable.
+ * Strategy 3: Otherwise pick the query which has the max revocable size and revoke its
+ *             memory.
+ * Strategy 4: If no query is revocable and none of them has a running task, cancel the
+ *             query with the max memory usage (only while the workload group is still
+ *             above the high water mark; otherwise that query is resumed).
+ */
+void TaskScheduler::_paused_queries_handler() {
+    while (!_need_to_stop) {
+        // Set when a paused query still has running tasks. The longer back-off is done
+        // after the lock is released so that add_paused_task() callers are not stalled.
+        bool wait_for_running_tasks = false;
+        {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            // The predicate has to observe the stop flag as well, otherwise stop() can
+            // never wake this thread while the list is empty. stop() sets the flag without
+            // holding this lock, so the timeout covers a notification that arrives between
+            // the predicate check and the actual wait.
+            _paused_queries_cv.wait_for(lock, std::chrono::seconds(1), [&] {
+                return _need_to_stop || !_paused_queries_list.empty();
+            });
+
+            if (_need_to_stop) {
+                break;
+            }
+
+            if (_paused_queries_list.empty()) {
+                continue;
+            }
+
+            for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
+                auto& queries_list = it->second;
+                const auto& wg = it->first;
+
+                if (wg == nullptr) {
+                    // Queries without a workload group have no water marks to consult.
+                    // Resume them instead of dereferencing a null pointer below.
+                    for (const auto& query : queries_list) {
+                        query->set_memory_sufficient(true);
+                    }
+                    it = _paused_queries_list.erase(it);
+                    continue;
+                }
+
+                if (queries_list.empty()) {
+                    LOG(INFO) << "wg: " << wg->debug_string() << " has no paused query";
+                    it = _paused_queries_list.erase(it);
+                    continue;
+                }
+
+                bool is_low_watermark = false;
+                bool is_high_watermark = false;
+
+                wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+
+                if (!is_low_watermark && !is_high_watermark) {
+                    LOG(INFO) << "**** there are " << queries_list.size() << " to resume";
+                    for (const auto& query : queries_list) {
+                        LOG(INFO) << "**** resume paused query: " << print_id(query->query_id());
+                        query->set_memory_sufficient(true);
+                    }
+
+                    it = _paused_queries_list.erase(it);
+                    continue;
+                }
+                // `queries_list` and `wg` refer to the map node, which stays valid after
+                // the iterator is advanced.
+                ++it;
+
+                std::shared_ptr<QueryContext> running_query;
+                // Each candidate keeps its own iterator, so the query that gets erased is
+                // always the one that was actually selected.
+                auto max_revocable_it = queries_list.end();
+                auto max_memory_usage_it = queries_list.end();
+                size_t max_revocable_size = 0;
+                size_t max_memory_usage = 0;
+
+                for (auto query_it = queries_list.begin(); query_it != queries_list.end();) {
+                    const auto& query_ctx = *query_it;
+
+                    if (query_ctx->is_cancelled()) {
+                        LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                                  << " was canceled, remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+
+                    size_t revocable_size = 0;
+                    size_t memory_usage = 0;
+                    bool has_running_task = false;
+                    query_ctx->get_revocable_info(revocable_size, memory_usage,
+                                                  has_running_task);
+                    if (has_running_task) {
+                        running_query = query_ctx;
+                        break;
+                    }
+
+                    if (revocable_size > max_revocable_size) {
+                        max_revocable_size = revocable_size;
+                        max_revocable_it = query_it;
+                    } else if (memory_usage > max_memory_usage) {
+                        max_memory_usage = memory_usage;
+                        max_memory_usage_it = query_it;
+                    }
+
+                    ++query_it;
+                }
+
+                if (running_query) {
+                    LOG(INFO) << "has running task, query: "
+                              << print_id(running_query->query_id());
+                    wait_for_running_tasks = true;
+                } else if (max_revocable_it != queries_list.end()) {
+                    // Hold our own reference while revoking; the query stays in the paused
+                    // list.
+                    const std::shared_ptr<QueryContext> max_revocable_query = *max_revocable_it;
+
+                    auto revocable_tasks = max_revocable_query->get_revocable_tasks();
+                    DCHECK(!revocable_tasks.empty());
+
+                    LOG(INFO) << "query: " << print_id(max_revocable_query->query_id())
+                              << ", has " << revocable_tasks.size()
+                              << " tasks to revoke memory, max revocable size: "
+                              << max_revocable_size;
+                    SCOPED_ATTACH_TASK(max_revocable_query.get());
+                    for (auto* task : revocable_tasks) {
+                        auto st = task->revoke_memory();
+                        if (!st.ok()) {
+                            max_revocable_query->cancel(st);
+                            break;
+                        }
+                    }
+                } else if (max_memory_usage_it != queries_list.end()) {
+                    const std::shared_ptr<QueryContext> max_memory_usage_query =
+                            *max_memory_usage_it;
+
+                    // Re-check: memory may have been released since the first check.
+                    bool new_is_low_watermark = false;
+                    bool new_is_high_watermark = false;
+                    wg->check_mem_used(&new_is_low_watermark, &new_is_high_watermark);
+                    if (new_is_high_watermark) {
+                        LOG(INFO) << "memory insufficient and cannot find revocable query, cancel "
+                                     "the query: "
+                                  << print_id(max_memory_usage_query->query_id())
+                                  << ", usage: " << max_memory_usage
+                                  << ", wg info: " << wg->debug_string();
+                        max_memory_usage_query->cancel(Status::InternalError(
+                                "memory insufficient and cannot find revocable query, cancel the "
+                                "biggest usage({}) query({})",
+                                max_memory_usage, print_id(max_memory_usage_query->query_id())));
+                    } else {
+                        LOG(INFO) << "new_is_high_wartermark is false, resume max memory usage "
+                                     "paused query: "
+                                  << print_id(max_memory_usage_query->query_id());
+                        max_memory_usage_query->set_memory_sufficient(true);
+                        queries_list.erase(max_memory_usage_it);
+                    }
+                }
+            }
+        }
+        std::this_thread::sleep_for(
+                std::chrono::milliseconds(wait_for_running_tasks ? 500 : 100));
+    }
+}
+
 void TaskScheduler::stop() {
     if (!_shutdown) {
         if (_task_queue) {
             _task_queue->close();
         }
         if (_fix_thread_pool) {
-            for (size_t i = 0; i < _markers.size(); ++i) {
-                _markers[i] = false;
-            }
+            _need_to_stop = true;
+            _paused_queries_cv.notify_all();
             _fix_thread_pool->shutdown();
             _fix_thread_pool->wait();
         }
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 9a20807ea26..987dcd81bd0 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -17,10 +17,9 @@
 
 #pragma once
 
-#include <stddef.h>
-
 #include <atomic>
 #include <condition_variable>
+#include <cstddef>
 #include <list>
 #include <memory>
 #include <mutex>
@@ -30,6 +29,7 @@
 #include "common/status.h"
 #include "gutil/ref_counted.h"
 #include "pipeline_task.h"
+#include "runtime/query_context.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/thread.h"
 
@@ -49,7 +49,6 @@ public:
     TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, 
std::string name,
                   CgroupCpuCtl* cgroup_cpu_ctl)
             : _task_queue(std::move(task_queue)),
-              _shutdown(false),
               _name(std::move(name)),
               _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
 
@@ -63,14 +62,22 @@ public:
 
     std::vector<int> thread_debug_info() { return 
_fix_thread_pool->debug_info(); }
 
+    void add_paused_task(PipelineTask* task);
+
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
     std::shared_ptr<TaskQueue> _task_queue;
-    std::vector<bool> _markers;
-    bool _shutdown;
+    // Written by stop() and read concurrently by the worker and handler threads.
+    std::atomic<bool> _need_to_stop {false};
+    bool _shutdown = false;
     std::string _name;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
+    std::map<WorkloadGroupPtr, std::set<std::shared_ptr<QueryContext>>> 
_paused_queries_list;
+    std::mutex _paused_queries_lock;
+    std::condition_variable _paused_queries_cv;
+
     void _do_work(size_t index);
+
+    void _paused_queries_handler();
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 10f5ca19add..9a04658876d 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -77,7 +77,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
                            bool is_pipeline, bool is_nereids, TNetworkAddress 
current_connect_fe,
                            QuerySource query_source)
         : _timeout_second(-1),
-          _query_id(query_id),
+          _query_id(std::move(query_id)),
           _exec_env(exec_env),
           _is_pipeline(is_pipeline),
           _is_nereids(is_nereids),
@@ -88,6 +88,9 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
     _query_watcher.start();
     _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
     _execution_dependency = pipeline::Dependency::create_unique(-1, -1, 
"ExecutionDependency");
+    _memory_sufficient_dependency =
+            pipeline::Dependency::create_unique(-1, -1, 
"MemorySufficientDependency", true);
+
     _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
             TUniqueId(), RuntimeFilterParamsContext::create(this), 
query_mem_tracker);
 
@@ -191,6 +194,7 @@ QueryContext::~QueryContext() {
     }
     _runtime_filter_mgr.reset();
     _execution_dependency.reset();
+    _memory_sufficient_dependency.reset();
     _shared_hash_table_controller.reset();
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
@@ -218,6 +222,14 @@ void QueryContext::set_execution_dependency_ready() {
     _execution_dependency->set_ready();
 }
 
+// Marks the query-level memory-sufficient dependency ready when `sufficient` is true,
+// and blocks it otherwise.
+void QueryContext::set_memory_sufficient(bool sufficient) {
+    if (!sufficient) {
+        _memory_sufficient_dependency->block();
+        return;
+    }
+    _memory_sufficient_dependency->set_ready();
+}
+
 void QueryContext::cancel(Status new_status, int fragment_id) {
     if (!_exec_status.update(new_status)) {
         return;
@@ -386,11 +398,63 @@ void QueryContext::_report_query_profile() {
     
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
 }
 
+// Fills the out parameters with the revocable size summed over the fragment contexts that
+// are still alive and with the query memory tracker's consumption. The scan stops at the
+// first fragment that reports a running task through `has_running_task`.
+void QueryContext::get_revocable_info(size_t& revocable_size, size_t& memory_usage,
+                                      bool& has_running_task) const {
+    revocable_size = 0;
+    for (const auto& entry : _fragment_id_to_pipeline_ctx) {
+        if (auto live_ctx = entry.second.lock()) {
+            revocable_size += live_ctx->get_revocable_size(has_running_task);
+
+            // Memory is only revoked once no task is running, so nothing more to collect.
+            if (has_running_task) {
+                break;
+            }
+        }
+    }
+
+    memory_usage = query_mem_tracker->consumption();
+}
+
+// Returns the revocable size summed over the fragment contexts that are still alive, or 0
+// as soon as any fragment reports a running task.
+size_t QueryContext::get_revocable_size() const {
+    size_t total = 0;
+    for (const auto& entry : _fragment_id_to_pipeline_ctx) {
+        if (auto live_ctx = entry.second.lock()) {
+            bool any_task_running = false;
+            total += live_ctx->get_revocable_size(any_task_running);
+
+            // Memory is only revoked once no task is running.
+            if (any_task_running) {
+                return 0;
+            }
+        }
+    }
+    return total;
+}
+
+// Collects the revocable tasks reported by every fragment context that is still alive.
+std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
+    std::vector<pipeline::PipelineTask*> collected;
+    for (const auto& entry : _fragment_id_to_pipeline_ctx) {
+        if (auto live_ctx = entry.second.lock()) {
+            auto fragment_tasks = live_ctx->get_revocable_tasks();
+            collected.insert(collected.end(), fragment_tasks.cbegin(), fragment_tasks.cend());
+        }
+    }
+    return collected;
+}
+
 std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
 QueryContext::_collect_realtime_query_profile() const {
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
res;
 
-    for (auto& [fragment_id, fragment_ctx_wptr] : 
_fragment_id_to_pipeline_ctx) {
+    for (const auto& [fragment_id, fragment_ctx_wptr] : 
_fragment_id_to_pipeline_ctx) {
         if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
             if (fragment_ctx == nullptr) {
                 std::string msg =
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7a6d6d3c53d..1edb204f049 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -30,6 +30,7 @@
 #include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/object_pool.h"
+#include "pipeline/dependency.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_statistics.h"
@@ -45,6 +46,7 @@ namespace doris {
 
 namespace pipeline {
 class PipelineFragmentContext;
+class PipelineTask;
 } // namespace pipeline
 
 struct ReportStatusRequest {
@@ -76,7 +78,7 @@ const std::string toString(QuerySource query_source);
 // Some components like DescriptorTbl may be very large
 // that will slow down each execution of fragments when DeSer them every time.
 class DescriptorTbl;
-class QueryContext {
+class QueryContext : public std::enable_shared_from_this<QueryContext> {
     ENABLE_FACTORY_CREATOR(QueryContext);
 
 public:
@@ -118,6 +120,8 @@ public:
 
     void set_execution_dependency_ready();
 
+    void set_memory_sufficient(bool sufficient);
+
     void set_ready_to_execute_only();
 
     std::shared_ptr<vectorized::SharedHashTableController> 
get_shared_hash_table_controller() {
@@ -181,6 +185,12 @@ public:
 
     pipeline::Dependency* get_execution_dependency() { return 
_execution_dependency.get(); }
 
+    pipeline::Dependency* get_memory_sufficient_dependency() {
+        return _memory_sufficient_dependency.get();
+    }
+
+    std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
+
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
 
     std::shared_ptr<QueryStatistics> get_query_statistics();
@@ -218,6 +228,16 @@ public:
         return _running_big_mem_op_num.load(std::memory_order_relaxed);
     }
 
+    void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); 
}
+
+    void decrease_revoking_tasks_count() { _revoking_tasks_count.fetch_sub(1); 
}
+
+    int get_revoking_tasks_count() const { return 
_revoking_tasks_count.load(); }
+
+    void get_revocable_info(size_t& revocable_size, size_t& memory_usage,
+                            bool& has_running_task) const;
+    size_t get_revocable_size() const;
+
     void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = 
spill_threshold; }
     int64_t spill_threshold() { return _spill_threshold; }
     DescriptorTbl* desc_tbl = nullptr;
@@ -253,6 +273,7 @@ private:
     bool _is_pipeline = false;
     bool _is_nereids = false;
     std::atomic<int> _running_big_mem_op_num = 0;
+    std::atomic<int> _revoking_tasks_count = 0;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
     // This thread pool token is created from "_limited_scan_thread_pool" from 
exec env.
@@ -280,6 +301,9 @@ private:
     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
+    std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
+    std::vector<std::weak_ptr<pipeline::PipelineTask>> _pipeline_tasks;
+
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
     // This shared ptr is never used. It is just a reference to hold the 
object.
     // There is a weak ptr in runtime filter manager to reference this object.
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f43d0a163df..87c54564ae5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -56,6 +56,7 @@ class PipelineXLocalStateBase;
 class PipelineXSinkLocalStateBase;
 class PipelineFragmentContext;
 class PipelineTask;
+class Dependency;
 } // namespace pipeline
 
 class DescriptorTbl;
@@ -600,7 +601,11 @@ public:
 
     vectorized::ColumnInt64* partial_update_auto_inc_column() {
         return _partial_update_auto_inc_column;
-    };
+    }
+
+    void set_spill_dependency(pipeline::Dependency* dependency) { 
_spill_dependency = dependency; }
+
+    pipeline::Dependency* get_spill_dependency() { return _spill_dependency; }
 
 private:
     Status create_error_log_file();
@@ -697,6 +702,8 @@ private:
     int _task_id = -1;
     int _task_num = 0;
 
+    // Null until set_spill_dependency() is called.
+    pipeline::Dependency* _spill_dependency = nullptr;
+
     std::vector<THivePartitionUpdate> _hive_partition_updates;
 
     std::vector<TIcebergCommitData> _iceberg_commit_datas;
diff --git a/be/src/vec/common/hash_table/hash_table.h 
b/be/src/vec/common/hash_table/hash_table.h
index 490cd501692..ea912da32f1 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -807,6 +807,18 @@ public:
         }
     }
 
+    /// Estimates the buffer size in bytes after growing to hold `num_elem` more elements.
+    /// Returns 0 when add_elem_size_overflow() reports that no growth is needed.
+    size_t estimate_memory(size_t num_elem) const {
+        if (add_elem_size_overflow(num_elem)) {
+            // Work on a copy of the grower so the table itself is left untouched.
+            Grower grown = grower;
+            grown.set(num_elem + grower.buf_size());
+            return grown.buf_size() * sizeof(Cell);
+        }
+        return 0;
+    }
+
     /// Insert a value. In the case of any more complex values, it is better 
to use the `emplace` function.
     std::pair<LookupResult, bool> ALWAYS_INLINE insert(const value_type& x) {
         std::pair<LookupResult, bool> res;
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h 
b/be/src/vec/common/hash_table/ph_hash_map.h
index 8f61ee966c5..10896c73a77 100644
--- a/be/src/vec/common/hash_table/ph_hash_map.h
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -219,6 +219,15 @@ public:
         return (_hash_map.size() + row) > (capacity * 7 / 8);
     }
 
+    // Estimates the slot storage in bytes for (2 * capacity + 1) slots, or 0 when
+    // add_elem_size_overflow() reports that `num_elem` more elements still fit.
+    size_t estimate_memory(size_t num_elem) const {
+        return add_elem_size_overflow(num_elem)
+                       ? (_hash_map.capacity() * 2 + 1) * sizeof(typename HashMapImpl::slot_type)
+                       : 0;
+    }
+
     size_t size() const { return _hash_map.size(); }
     template <typename MappedType>
     char* get_null_key_data() {
diff --git a/be/src/vec/common/hash_table/string_hash_table.h 
b/be/src/vec/common/hash_table/string_hash_table.h
index 74be1e85e1e..8ae1f235613 100644
--- a/be/src/vec/common/hash_table/string_hash_table.h
+++ b/be/src/vec/common/hash_table/string_hash_table.h
@@ -679,4 +679,30 @@ public:
                m3.add_elem_size_overflow(add_size) || 
m4.add_elem_size_overflow(add_size) ||
                ms.add_elem_size_overflow(add_size);
     }
+
+    // Returns the largest estimate among the sub-tables m1..m4 and ms that would overflow
+    // when `num_elem` elements are added, or 0 if none of them would.
+    size_t estimate_memory(size_t num_elem) const {
+        size_t largest = 0;
+        auto consider = [&largest, num_elem](const auto& sub_table) {
+            if (sub_table.add_elem_size_overflow(num_elem)) {
+                largest = std::max(largest, sub_table.estimate_memory(num_elem));
+            }
+        };
+
+        consider(m1);
+        consider(m2);
+        consider(m3);
+        consider(m4);
+        consider(ms);
+
+        return largest;
+    }
 };
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp 
b/be/src/vec/exprs/vectorized_fn_call.cpp
index 65d4230488a..9ff45c5dad9 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -189,6 +189,24 @@ Status 
VectorizedFnCall::_do_execute(doris::vectorized::VExprContext* context,
     return Status::OK();
 }
 
+// Estimates the memory needed to evaluate this call on `rows` rows: the children's
+// estimates plus this node's own result column.
+size_t VectorizedFnCall::estimate_memory(const size_t rows) {
+    // A const expression has already been executed in the open function.
+    if (is_const_and_have_executed()) {
+        return 0;
+    }
+
+    // Per-row size of the result: the type's maximum value size when it has one,
+    // otherwise a fixed guess.
+    const size_t bytes_per_row = _data_type->have_maximum_size_of_value()
+                                         ? _data_type->get_maximum_size_of_value_in_memory()
+                                         : 512; /// FIXME: estimated value...
+
+    size_t total = rows * bytes_per_row;
+    for (const auto& child : _children) {
+        total += child->estimate_memory(rows);
+    }
+    return total;
+}
+
 Status 
VectorizedFnCall::execute_runtime_fitler(doris::vectorized::VExprContext* 
context,
                                                 doris::vectorized::Block* 
block,
                                                 int* result_column_id, 
std::vector<size_t>& args) {
diff --git a/be/src/vec/exprs/vectorized_fn_call.h 
b/be/src/vec/exprs/vectorized_fn_call.h
index bae996136dd..b78b27e9764 100644
--- a/be/src/vec/exprs/vectorized_fn_call.h
+++ b/be/src/vec/exprs/vectorized_fn_call.h
@@ -72,6 +72,8 @@ public:
     bool can_push_down_to_index() const override;
     bool equals(const VExpr& other) override;
 
+    size_t estimate_memory(const size_t rows) override;
+
 protected:
     FunctionBasePtr _function;
     std::string _expr_name;
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index da496cdd9f0..be869e4cc8f 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -731,6 +731,24 @@ Status VExpr::_evaluate_inverted_index(VExprContext* 
context, const FunctionBase
     return Status::OK();
 }
 
+// Estimates the memory needed to evaluate this expression on `rows` rows: the children's
+// estimates plus this node's own result column. Returns 0 when
+// is_const_and_have_executed() is true.
+size_t VExpr::estimate_memory(const size_t rows) {
+    if (is_const_and_have_executed()) {
+        return 0;
+    }
+
+    // Per-row size of the result: the type's maximum value size when it has one,
+    // otherwise a fixed guess.
+    const size_t bytes_per_row = _data_type->have_maximum_size_of_value()
+                                         ? _data_type->get_maximum_size_of_value_in_memory()
+                                         : 64; /// TODO: need a more reasonable value
+
+    size_t total = rows * bytes_per_row;
+    for (const auto& child : _children) {
+        total += child->estimate_memory(rows);
+    }
+    return total;
+}
+
 bool VExpr::fast_execute(doris::vectorized::VExprContext* context, 
doris::vectorized::Block* block,
                          int* result_column_id) {
     if (context->get_inverted_index_context() &&
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 382713b2afc..573f5c38b59 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -126,6 +126,8 @@ public:
     Status _evaluate_inverted_index(VExprContext* context, const 
FunctionBasePtr& function,
                                     uint32_t segment_num_rows);
 
+    virtual size_t estimate_memory(const size_t rows);
+
     // Only the 4th parameter is used in the runtime filter. In and MinMax 
need overwrite the
     // interface
     virtual Status execute_runtime_fitler(VExprContext* context, Block* block,
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index bcfd7cda102..4db2033983b 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exprs/vexpr_context.h"
 
+#include <algorithm>
 #include <ostream>
 #include <string>
 
@@ -136,7 +137,9 @@ Status VExprContext::filter_block(VExprContext* vexpr_ctx, 
Block* block, int col
         return Status::OK();
     }
     int result_column_id = -1;
+    size_t origin_size = block->allocated_bytes();
     RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
+    vexpr_ctx->_memory_usage = (block->allocated_bytes() - origin_size);
     return Block::filter_block(block, result_column_id, column_to_keep);
 }
 
@@ -283,14 +286,14 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, Block
     for (const auto& conjunct : conjuncts) {
         int result_column_id = -1;
         RETURN_IF_ERROR(conjunct->execute(block, &result_column_id));
-        auto& filter_column =
+        const auto& filter_column =
                 unpack_if_const(block->get_by_position(result_column_id).column).first;
-        if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
             const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
             const IColumn::Filter& result =
                     assert_cast<const ColumnUInt8&>(*nested_column).get_data();
-            auto* __restrict filter_data = result.data();
-            auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
+            const auto* __restrict filter_data = result.data();
+            const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
             DCHECK_EQ(rows, nullable_column->size());
 
             for (size_t i = 0; i != rows; ++i) {
@@ -302,7 +305,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, Block
                 final_filter_ptr[i] = final_filter_ptr[i] & filter_data[i];
             }
         } else {
-            auto* filter_data = assert_cast<const ColumnUInt8&>(*filter_column).get_data().data();
+            const auto* filter_data =
+                    assert_cast<const ColumnUInt8&>(*filter_column).get_data().data();
             for (size_t i = 0; i != rows; ++i) {
                 final_filter_ptr[i] = final_filter_ptr[i] & filter_data[i];
             }
@@ -318,11 +322,19 @@ Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs&
                                                         int column_to_keep) {
     IColumn::Filter result_filter(block->rows(), 1);
     bool can_filter_all;
+
+    _reset_memory_usage(ctxs);
+
     RETURN_IF_ERROR(
             execute_conjuncts(ctxs, nullptr, false, block, &result_filter, &can_filter_all));
+
+    // Accumulate the usage of `result_filter` into the first context.
+    if (!ctxs.empty()) {
+        ctxs[0]->_memory_usage += result_filter.allocated_bytes();
+    }
     if (can_filter_all) {
         for (auto& col : columns_to_filter) {
-            std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+            block->get_by_position(col).column->assume_mutable()->clear();
         }
     } else {
         try {
@@ -349,12 +361,18 @@ Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs&
                                                         std::vector<uint32_t>& columns_to_filter,
                                                         int column_to_keep,
                                                         IColumn::Filter& filter) {
+    _reset_memory_usage(ctxs);
     filter.resize_fill(block->rows(), 1);
     bool can_filter_all;
     RETURN_IF_ERROR(execute_conjuncts(ctxs, nullptr, false, block, &filter, &can_filter_all));
+
+    // Accumulate the usage of `filter` into the first context.
+    if (!ctxs.empty()) {
+        ctxs[0]->_memory_usage += filter.allocated_bytes();
+    }
     if (can_filter_all) {
         for (auto& col : columns_to_filter) {
-            std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+            block->get_by_position(col).column->assume_mutable()->clear();
         }
     } else {
         RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, filter));
@@ -375,13 +393,20 @@ Status VExprContext::get_output_block_after_execute_exprs(
     auto rows = input_block.rows();
     vectorized::Block tmp_block(input_block.get_columns_with_type_and_name());
     vectorized::ColumnsWithTypeAndName result_columns;
+    _reset_memory_usage(output_vexpr_ctxs);
+
     for (const auto& vexpr_ctx : output_vexpr_ctxs) {
         int result_column_id = -1;
+        int origin_columns = tmp_block.columns();
+        size_t origin_usage = tmp_block.allocated_bytes();
         RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id));
         DCHECK(result_column_id != -1);
+
+        vexpr_ctx->_memory_usage = tmp_block.allocated_bytes() - origin_usage;
         const auto& col = tmp_block.get_by_position(result_column_id);
-        if (do_projection) {
+        if (do_projection && origin_columns <= result_column_id) {
             result_columns.emplace_back(col.column->clone_resized(rows), col.type, col.name);
+            vexpr_ctx->_memory_usage += result_columns.back().column->allocated_bytes();
         } else {
             result_columns.emplace_back(tmp_block.get_by_position(result_column_id));
         }
@@ -390,4 +415,9 @@ Status VExprContext::get_output_block_after_execute_exprs(
     return Status::OK();
 }
 
+void VExprContext::_reset_memory_usage(const VExprContextSPtrs& contexts) {
+    std::for_each(contexts.begin(), contexts.end(),
+                  [](auto&& context) { context->_memory_usage = 0; });
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index d022cf6169e..02628c94a17 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -19,6 +19,8 @@
 
 #include <glog/logging.h>
 
+#include <algorithm>
+#include <cstddef>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -271,10 +273,21 @@ public:
         return *this;
     }
 
+    [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) {
+        size_t usage = 0;
+        std::for_each(contexts.cbegin(), contexts.cend(),
+                      [&usage](auto&& context) { usage += context->_memory_usage; });
+        return usage;
+    }
+
+    [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; }
+
 private:
     // Close method is called in vexpr context dector, not need call expicility
     void close();
 
+    static void _reset_memory_usage(const VExprContextSPtrs& contexts);
+
     friend class VExpr;
 
     /// The expr tree this context is for.
@@ -301,5 +314,6 @@ private:
     bool _force_materialize_slot = false;
 
     std::shared_ptr<InvertedIndexContext> _inverted_index_context;
+    size_t _memory_usage = 0;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp
index 9e00a3afbae..3e15138fe59 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -155,6 +155,26 @@ Status VInPredicate::execute(VExprContext* context, Block* block, int* result_co
     return Status::OK();
 }
 
+size_t VInPredicate::estimate_memory(const size_t rows) {
+    if (is_const_and_have_executed()) {
+        return 0;
+    }
+
+    size_t estimate_size = 0;
+
+    for (int i = 0; i < _children.size(); ++i) {
+        estimate_size += _children[i]->estimate_memory(rows);
+    }
+
+    if (_data_type->is_nullable()) {
+        estimate_size += rows * sizeof(uint8_t);
+    }
+
+    estimate_size += rows * sizeof(uint8_t);
+
+    return estimate_size;
+}
+
 const std::string& VInPredicate::expr_name() const {
     return _expr_name;
 }
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 024ad68f2ba..10caa3e7543 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -43,6 +43,7 @@ public:
     VInPredicate(const TExprNode& node);
     ~VInPredicate() override = default;
     Status execute(VExprContext* context, Block* block, int* result_column_id) override;
+    size_t estimate_memory(const size_t rows) override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 2bea6ea5c06..f9165ee1e35 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -54,6 +54,8 @@ public:
 
     bool equals(const VExpr& other) override;
 
+    size_t estimate_memory(const size_t rows) override { return 0; }
+
 private:
     int _slot_id;
     int _column_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to