This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 9bf2285c008 [opt](spill) Optimize the logic for triggering spilling (#46699)
9bf2285c008 is described below

commit 9bf2285c0084d6a9412a0444856f22362d6c62a5
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Fri Jan 10 16:01:36 2025 +0800

    [opt](spill) Optimize the logic for triggering spilling (#46699)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 18 +++++++------
 be/src/pipeline/pipeline_task.cpp             | 38 +++++++++++++++------------
 be/src/vec/exec/scan/scanner_scheduler.cpp    |  3 +--
 3 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7cf1a963105..09a14c66a7f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1279,11 +1279,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         /// If `group_by_limit_opt` is true, then it might not need to spill at all.
         const bool enable_spill = _runtime_state->enable_spill() &&
                                   !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
-
-        if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
-            request.query_options.__isset.enable_distinct_streaming_aggregation &&
-            request.query_options.enable_distinct_streaming_aggregation &&
-            !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
+        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
+                                      tnode.agg_node.use_streaming_preaggregation &&
+                                      !tnode.agg_node.grouping_exprs.empty();
+        const bool can_use_distinct_streaming_agg =
+                is_streaming_agg && tnode.agg_node.aggregate_functions.empty() &&
+                request.query_options.__isset.enable_distinct_streaming_aggregation &&
+                request.query_options.enable_distinct_streaming_aggregation;
+
+        if (can_use_distinct_streaming_agg) {
             if (enable_query_cache) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
@@ -1305,9 +1309,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 RETURN_IF_ERROR(cur_pipe->add_operator(
                         op, request.__isset.parallel_instances ? request.parallel_instances : 0));
             }
-        } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
-                   tnode.agg_node.use_streaming_preaggregation &&
-                   !tnode.agg_node.grouping_exprs.empty()) {
+        } else if (is_streaming_agg) {
             if (enable_query_cache) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 960b5a813ce..ed40731bfd1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,11 +426,8 @@ Status PipelineTask::execute(bool* eos) {
                     debug_msg += fmt::format(", debug info: {}",
                                              GlobalMemoryArbitrator::process_mem_log_str());
                 }
-                LOG(INFO) << debug_msg;
-
-                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), reserve_size, st);
-                continue;
+                LOG_EVERY_N(INFO, 100) << debug_msg;
+                _state->get_query_ctx()->set_low_memory_mode();
             }
         }
 
@@ -443,11 +440,13 @@ Status PipelineTask::execute(bool* eos) {
             Status status = Status::OK();
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
-            const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
             auto workload_group = _state->get_query_ctx()->workload_group();
             if (_state->enable_reserve_memory() && workload_group &&
                 !(wake_up_early() || _dry_run)) {
-                status = thread_context()->try_reserve_memory(sink_reserve_size);
+                const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
+                status = sink_reserve_size != 0
+                                 ? thread_context()->try_reserve_memory(sink_reserve_size)
+                                 : Status::OK();
 
                 if (status.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
                     _sink->revocable_mem_size(_state) >=
@@ -468,16 +467,21 @@ Status PipelineTask::execute(bool* eos) {
                     debug_msg += fmt::format(", debug info: {}",
                                              GlobalMemoryArbitrator::process_mem_log_str());
                 }
-                VLOG_DEBUG << debug_msg;
-
-                DCHECK_EQ(_pending_block.get(), nullptr);
-                _pending_block = std::move(_block);
-                _block = vectorized::Block::create_unique(_pending_block->clone_empty());
-                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), sink_reserve_size, status);
-                _pending_eos = *eos;
-                *eos = false;
-                continue;
+
+                if (_sink->revocable_mem_size(_state) >= _state->spill_min_revocable_mem()) {
+                    VLOG_DEBUG << debug_msg;
+                    DCHECK_EQ(_pending_block.get(), nullptr);
+                    _pending_block = std::move(_block);
+                    _block = vectorized::Block::create_unique(_pending_block->clone_empty());
+                    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                            _state->get_query_ctx()->shared_from_this(), sink_reserve_size,
+                            status);
+                    _pending_eos = *eos;
+                    *eos = false;
+                    continue;
+                } else {
+                    _state->get_query_ctx()->set_low_memory_mode();
+                }
             }
         }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index cd7f6cfb0ed..8a2387cc69e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -207,8 +207,7 @@ void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerC
     }
 
     LOG(INFO) << debug_msg;
-    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-            state->get_query_ctx()->shared_from_this(), reserve_size, st);
+    state->get_query_ctx()->set_low_memory_mode();
 }
 
 void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
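For readers following the pipeline_task.cpp change, the heart of it is the new decision
taken after a failed memory reservation: pause the query for spilling only when the sink
actually holds enough revocable memory to make a spill worthwhile, and otherwise just
switch the query into low-memory mode and keep running. Below is a minimal, self-contained
C++ sketch of that decision; on_reserve_failure, revocable_bytes, and the Decision enum
are illustrative stand-ins, not Doris APIs.

    #include <cstdint>
    #include <iostream>

    enum class Decision { kPauseAndSpill, kLowMemoryMode };

    // Pausing (and later spilling) is only worthwhile when the sink can hand
    // back at least the configured minimum of revocable memory; otherwise the
    // query keeps running in a degraded, low-memory mode.
    Decision on_reserve_failure(int64_t revocable_bytes, int64_t spill_min_revocable_mem) {
        if (revocable_bytes >= spill_min_revocable_mem) {
            return Decision::kPauseAndSpill; // the add_paused_query(...) path
        }
        return Decision::kLowMemoryMode; // the set_low_memory_mode() path
    }

    int main() {
        // 512 MB revocable vs. a 32 MB threshold: spilling frees real memory.
        std::cout << (on_reserve_failure(512LL << 20, 32LL << 20) ==
                      Decision::kPauseAndSpill)
                  << '\n'; // prints 1
        // Only 1 MB revocable: a spill would free almost nothing, so degrade.
        std::cout << (on_reserve_failure(1LL << 20, 32LL << 20) ==
                      Decision::kLowMemoryMode)
                  << '\n'; // prints 1
    }

The same set_low_memory_mode() fallback replaces add_paused_query() outright in the
scanner path (scanner_scheduler.cpp), where there is no revocable sink memory to spill.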
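The pipeline_fragment_context.cpp change, by contrast, is a pure refactor: the
streaming-preaggregation test that was previously duplicated inline is hoisted into
is_streaming_agg, and the distinct-streaming-agg check is layered on top of it. Here is a
hedged sketch of the same factoring, with simplified stand-in structs in place of the
Thrift TPlanNode/TQueryOptions (the __isset presence checks are omitted for brevity):

    #include <iostream>
    #include <vector>

    // Simplified stand-ins for the Thrift TPlanNode/TQueryOptions fields.
    struct AggNode {
        bool use_streaming_preaggregation = false;
        std::vector<int> grouping_exprs;      // non-empty -> GROUP BY present
        std::vector<int> aggregate_functions; // empty -> pure DISTINCT query
    };

    struct QueryOptions {
        bool enable_distinct_streaming_aggregation = false;
    };

    int main() {
        AggNode agg;
        agg.use_streaming_preaggregation = true;
        agg.grouping_exprs = {0}; // one grouping expression
        QueryOptions opts;
        opts.enable_distinct_streaming_aggregation = true;

        // The hoisted predicate: streaming preagg requested and a GROUP BY exists.
        const bool is_streaming_agg =
                agg.use_streaming_preaggregation && !agg.grouping_exprs.empty();
        // Layered on top: no aggregate functions and the session option enabled.
        const bool can_use_distinct_streaming_agg =
                is_streaming_agg && agg.aggregate_functions.empty() &&
                opts.enable_distinct_streaming_aggregation;

        std::cout << is_streaming_agg << ' ' << can_use_distinct_streaming_agg
                  << '\n'; // prints: 1 1
    }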