This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 9bf2285c008 [opt](spill) Optimize the logic for triggering spilling (#46699)
9bf2285c008 is described below

commit 9bf2285c0084d6a9412a0444856f22362d6c62a5
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Fri Jan 10 16:01:36 2025 +0800

    [opt](spill) Optimize the logic for triggering spilling (#46699)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 18 +++++++------
 be/src/pipeline/pipeline_task.cpp             | 38 +++++++++++++++------------
 be/src/vec/exec/scan/scanner_scheduler.cpp    |  3 +--
 3 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7cf1a963105..09a14c66a7f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1279,11 +1279,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         /// If `group_by_limit_opt` is true, then it might not need to spill at all.
         const bool enable_spill = _runtime_state->enable_spill() &&
                                   !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
-
-        if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
-            request.query_options.__isset.enable_distinct_streaming_aggregation &&
-            request.query_options.enable_distinct_streaming_aggregation &&
-            !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
+        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
+                                      tnode.agg_node.use_streaming_preaggregation &&
+                                      !tnode.agg_node.grouping_exprs.empty();
+        const bool can_use_distinct_streaming_agg =
+                is_streaming_agg && tnode.agg_node.aggregate_functions.empty() &&
+                request.query_options.__isset.enable_distinct_streaming_aggregation &&
+                request.query_options.enable_distinct_streaming_aggregation;
+
+        if (can_use_distinct_streaming_agg) {
             if (enable_query_cache) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
@@ -1305,9 +1309,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 RETURN_IF_ERROR(cur_pipe->add_operator(
                         op, request.__isset.parallel_instances ? request.parallel_instances : 0));
             }
-        } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
-                   tnode.agg_node.use_streaming_preaggregation &&
-                   !tnode.agg_node.grouping_exprs.empty()) {
+        } else if (is_streaming_agg) {
             if (enable_query_cache) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 960b5a813ce..ed40731bfd1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,11 +426,8 @@ Status PipelineTask::execute(bool* eos) {
                        debug_msg += fmt::format(", debug info: {}",
                                                 GlobalMemoryArbitrator::process_mem_log_str());
                     }
-                    LOG(INFO) << debug_msg;
-
-                    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                            _state->get_query_ctx()->shared_from_this(), reserve_size, st);
-                    continue;
+                    LOG_EVERY_N(INFO, 100) << debug_msg;
+                    _state->get_query_ctx()->set_low_memory_mode();
                 }
             }
 
@@ -443,11 +440,13 @@ Status PipelineTask::execute(bool* eos) {
             Status status = Status::OK();
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
-            const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
             auto workload_group = _state->get_query_ctx()->workload_group();
             if (_state->enable_reserve_memory() && workload_group &&
                 !(wake_up_early() || _dry_run)) {
-                status = thread_context()->try_reserve_memory(sink_reserve_size);
+                const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
+                status = sink_reserve_size != 0
+                                 ? thread_context()->try_reserve_memory(sink_reserve_size)
+                                 : Status::OK();
 
                if (status.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
                     _sink->revocable_mem_size(_state) >=
@@ -468,16 +467,21 @@ Status PipelineTask::execute(bool* eos) {
                        debug_msg += fmt::format(", debug info: {}",
                                                 GlobalMemoryArbitrator::process_mem_log_str());
                     }
-                    VLOG_DEBUG << debug_msg;
-
-                    DCHECK_EQ(_pending_block.get(), nullptr);
-                    _pending_block = std::move(_block);
-                    _block = vectorized::Block::create_unique(_pending_block->clone_empty());
-                    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                            _state->get_query_ctx()->shared_from_this(), sink_reserve_size, status);
-                    _pending_eos = *eos;
-                    *eos = false;
-                    continue;
+
+                    if (_sink->revocable_mem_size(_state) >= _state->spill_min_revocable_mem()) {
+                        VLOG_DEBUG << debug_msg;
+                        DCHECK_EQ(_pending_block.get(), nullptr);
+                        _pending_block = std::move(_block);
+                        _block = vectorized::Block::create_unique(_pending_block->clone_empty());
+                        ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                                _state->get_query_ctx()->shared_from_this(), sink_reserve_size,
+                                status);
+                        _pending_eos = *eos;
+                        *eos = false;
+                        continue;
+                    } else {
+                        _state->get_query_ctx()->set_low_memory_mode();
+                    }
                 }
             }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index cd7f6cfb0ed..8a2387cc69e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -207,8 +207,7 @@ void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerC
     }
     LOG(INFO) << debug_msg;
 
-    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-            state->get_query_ctx()->shared_from_this(), reserve_size, st);
+    state->get_query_ctx()->set_low_memory_mode();
 }
 
 void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
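
The policy change above, as a minimal standalone sketch (the struct and
function names below are illustrative, not from the Doris codebase; the real
logic lives in PipelineTask::execute and handle_reserve_memory_failure):

    #include <cstdint>

    // Hypothetical stand-in for the query context touched in the diff.
    struct QueryCtx {
        bool low_memory_mode = false;
        void set_low_memory_mode() { low_memory_mode = true; }
    };

    // Decide how to react when reserving sink memory fails. Returns true if
    // the task should pause itself and enqueue for spilling; returns false
    // if it should keep running, degraded to low-memory mode.
    bool should_pause_and_spill(QueryCtx& ctx, int64_t revocable_mem,
                                int64_t spill_min_revocable_mem) {
        if (revocable_mem >= spill_min_revocable_mem) {
            // Enough revocable memory to make spilling worthwhile: pause the
            // query (add_paused_query in the diff) and spill.
            return true;
        }
        // Too little revocable memory to spill profitably: do not pause;
        // switch the query to low-memory mode and continue executing.
        ctx.set_low_memory_mode();
        return false;
    }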

