This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 3ba5406e00a [fix] Use separate memory sufficient dependency for each PipelineTask (#42198)

3ba5406e00a is described below

commit 3ba5406e00a353e342d936f5d3b077531bd515fd
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Oct 21 17:41:26 2024 +0800

    [fix] Use separate memory sufficient dependency for each PipelineTask (#42198)

    ## Proposed changes

    1. [opt] Limit the number of scanners in FileScanOperator
    2. [fix] Avoid finishing spilling streams repeatedly in partitioned join
    3. [fix] Fix the query blocking issue caused by a pending block in PipelineTask
---
 be/src/pipeline/exec/file_scan_operator.cpp        | 15 ++++-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  5 ++
 .../exec/partitioned_hash_join_sink_operator.h     |  1 +
 be/src/pipeline/pipeline_fragment_context.cpp      | 13 ++++
 be/src/pipeline/pipeline_fragment_context.h        |  2 +
 be/src/pipeline/pipeline_task.cpp                  | 70 ++++++++++------------
 be/src/pipeline/pipeline_task.h                    |  6 +-
 be/src/runtime/query_context.cpp                   | 21 +++----
 be/src/runtime/query_context.h                     |  5 --
 .../workload_group/workload_group_manager.cpp      |  3 +-
 10 files changed, 82 insertions(+), 59 deletions(-)
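Before the diff, a minimal, self-contained sketch of the headline change: each PipelineTask now owns its own memory-sufficient dependency, named after the sink node id and task id, instead of sharing one query-global dependency. This is an illustrative reduction, not the real pipeline::Dependency API; every name below is a simplified stand-in.

    // Sketch only: toy Dependency with the block()/set_ready() surface used here.
    #include <atomic>
    #include <cstdio>
    #include <memory>
    #include <string>
    #include <vector>

    class Dependency {
    public:
        explicit Dependency(std::string name) : _name(std::move(name)) {}
        void block() { _ready.store(false, std::memory_order_release); }
        void set_ready() { _ready.store(true, std::memory_order_release); }
        bool ready() const { return _ready.load(std::memory_order_acquire); }
        const std::string& name() const { return _name; }

    private:
        std::string _name;
        std::atomic<bool> _ready {true};
    };

    struct PipelineTask {
        // Each task owns its dependency instead of borrowing a query-global one.
        std::unique_ptr<Dependency> memory_sufficient_dependency;
    };

    int main() {
        std::vector<PipelineTask> tasks(3);
        int task_id = 0;
        for (auto& task : tasks) {
            task.memory_sufficient_dependency = std::make_unique<Dependency>(
                    "MemorySufficientDependency_0_" + std::to_string(task_id++));
        }
        // Pausing a query now means blocking every task's own dependency.
        for (auto& task : tasks) {
            task.memory_sufficient_dependency->block();
        }
        for (const auto& task : tasks) {
            std::printf("%s ready=%d\n", task.memory_sufficient_dependency->name().c_str(),
                        (int)task.memory_sufficient_dependency->ready());
        }
        return 0;
    }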
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 6fa7401e278..7018c279d35 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -60,9 +60,20 @@ std::string FileScanLocalState::name_suffix() const {
 
 void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& scan_ranges) {
+    auto wg_ptr = state->get_query_ctx()->workload_group();
     _max_scanners =
             config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
-    _max_scanners = std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
+    if (wg_ptr && state->get_query_ctx()->enable_query_slot_hard_limit()) {
+        const auto total_slots = wg_ptr->total_query_slot_count();
+        const auto query_slots = state->get_query_ctx()->get_slot_count();
+        _max_scanners = _max_scanners * query_slots / total_slots;
+    }
+
+    const auto parallel_scan_max_scanners_count = state->parallel_scan_max_scanners_count();
+    if (parallel_scan_max_scanners_count > 0) {
+        _max_scanners =
+                std::max(std::min(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
+    }
     // For select * from table limit 10; should just use one thread.
     if (should_run_serial()) {
         _max_scanners = 1;
@@ -82,7 +93,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                 std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, _max_scanners);
     }
     _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
-    if (scan_ranges.size() > 0 &&
+    if (!scan_ranges.empty() &&
         scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
         // for compatibility.
         // in new implement, the tuple id is set in prepare phase
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d3d010e0d7c..baa99d3fe14 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -412,6 +412,11 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
 }
 
 Status PartitionedHashJoinSinkLocalState::_finish_spilling() {
+    bool expected = false;
+    if (!_spilling_finished.compare_exchange_strong(expected, true)) {
+        return Status::OK();
+    }
+
     for (auto& stream : _shared_state->spilled_streams) {
         if (stream) {
             RETURN_IF_ERROR(stream->spill_eof());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 8a844e69963..97c40d43a66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -67,6 +67,7 @@ protected:
 
     friend class PartitionedHashJoinSinkOperatorX;
 
+    std::atomic<bool> _spilling_finished {false};
     vectorized::Block _pending_block;
     bool _child_eos {false};
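The _finish_spilling() change above is the standard run-once guard built on compare_exchange_strong: the first caller flips the flag and writes spill_eof() to every stream, and any later caller returns immediately, which is what prevents the streams from being finished repeatedly. A stripped-down, compilable sketch (all names here are illustrative, not Doris APIs):

    #include <atomic>
    #include <cstdio>

    // Stand-in for the once-only guard: the first caller flips the flag and
    // does the work; every later caller returns without touching the streams.
    std::atomic<bool> spilling_finished {false};

    bool finish_spilling_once() {
        bool expected = false;
        if (!spilling_finished.compare_exchange_strong(expected, true)) {
            return false; // another caller already finished the streams
        }
        std::puts("spill_eof written to every stream exactly once");
        return true;
    }

    int main() {
        finish_spilling_once(); // does the work
        finish_spilling_once(); // no-op
        return 0;
    }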
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6e6c2bd3e73..03db7e674f1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1872,6 +1872,19 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
     return revocable_tasks;
 }
 
+void PipelineFragmentContext::set_memory_sufficient(bool sufficient) {
+    for (const auto& task_instances : _tasks) {
+        for (const auto& task : task_instances) {
+            auto* dependency = task->get_memory_sufficient_dependency();
+            if (sufficient) {
+                dependency->set_ready();
+            } else {
+                dependency->block();
+            }
+        }
+    }
+}
+
 std::string PipelineFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 9c2ed36b919..8d55d0ce285 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,6 +121,8 @@ public:
 
     [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
 
+    void set_memory_sufficient(bool sufficient);
+
     void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(_fragment_instance_ids.size());
         for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index b3282810d86..affb8c44382 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -17,6 +17,7 @@
 
 #include "pipeline_task.h"
 
+#include <fmt/core.h>
 #include <fmt/format.h>
 #include <gen_cpp/Metrics_types.h>
 #include <glog/logging.h>
@@ -71,15 +72,18 @@ PipelineTask::PipelineTask(
           _sink(pipeline->sink_shared_pointer()),
           _le_state_map(std::move(le_state_map)),
           _task_idx(task_idx),
-          _execution_dep(state->get_query_ctx()->get_execution_dependency()),
-          _memory_sufficient_dependency(
-                  state->get_query_ctx()->get_memory_sufficient_dependency()) {
+          _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
     _pipeline_task_watcher.start();
 
     auto shared_state = _sink->create_shared_state();
     if (shared_state) {
        _sink_shared_state = shared_state;
    }
+
+    const auto dependency_name =
+            fmt::format("MemorySufficientDependency_{}_{}", _sink->node_id(), task_id);
+    _memory_sufficient_dependency =
+            pipeline::Dependency::create_unique(-1, -1, dependency_name, true);
 }
 
 Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
@@ -317,6 +321,12 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_ATTACH_TASK(_state);
     _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
     *eos = _eos;
+
+    // If `_wake_up_by_downstream` is true, the pending block will not be sunk.
+    if (_wake_up_by_downstream) {
+        _pending_block.reset();
+    }
+
     if (_eos && !_pending_block) {
         // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
         return Status::OK();
@@ -388,26 +398,22 @@ Status PipelineTask::execute(bool* eos) {
             // Every loop should check if memory is not enough.
             // _state->get_query_ctx()->update_low_memory_mode();
 
-            // `_dry_run` means sink operator need no more data
-            // `_sink->is_finished(_state)` means sink operator should be finished
-            int64_t reserve_size = 0;
-            bool has_enough_memory = true;
-            if (_dry_run || _sink->is_finished(_state)) {
-                *eos = true;
-                _eos = true;
-            } else if (_pending_block) [[unlikely]] {
+            if (_pending_block) [[unlikely]] {
                 LOG(INFO) << "query: " << print_id(query_id)
                           << " has pending block, size: " << _pending_block->allocated_bytes();
                 _block = std::move(_pending_block);
                 block = _block.get();
+            }
+            // `_dry_run` means the sink operator needs no more data
+            // `_sink->is_finished(_state)` means the sink operator should be finished
+            else if (_dry_run || _sink->is_finished(_state)) {
+                *eos = true;
+                _eos = true;
             } else {
                 SCOPED_TIMER(_get_block_timer);
                 DEFER_RELEASE_RESERVED();
                 _get_block_counter->update(1);
-                // size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
-                // sink_reserve_size =
-                //         std::max(sink_reserve_size, _state->minimum_operator_memory_required_bytes());
-                reserve_size = _root->get_reserve_mem_size(_state);
+                const auto reserve_size = _root->get_reserve_mem_size(_state);
                 _root->reset_reserve_mem_size(_state);
 
                 auto workload_group = _state->get_query_ctx()->workload_group();
@@ -426,19 +432,14 @@ Status PipelineTask::execute(bool* eos) {
                                   << ", debug info: "
                                   << GlobalMemoryArbitrator::process_mem_log_str();
 
                         _state->get_query_ctx()->update_paused_reason(st);
-                        // _state->get_query_ctx()->set_low_memory_mode();
-                        bool is_high_wartermark = false;
-                        bool is_low_wartermark = false;
-                        workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-                        if (is_low_wartermark || is_high_wartermark) {
-                            ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                                    _state->get_query_ctx()->shared_from_this(), reserve_size);
-                            continue;
-                        }
-                        has_enough_memory = false;
+                        _state->get_query_ctx()->set_low_memory_mode();
+                        ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                                _state->get_query_ctx()->shared_from_this(), reserve_size);
+                        continue;
                     }
                 }
 
+                DCHECK_EQ(_pending_block.get(), nullptr);
                 RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
             }
 
@@ -447,9 +448,10 @@ Status PipelineTask::execute(bool* eos) {
                 Status status = Status::OK();
                 DEFER_RELEASE_RESERVED();
                 COUNTER_UPDATE(_memory_reserve_times, 1);
-                size_t sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
+                const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
                 status = thread_context()->try_reserve_memory(sink_reserve_size);
                 if (!status.ok()) {
+                    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
                     LOG(INFO) << "query: " << print_id(query_id)
                               << ", try to reserve: "
                               << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
                               << ", sink name: " << _sink->get_name()
@@ -457,11 +459,12 @@ Status PipelineTask::execute(bool* eos) {
                               << ", failed: " << status.to_string()
                               << ", debug info: "
                               << GlobalMemoryArbitrator::process_mem_log_str();
                     _state->get_query_ctx()->update_paused_reason(status);
-                    _memory_sufficient_dependency->block();
+                    _state->get_query_ctx()->set_low_memory_mode();
                     ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                             _state->get_query_ctx()->shared_from_this(), sink_reserve_size);
+                    DCHECK_EQ(_pending_block.get(), nullptr);
                     _pending_block = std::move(_block);
-                    _block = vectorized::Block::create_unique();
+                    _block = vectorized::Block::create_unique(_pending_block->clone_empty());
                     _eos = *eos;
                     *eos = false;
                     continue;
@@ -484,17 +487,6 @@ Status PipelineTask::execute(bool* eos) {
                     return Status::OK();
                 }
             }
-
-            if (!has_enough_memory) {
-                COUNTER_UPDATE(_yield_counts, 1);
-
-                LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this
-                          << ", insufficient memory. reserve_size: "
-                          << PrettyPrinter::print(reserve_size, TUnit::BYTES);
-                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), reserve_size);
-                break;
-            }
         }
 
         static_cast<void>(get_task_queue()->push_back(this));
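The pipeline_task.cpp hunks above implement a park-and-retry flow: when reserving memory for the sink fails, the produced block is stashed in _pending_block, _block is replaced by an empty block with the same schema via clone_empty(), and the task yields; the next execution picks the parked block up before producing anything new. A toy sketch of that control flow, with all names and the fake memory arbiter invented purely for illustration:

    #include <cstdio>
    #include <memory>

    // Toy Block: clone_empty() mirrors keeping the schema while dropping rows.
    struct Block {
        int rows = 0;
        Block clone_empty() const { return Block {}; }
    };

    struct Task {
        std::unique_ptr<Block> block = std::make_unique<Block>();
        std::unique_ptr<Block> pending_block;
        int reserve_attempts = 0;

        // Fake arbiter: fail the first reservation, succeed afterwards.
        bool try_reserve_memory() { return ++reserve_attempts > 1; }

        void execute_step() {
            if (pending_block) {
                block = std::move(pending_block); // retry the parked block first
            } else {
                block->rows = 42; // produce a fresh block from the operator tree
            }
            if (!try_reserve_memory()) {
                // Reservation failed: park the block, keep an empty block with
                // the same schema, and yield until memory becomes sufficient.
                pending_block = std::move(block);
                block = std::make_unique<Block>(pending_block->clone_empty());
                std::puts("paused: block parked for a later retry");
                return;
            }
            std::printf("sank %d rows\n", block->rows);
        }
    };

    int main() {
        Task t;
        t.execute_step(); // parks the block
        t.execute_step(); // retries and sinks it
        return 0;
    }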
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 44dfdd7832a..a3505f7a407 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -244,6 +244,10 @@ public:
         _spill_dependencies.emplace_back(dependency);
     }
 
+    Dependency* get_memory_sufficient_dependency() const {
+        return _memory_sufficient_dependency.get();
+    }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -325,7 +329,7 @@ private:
     Dependency* _execution_dep = nullptr;
     std::atomic<bool> _wake_up_by_downstream = false;
 
-    Dependency* _memory_sufficient_dependency = nullptr;
+    std::unique_ptr<Dependency> _memory_sufficient_dependency;
 
     std::atomic<bool> _finalized {false};
     std::mutex _dependency_lock;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 527c7ca684d..a1c89394c7b 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -92,8 +92,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
     _query_watcher.start();
     _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
     _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency");
-    _memory_sufficient_dependency =
-            pipeline::Dependency::create_unique(-1, -1, "MemorySufficientDependency", true);
 
     _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
             TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker);
@@ -203,7 +201,6 @@ QueryContext::~QueryContext() {
     }
     _runtime_filter_mgr.reset();
     _execution_dependency.reset();
-    _memory_sufficient_dependency.reset();
     _shared_hash_table_controller.reset();
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
@@ -239,12 +236,18 @@ void QueryContext::set_memory_sufficient(bool sufficient) {
             _paused_timer.stop();
             _paused_period_secs += _paused_timer.elapsed_time() / (1000L * 1000L * 1000L);
         }
-        _memory_sufficient_dependency->set_ready();
     } else {
-        _memory_sufficient_dependency->block();
         _paused_timer.start();
         ++_paused_count;
     }
+
+    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
+        auto fragment_ctx = fragment_wptr.lock();
+        if (!fragment_ctx) {
+            continue;
+        }
+
+        fragment_ctx->set_memory_sufficient(sufficient);
+    }
 }
 
 void QueryContext::cancel(Status new_status, int fragment_id) {
@@ -530,15 +533,13 @@ std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
 
 std::string QueryContext::debug_string() {
     std::lock_guard l(_paused_mutex);
     return fmt::format(
-            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], "
-            "Spill[RunningSpillTaskCnt={}, TotalPausedPeriodSecs={}, "
-            "MemorySufficient={}, LatestPausedReason={}]",
+            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], Spill[RunningSpillTaskCnt={}, "
+            "TotalPausedPeriodSecs={}, LatestPausedReason={}]",
             print_id(_query_id),
             PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
             PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
             PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES),
-            _revoking_tasks_count, _paused_period_secs, _memory_sufficient_dependency->ready(),
-            _paused_reason.to_string());
+            _revoking_tasks_count, _paused_period_secs, _paused_reason.to_string());
 }
 
 std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
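The query_context.cpp hunks above replace the single shared dependency with a fan-out: QueryContext::set_memory_sufficient() walks the registered fragment contexts through weak_ptr::lock(), skipping fragments that have already been torn down, and forwards the signal so each fragment can flip its tasks' per-task dependencies. A compilable sketch of that shape, assuming a simplified FragmentContext:

    #include <cstdio>
    #include <map>
    #include <memory>

    struct FragmentContext {
        void set_memory_sufficient(bool sufficient) {
            std::printf("fragment: memory %s\n", sufficient ? "sufficient" : "insufficient");
        }
    };

    struct QueryContext {
        std::map<int, std::weak_ptr<FragmentContext>> fragment_id_to_ctx;

        void set_memory_sufficient(bool sufficient) {
            for (auto&& [fragment_id, fragment_wptr] : fragment_id_to_ctx) {
                if (auto ctx = fragment_wptr.lock()) { // fragment may already be gone
                    ctx->set_memory_sufficient(sufficient);
                }
            }
        }
    };

    int main() {
        QueryContext query;
        auto frag = std::make_shared<FragmentContext>();
        query.fragment_id_to_ctx[0] = frag;
        query.set_memory_sufficient(false); // pause
        query.set_memory_sufficient(true);  // resume
        return 0;
    }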
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 9dd75cb340d..f16bd0fcf95 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -184,10 +184,6 @@ public:
 
     pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
 
-    pipeline::Dependency* get_memory_sufficient_dependency() {
-        return _memory_sufficient_dependency.get();
-    }
-
     std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
 
     Status revoke_memory();
@@ -402,7 +398,6 @@ private:
     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
 
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
-    std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
 
     std::vector<std::weak_ptr<pipeline::PipelineTask>> _pipeline_tasks;
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index c2c51a35429..b42aeeb1b43 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -692,8 +692,7 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
             query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
         }
     }
-    LOG(INFO) << debug_msg;
-    //LOG_EVERY_T(INFO, 60) << debug_msg;
+    LOG_EVERY_T(INFO, 60) << debug_msg;
 }
 
 void WorkloadGroupMgr::stop() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org