github-actions[bot] commented on code in PR #44469:
URL: https://github.com/apache/doris/pull/44469#discussion_r1876218993


##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -29,65 +46,306 @@
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
-    int* un_finish_copy = nullptr;
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,

Review Comment:
   warning: function 'pull' exceeds recommended size/complexity thresholds 
[readability-function-size]
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:48:** 97 lines including 
whitespace and comments (threshold 80)
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,
                                 ^
   ```
   
   </details>
   



##########
be/src/common/config.cpp:
##########
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <fmt/core.h>

Review Comment:
   warning: 'fmt/core.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <fmt/core.h>
            ^
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -29,65 +46,306 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, 
int un_finish_copy, siz
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
-    int* un_finish_copy = nullptr;
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,

Review Comment:
   warning: function 'pull' has cognitive complexity of 54 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:54:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (!_cached_blocks[sender_idx].empty()) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:69:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (!_spill_readers[sender_idx].empty()) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:71:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (!reader_item->stream->ready_for_reading()) {
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:76:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(reader->open());
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:76:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(reader->open());
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:77:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (reader_item->block_offset != 0) {
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:82:** nesting level 
increased to 2
   ```cpp
               auto spill_func = [this, reader_item, sender_idx]() {
                                 ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:86:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   while (!spill_eos) {
                   ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:87:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:87:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:88:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (!block.empty()) {
                       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:92:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           if (_cached_blocks[sender_idx].size() >= 32 ||
                           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:99:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   if (spill_eos || !_cached_blocks[sender_idx].empty()) {
                   ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:106:** nesting level 
increased to 2
   ```cpp
               auto catch_exception_func = [spill_func = 
std::move(spill_func)]() {
                                           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:107:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:107:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:107:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:116:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:116:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:122:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pos_to_pull == end) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:139:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pos_to_pull == end) {
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +115,83 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
-Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* 
state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, 
inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
+    size_t size_to_reserve = 0;
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (_shared_state->need_to_spill) {
+        size_to_reserve = p._partition_count * 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    } else {
+        if (_shared_state->inner_runtime_state) {
+            size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+                    _shared_state->inner_runtime_state.get(), eos);
+        }
+    }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(

Review Comment:
   warning: function '_revoke_unpartitioned_block' has cognitive complexity of 
63 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:160:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (auto* tmp_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:164:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (inner_sink_state) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:173:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (inner_sink_state) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:178:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (build_block.rows() <= 1) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:181:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (spill_context) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:187:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (build_block.columns() > num_slots) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:192:** 
nesting level increased to 1
   ```cpp
       auto spill_func = [build_block = std::move(build_block), state, this]() 
mutable {
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:200:** 
nesting level increased to 2
   ```cpp
                         [](std::vector<uint32_t>& indices) { 
indices.reserve(reserved_size); });
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:204:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (offset < total_rows) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:208:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (size_t i = 0; i != build_block.columns(); ++i) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:226:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (size_t i = 0; i != sub_block.rows(); ++i) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:230:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (uint32_t partition_idx = 0; partition_idx != 
p._partition_count; ++partition_idx) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:236:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (UNLIKELY(!partition_block)) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:244:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(partition_block->add_rows(&sub_block, 
begin, end));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:244:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(partition_block->add_rows(&sub_block, 
begin, end));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:249:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (partition_block->rows() >= reserved_size || 
is_last_block) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:249:** +1
   ```cpp
                   if (partition_block->rows() >= reserved_size || 
is_last_block) {
                                                                ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:251:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(spilling_stream->spill_block(state, 
block, false));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:251:** +6, 
including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(spilling_stream->spill_block(state, 
block, false));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:255:** +1, 
nesting level increased to 4
   ```cpp
                   } else {
                     ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:264:** 
nesting level increased to 1
   ```cpp
       auto exception_catch_func = [spill_func]() mutable {
                                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:265:** 
nesting level increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:265:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:265:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:265:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:276:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       DBUG_EXECUTE_IF(
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:276:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       DBUG_EXECUTE_IF(
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_sort_source_operator.cpp:
##########
@@ -75,94 +73,85 @@
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
     auto& parent = Base::_parent->template cast<Parent>();
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
+    VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
                << " merge spill data";
-    _dependency->Dependency::block();
+    _spill_dependency->Dependency::block();
 
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto spill_func = [this, state, query_id, &parent, submit_timer] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto spill_func = [this, state, query_id, &parent] {

Review Comment:
   warning: lambda has cognitive complexity of 52 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
       auto spill_func = [this, state, query_id, &parent] {
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:102:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           while (!state->is_cancelled()) {
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:109:** nesting level 
increased to 2
   ```cpp
                   SCOPED_TIMER(Base::_spill_recover_time);
                   ^
   ```
   **be/src/util/runtime_profile.h:68:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:117:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (_shared_state->sorted_streams.empty()) {
               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                   ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +1
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:134:** nesting level 
increased to 3
   ```cpp
                           SCOPED_TIMER(Base::_spill_recover_time);
                           ^
   ```
   **be/src/util/runtime_profile.h:68:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:140:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                           if (status.ok()) {
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:146:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                       if (status.ok()) {
                       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -297,7 +266,587 @@
 
         SchemaScannerHelper::insert_int64_value(4, 
wg->get_local_scan_bytes_per_second(), block);
         SchemaScannerHelper::insert_int64_value(5, 
wg->get_remote_scan_bytes_per_second(), block);
+        SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), 
block);
+    }
+}
+
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size) {
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    DCHECK(query_ctx != nullptr);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        query_ctx->set_memory_sufficient(false);
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        const auto& wg = it->first;
+
+        LOG_EVERY_T(INFO, 120) << "Paused queries count: " << 
queries_list.size();
+
+        bool is_low_watermark = false;
+        bool is_high_watermark = false;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->expected_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->expected_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->expected_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes = 
flush_memtable_from_current_group_(
+                            query_ctx, wg, query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory. wg: "
+                              << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.001 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.001) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                if (query_it->cache_ratio_ < 0.001) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes = 
flush_memtable_from_current_group_(
+                                query_ctx, wg, query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {
+                        // If any query is cancelled during process limit 
stage, should resume other query and
+                        // do not do any check now.
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.001 &&
+                    query_it->cache_ratio_ > 0.001) {
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                              << " will be resume after cache adjust.";
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                ++query_it;
+            }
+        }
+        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
+        if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
+            wg->enable_write_buffer_limit(false);
+        }
+
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
+    }
+}
+
+// Return the expected free bytes if memtable could flush
+int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(
+        std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t 
need_free_mem) {
+    // If there are a lot of memtable memory, then wait them flush finished.
+    MemTableMemoryLimiter* memtable_limiter =
+            doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+    int64_t memtable_active_bytes = 0;
+    int64_t memtable_queue_bytes = 0;
+    int64_t memtable_flush_bytes = 0;
+    memtable_limiter->get_workload_group_memtable_usage(
+            wg->id(), &memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
+    // TODO: should add a signal in memtable limiter to prevent new batch
+    // For example, streamload, it will not reserve many memory, but it will 
occupy many memtable memory.
+    // TODO: 0.2 should be a workload group properties. For example, the group 
is optimized for load,then the value
+    // should be larged, if the group is optimized for query, then the value 
should be smaller.
+    int64_t max_wg_memtable_bytes = wg->write_buffer_limit();
+    if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
+        max_wg_memtable_bytes) {
+        // There are many table in flush queue, just waiting them flush 
finished.
+        if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", load buffer limit is: " << 
max_wg_memtable_bytes
+                                  << " wait for flush finished to release more 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes;
+        } else {
+            // Flush some memtables(currently written) to flush queue.
+            memtable_limiter->flush_workload_group_memtables(
+                    wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", flush some active memtable to revoke 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes + 
memtable_active_bytes -
+                   (int64_t)(max_wg_memtable_bytes * 0.6);
+        }
+    }
+    return 0;
+}
+
+int64_t 
WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> 
requestor,
+                                                          bool hard_limit, 
int64_t need_free_mem) {
+    int64_t total_freed_mem = 0;
+    std::unique_ptr<RuntimeProfile> profile = 
std::make_unique<RuntimeProfile>("RevokeMemory");
+    // 1. memtable like memory
+    // 2. query exceed workload group limit
+    int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, 
profile.get());
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    if (hard_limit) {
+        freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - 
total_freed_mem,
+                                                          
doris::QUERY_MIN_MEMORY, profile.get());
+    } else {
+        freed_mem = cancel_top_query_in_overcommit_group_(
+                need_free_mem - total_freed_mem, 
requestor->get_mem_tracker()->consumption(),
+                profile.get());
+    }
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    return total_freed_mem;
+}
+
+// Revoke memory from workload group that exceed it's limit. For example, if 
the wg's limit is 10g, but used 12g
+// then should revoke 2g from the group.
+int64_t 
WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> 
requestor,
+                                                      int64_t need_free_mem,
+                                                      RuntimeProfile* profile) 
{
+    int64_t total_freed_mem = 0;
+    // 1. check memtable usage, and try to free them.
+    int64_t freed_mem = 
revoke_memtable_from_overcommited_groups_(need_free_mem, profile);
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    // 2. Cancel top usage query, one by one
+    using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
+    auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
+        return left.second < right.second;
+    };
+    std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, 
decltype(cmp)> heap(cmp);
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto iter = _workload_groups.begin(); iter != 
_workload_groups.end(); iter++) {
+            if (requestor->workload_group() != nullptr &&
+                iter->second->id() == requestor->workload_group()->id()) {
+                continue;
+            }
+            heap.emplace(iter->second, iter->second->memory_used());
+        }
+    }
+    while (!heap.empty() && need_free_mem - total_freed_mem > 0 && 
!requestor->is_cancelled()) {
+        auto [wg, sort_mem] = heap.top();
+        heap.pop();
+        freed_mem = wg->free_overcommited_memory(need_free_mem - 
total_freed_mem, profile);
+        total_freed_mem += freed_mem;
+    }
+    return total_freed_mem;
+}
+
+// If the memtable is too large, then flush them and wait for finished.
+int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t 
need_free_mem,
+                                                                    
RuntimeProfile* profile) {
+    return 0;
+}
+
+// 1. Sort all memory limiter in all overcommit wg, and cancel the top usage 
task that with most memory.
+// 2. Maybe not valid because it's memory not exceed limit.
+int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t 
need_free_mem,
+                                                                int64_t 
lower_bound,
+                                                                
RuntimeProfile* profile) {
+    return 0;
+}
+
+// streamload, kafka routine load, group commit
+// insert into select
+// select
+
+// If the query could release some memory, for example, spill disk, then the 
return value is true.
+// If the query could not release memory, then cancel the query, the return 
value is true.
+// If the query is not ready to do these tasks, it means just wait, then 
return value is false.
+bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<QueryContext>& query_ctx,

Review Comment:
   warning: function 'handle_single_query_' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<QueryContext>& query_ctx,
                          ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/runtime/workload_group/workload_group_manager.cpp:645:** 102 lines 
including whitespace and comments (threshold 80)
   ```cpp
   bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<QueryContext>& query_ctx,
                          ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp:
##########
@@ -228,95 +251,108 @@ Status 
PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
     return sink_local_state->open(state);
 }
 
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
-               << Base::_parent->node_id() << " revoke_memory"
+size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, 
bool eos) {
+    auto& local_state = get_local_state(state);
+    auto* runtime_state = local_state._runtime_state.get();
+    auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state, eos);
+    COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
+    return size;
+}
+
+Status PartitionedAggSinkLocalState::revoke_memory(

Review Comment:
   warning: function 'revoke_memory' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   Status PartitionedAggSinkLocalState::revoke_memory(
                                        ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp:261:** 88 
lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedAggSinkLocalState::revoke_memory(
                                        ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:
##########
@@ -190,109 +237,112 @@
     return source_local_state->open(state);
 }
 
-Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(RuntimeState* 
state) {
-    DCHECK(!_is_merging);
-    
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-    if 
(Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator !=
-                
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() ||
-        _shared_state->spill_partitions.empty()) {
+Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, 
bool& has_data) {

Review Comment:
   warning: function 'recover_blocks_from_disk' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* 
state, bool& has_data) {
                                    ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:239:** 
107 lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* 
state, bool& has_data) {
                                    ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_sort_sink_operator.cpp:
##########
@@ -164,88 +174,82 @@
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+                RETURN_IF_ERROR(revoke_memory(state, nullptr));
             } else {
                 local_state._dependency->set_ready_to_read();
-                local_state._finish_dependency->set_ready();
             }
         } else {
             RETURN_IF_ERROR(
                     
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
             local_state._dependency->set_ready_to_read();
-            local_state._finish_dependency->set_ready();
         }
     }
     return Status::OK();
 }
 
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    return 
parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+                                                                          eos);
+}
+
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,

Review Comment:
   warning: function 'revoke_memory' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                   ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:195:** 112 lines 
including whitespace and comments (threshold 80)
   ```cpp
   Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                   ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -479,11 +520,11 @@
 Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,

Review Comment:
   warning: function 'sink' has cognitive complexity of 76 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:525:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!local_state._shared_state->_spill_status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:534:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (rows == 0) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:535:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:540:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (need_to_spill) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:542:** +1, 
nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:543:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if 
(UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:544:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(_setup_internal_operator(state));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:544:** +6, 
including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(_setup_internal_operator(state));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:546:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
                   ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:546:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
                   ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:552:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(_inner_sink_operator->sink(
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:552:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(_inner_sink_operator->sink(
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:563:** 
nesting level increased to 3
   ```cpp
                             [&](auto& block) {
                             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:564:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                                 if (block) {
                                 ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:574:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (need_to_spill) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:575:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:575:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:576:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:578:** +1, 
nesting level increased to 2
   ```cpp
           } else if (revocable_mem_size(state) > 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
                  ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:581:** +1, 
nesting level increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:582:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:583:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_setup_internal_operator(state));
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:583:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_setup_internal_operator(state));
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:585:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:585:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:591:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_inner_sink_operator->sink(
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:591:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_inner_sink_operator->sink(
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:594:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_sort_source_operator.cpp:
##########
@@ -75,94 +73,85 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() 
const {
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {

Review Comment:
   warning: function 'initiate_merge_sort_spill_streams' has cognitive 
complexity of 84 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
                               ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:81:** nesting level 
increased to 1
   ```cpp
       auto spill_func = [this, state, query_id, &parent] {
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:102:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (!state->is_cancelled()) {
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:109:** nesting level 
increased to 3
   ```cpp
                   SCOPED_TIMER(Base::_spill_recover_time);
                   ^
   ```
   **be/src/util/runtime_profile.h:68:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:117:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (_shared_state->sorted_streams.empty()) {
               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                   ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +1
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:134:** nesting level 
increased to 4
   ```cpp
                           SCOPED_TIMER(Base::_spill_recover_time);
                           ^
   ```
   **be/src/util/runtime_profile.h:68:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:140:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           if (status.ok()) {
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:146:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (status.ok()) {
                       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:164:** nesting level 
increased to 1
   ```cpp
       auto exception_catch_func = [spill_func]() {
                                   ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** nesting level 
increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:169:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func",
 {
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:169:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func",
 {
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:
##########
@@ -190,109 +237,112 @@ Status 
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
     return source_local_state->open(state);
 }
 
-Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(RuntimeState* 
state) {
-    DCHECK(!_is_merging);
-    
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-    if 
(Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator !=
-                
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() ||
-        _shared_state->spill_partitions.empty()) {
+Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, 
bool& has_data) {

Review Comment:
   warning: function 'recover_blocks_from_disk' has cognitive complexity of 76 
(threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* 
state, bool& has_data) {
                                    ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:242:** 
+1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_shared_state->spill_partitions.empty()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:249:** 
nesting level increased to 1
   ```cpp
       auto spill_func = [this, state, query_id] {
                         ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:262:** 
+2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (!state->is_cancelled() && !has_agg_data &&
           ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:264:** 
+3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               while 
(!_shared_state->spill_partitions[0]->spill_streams_.empty() &&
               ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:270:** 
+4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:270:** +1
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                               ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:272:** 
+5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:272:** 
+6, including nesting penalty of 5, nesting level increased to 6
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:278:** 
+5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                           if (status.ok()) {
                           ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:282:** 
+5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:282:** 
+6, including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:284:** 
+5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       if (!block.empty()) {
                       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:289:** 
+6, including nesting penalty of 5, nesting level increased to 6
   ```cpp
                           if (accumulated_blocks_size >=
                           ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:298:** 
+4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (_current_partition_eos) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:304:** 
+3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (_shared_state->spill_partitions[0]->spill_streams_.empty()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:317:** 
nesting level increased to 1
   ```cpp
       auto exception_catch_func = [spill_func, query_id]() {
                                   ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:318:** 
+2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
 {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:318:** 
+3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
 {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:326:** 
nesting level increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                         ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:326:** 
+3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:326:** 
+4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:326:** 
+5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:332:** 
+1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:332:** 
+2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
       DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +115,83 @@
     return mem_size;
 }
 
-Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* 
state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, 
inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
+    size_t size_to_reserve = 0;
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (_shared_state->need_to_spill) {
+        size_to_reserve = p._partition_count * 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    } else {
+        if (_shared_state->inner_runtime_state) {
+            size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+                    _shared_state->inner_runtime_state.get(), eos);
+        }
+    }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(

Review Comment:
   warning: function '_revoke_unpartitioned_block' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:156:** 126 
lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_sort_sink_operator.cpp:
##########
@@ -164,88 +174,82 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+                RETURN_IF_ERROR(revoke_memory(state, nullptr));
             } else {
                 local_state._dependency->set_ready_to_read();
-                local_state._finish_dependency->set_ready();
             }
         } else {
             RETURN_IF_ERROR(
                     
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
             local_state._dependency->set_ready_to_read();
-            local_state._finish_dependency->set_ready();
         }
     }
     return Status::OK();
 }
 
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    return 
parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+                                                                          eos);
+}
+
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,

Review Comment:
   warning: function 'revoke_memory' has cognitive complexity of 52 (threshold 
50) [readability-function-cognitive-complexity]
   ```cpp
   Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                   ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:197:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_shared_state->is_spilled) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:205:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_shared_state->_spill_status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:213:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(status);
       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:213:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(status);
       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:220:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_eos) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:225:** nesting level 
increased to 1
   ```cpp
       auto spill_func = [this, state, query_id, &parent] {
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:251:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(status);
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:251:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(status);
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:258:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (!eos && !state->is_cancelled()) {
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:258:** +1
   ```cpp
           while (!eos && !state->is_cancelled()) {
                       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:265:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:265:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:267:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:267:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:275:** nesting level 
increased to 1
   ```cpp
       auto exception_catch_func = [query_id, spill_func]() {
                                   ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:276:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:276:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:284:** nesting level 
increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:284:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:284:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:284:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:289:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:289:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:294:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:302:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:303:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (!_eos) {
           ^
   ```
   
   </details>
   



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -297,7 +266,587 @@ void 
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
 
         SchemaScannerHelper::insert_int64_value(4, 
wg->get_local_scan_bytes_per_second(), block);
         SchemaScannerHelper::insert_int64_value(5, 
wg->get_remote_scan_bytes_per_second(), block);
+        SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), 
block);
+    }
+}
+
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size) {
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    DCHECK(query_ctx != nullptr);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        query_ctx->set_memory_sufficient(false);
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {

Review Comment:
   warning: function 'handle_paused_queries' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   void WorkloadGroupMgr::handle_paused_queries() {
                          ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/runtime/workload_group/workload_group_manager.cpp:296:** 219 lines 
including whitespace and comments (threshold 80)
   ```cpp
   void WorkloadGroupMgr::handle_paused_queries() {
                          ^
   ```
   
   </details>
   



##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -87,31 +94,58 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& 
tg_info, bool need_create_
 
 std::string WorkloadGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
+    auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
+    auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 
1);
     return fmt::format(
-            "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
+            "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
+            "total_query_slot_count = {}, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}%, "
+            "enable_memory_overcommit = {}, total_mem_used = {} 
(write_buffer_size={}),"
+            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
cpu_hard_limit = {}, "
+            "scan_thread_num = "
             "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}, "
             "memory_low_watermark={}, memory_high_watermark={}, 
is_shutdown={}, query_num={}, "
             "read_bytes_per_second={}, remote_read_bytes_per_second={}]",
-            _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, 
TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false", _version, 
cpu_hard_limit(),
-            _scan_thread_num, _max_remote_scan_thread_num, 
_min_remote_scan_thread_num,
-            _memory_low_watermark, _memory_high_watermark, _is_shutdown, 
_query_ctxs.size(),
-            _scan_bytes_per_second, _remote_scan_bytes_per_second);
+            _id, _name, _version, cpu_share(), _total_query_slot_count,
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
to_string(_slot_mem_policy),
+            _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+            PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
+            PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
+            mem_used_ratio, cpu_hard_limit(), _scan_thread_num, 
_max_remote_scan_thread_num,
+            _min_remote_scan_thread_num, _memory_low_watermark, 
_memory_high_watermark,
+            _is_shutdown, _query_ctxs.size(), _scan_bytes_per_second,
+            _remote_scan_bytes_per_second);
+}
+
+bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {

Review Comment:
   warning: method 'add_wg_refresh_interval_memory_growth' can be made const 
[readability-make-member-function-const]
   
   be/src/runtime/workload_group/workload_group.h:115:
   ```diff
   -     bool add_wg_refresh_interval_memory_growth(int64_t size);
   +     bool add_wg_refresh_interval_memory_growth(int64_t size) const;
   ```
   
   ```suggestion
   bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) 
const {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to