This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit dfdb36cee59a0b071d8ee7a75cb14c24d886a2f0
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Sep 4 17:42:40 2024 +0800

    updated
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 10 ++++
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  7 ++-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  5 ++
 be/src/pipeline/pipeline_fragment_context.cpp      |  2 +-
 be/src/pipeline/pipeline_task.cpp                  | 53 +++++++++++++---------
 be/src/pipeline/task_scheduler.cpp                 | 42 +++++++++--------
 be/src/pipeline/task_scheduler.h                   | 29 +++++++++++-
 9 files changed, 106 insertions(+), 46 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 589af5e1f42..9bf6c422af4 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -114,6 +114,11 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
     if (!_should_build_hash_table) {
         return 0;
     }
+
+    if (_shared_state->build_block) {
+        return 0;
+    }
+
     size_t size_to_reserve = 0;
     if (!_build_side_mutable_block.empty()) {
@@ -660,4 +665,9 @@ size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
     return local_state.get_reserve_mem_size(state);
 }
 
+size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return local_state._mem_tracker->consumption();
+}
+
 } // namespace doris::pipeline

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index cb626a81cb1..9c95d0c10d6 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -128,6 +128,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
 
+    [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
+
     bool should_dry_run(RuntimeState* state) override {
         return _is_broadcast_join && !state->get_sink_local_state()
                                               ->cast<HashJoinBuildSinkLocalState>()

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1a94d9077be..4a6abb32ed4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -331,6 +331,6 @@ private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
 
-    size_t _spill_partition_count_bits = 4;
+    size_t _spill_partition_count_bits = 6;
 };
 } // namespace doris::pipeline
\ No newline at end of file

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 9a939c62287..d91b424440a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -343,9 +343,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             shared_state_sptr->spilled_streams[partition_index].reset();
 
+            const size_t rows = mutable_block ? mutable_block->rows() : 0;
             VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
                        << ", task id: " << state->task_id() << ", partition: " << partition_index
-                       << ", recovery build data done";
+                       << ", recovery build data done, rows: " << rows;
         };
 
         auto exception_catch_func = [read_func, query_id, this]() {
@@ -664,7 +665,9 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
     VLOG_DEBUG << "query: " << print_id(state->query_id())
                << ", internal build operator finished, node id: " << node_id()
                << ", task id: " << state->task_id()
-               << ", partition: " << local_state._partition_cursor;
+               << ", partition: " << local_state._partition_cursor << "rows: " << block.rows()
+               << ", usage: "
+               << _inner_sink_operator->get_memory_usage(local_state._runtime_state.get());
     return Status::OK();
 }

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8d565bda4dc..e6a14aaf603 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -21,6 +21,7 @@
 
 #include <algorithm>
 
+#include "common/logging.h"
 #include "pipeline/exec/operator.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -543,6 +544,10 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         });
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
+        VLOG_DEBUG << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
+                   << ", task id: " << state->task_id() << ", nonspill build usage: "
+                   << _inner_sink_operator->get_memory_usage(
+                              local_state._shared_state->inner_runtime_state.get());
     }
 
     std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 71dc601f014..70a3c0d01af 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1412,7 +1412,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         auto tnode_ = tnode;
         /// TODO: support rf in partitioned hash join
         tnode_.runtime_filters.clear();
-        const uint32_t partition_count = 32;
+        const uint32_t partition_count = 128;
         auto inner_probe_operator =
                 std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
         auto inner_sink_operator = std::make_shared<HashJoinBuildSinkOperatorX>(

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 5a987cba416..e591f58a163 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -250,6 +250,18 @@ bool PipelineTask::_wait_to_start() {
 }
 
 bool PipelineTask::_is_blocked() {
+    _blocked_dep = _spill_dependency->is_blocked_by(this);
+    if (_blocked_dep != nullptr) {
+        _blocked_dep->start_watcher();
+        return true;
+    }
+
+    _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this);
+    if (_blocked_dep != nullptr) {
+        _blocked_dep->start_watcher();
+        return true;
+    }
+
     // `_dry_run = true` means we do not need data from source operator.
     if (!_dry_run) {
        for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
@@ -263,22 +275,15 @@
             }
             // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
             if (!_operators[i]->need_more_input_data(_state)) {
-                if (VLOG_DEBUG_IS_ON) {
-                    VLOG_DEBUG << "query: " << print_id(_state->query_id())
-                               << ", task id: " << _index << ", operator " << i
-                               << " not need_more_input_data";
-                }
+                // if (VLOG_DEBUG_IS_ON) {
+                //     VLOG_DEBUG << "query: " << print_id(_state->query_id())
+                //                << ", task id: " << _index << ", operator " << i
+                //                << " not need_more_input_data";
+                // }
                 break;
             }
         }
     }
-
-    _blocked_dep = _spill_dependency->is_blocked_by(this);
-    if (_blocked_dep != nullptr) {
-        _blocked_dep->start_watcher();
-        return true;
-    }
-
     for (auto* op_dep : _write_dependencies) {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
             return true;
         }
     }
-
-    _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this);
-    if (_blocked_dep != nullptr) {
-        _blocked_dep->start_watcher();
-        return true;
-    }
     return false;
 }
@@ -390,10 +389,20 @@ Status PipelineTask::execute(bool* eos) {
         if (reserve_size > 0) {
             auto st = thread_context()->try_reserve_memory(reserve_size);
             if (!st.ok()) {
-                LOG(INFO) << "query: " << print_id(query_id)
-                          << ", try to reserve: " << reserve_size
-                          << " failed: " << st.to_string()
-                          << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
+                VLOG_DEBUG << "query: " << print_id(query_id)
+                           << ", try to reserve: " << reserve_size << "(sink reserve size:("
+                           << sink_reserve_size << " )"
+                           << ", sink name: " << _sink->get_name()
+                           << ", node id: " << _sink->node_id()
+                           << ", is_wg_mem_high_water_mark: " << is_wg_mem_high_water_mark
+                           << " failed: " << st.to_string()
+                           << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
+                {
+                    _memory_sufficient_dependency->block();
+                    _state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
+                    RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                    continue;
+                }
                 has_enough_memory = false;
             }
         }

diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index f76affffb70..5752256ce57 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -203,7 +203,8 @@ void TaskScheduler::add_paused_task(PipelineTask* task) {
     auto wg = query_ctx_sptr->workload_group();
     auto&& [it, inserted] = _paused_queries_list[wg].emplace(std::move(query_ctx_sptr));
     if (inserted) {
-        LOG(INFO) << "here insert one new paused query: " << print_id(it->get()->query_id());
+        LOG(INFO) << "here insert one new paused query: " << it->query_id()
+                  << ", wg: " << (void*)(wg.get());
     }
 
     _paused_queries_cv.notify_all();
@@ -249,8 +250,8 @@ void TaskScheduler::_paused_queries_handler() {
             if (!is_low_wartermark && !is_high_wartermark) {
                 LOG(INFO) << "**** there are " << queries_list.size() << " to resume";
                 for (const auto& query : queries_list) {
-                    LOG(INFO) << "**** resume paused query: " << print_id(query->query_id());
-                    query->set_memory_sufficient(true);
+                    LOG(INFO) << "**** resume paused query: " << query.query_id();
+                    query.query_ctx->set_memory_sufficient(true);
                 }
 
                 queries_list.clear();
@@ -269,7 +270,7 @@
             auto it_to_remove = queries_list.end();
 
             for (auto query_it = queries_list.begin(); query_it != queries_list.end();) {
-                const auto& query_ctx = *query_it;
+                const auto& query_ctx = query_it->query_ctx;
                 size_t revocable_size = 0;
                 size_t memory_usage = 0;
                 bool has_running_task = false;
@@ -324,24 +325,27 @@
             } else if (max_memory_usage_query) {
                 bool new_is_low_wartermark = false;
                 bool new_is_high_wartermark = false;
+                const auto query_id = print_id(max_memory_usage_query->query_id());
                 wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark);
-                if (new_is_high_wartermark) {
-                    LOG(INFO) << "memory insufficient and cannot find revocable query, cancel "
-                                 "the query: "
-                              << print_id(max_memory_usage_query->query_id())
-                              << ", usage: " << max_memory_usage
+                if (!new_is_low_wartermark || it_to_remove->elapsed_time() < 2000) {
+                    LOG(INFO) << "memory insufficient and cannot find revocable query, "
+                                 "the max usage query: "
+                              << query_id << ", usage: " << max_memory_usage
+                              << ", elapsed: " << it_to_remove->elapsed_time()
                               << ", wg info: " << wg->debug_string();
-                    max_memory_usage_query->cancel(Status::InternalError(
-                            "memory insufficient and cannot find revocable query, cancel the "
-                            "biggest usage({}) query({})",
-                            max_memory_usage, print_id(max_memory_usage_query->query_id())));
-                } else {
-                    LOG(INFO) << "new_is_high_wartermark is false, resume max memory usage "
-                                 "paused query: "
-                              << print_id(max_memory_usage_query->query_id());
-                    max_memory_usage_query->set_memory_sufficient(true);
-                    queries_list.erase(it_to_remove);
+                    continue;
                 }
+
+                LOG(INFO) << "memory insufficient and cannot find revocable query, cancel "
+                             "the query: "
+                          << query_id << ", usage: " << max_memory_usage
+                          << ", wg info: " << wg->debug_string();
+                max_memory_usage_query->cancel(Status::InternalError(
+                        "memory insufficient and cannot find revocable query, cancel the "
+                        "biggest usage({}) query({})",
+                        max_memory_usage, query_id));
             }
         }
     }

diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 987dcd81bd0..bed38887037 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <chrono>
 #include <condition_variable>
 #include <cstddef>
 #include <list>
@@ -32,6 +33,7 @@
 #include "runtime/query_context.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/thread.h"
+#include "util/uid_util.h"
 
 namespace doris {
 class ExecEnv;
@@ -44,6 +46,31 @@ class TaskQueue;
 
 namespace doris::pipeline {
 
+struct PausedQuery {
+    std::shared_ptr<QueryContext> query_ctx;
+    std::chrono::system_clock::time_point enqueue_at;
+    size_t last_mem_usage {0};
+
+    PausedQuery(std::shared_ptr<QueryContext> query_ctx_)
+            : query_ctx(std::move(query_ctx_)), _query_id(print_id(query_ctx->query_id())) {
+        enqueue_at = std::chrono::system_clock::now();
+    }
+
+    int64_t elapsed_time() const {
+        auto now = std::chrono::system_clock::now();
+        return std::chrono::duration_cast<std::chrono::milliseconds>(now - enqueue_at).count();
+    }
+
+    std::string query_id() const { return _query_id; }
+
+    bool operator<(const PausedQuery& other) const { return _query_id < other._query_id; }
+
+    bool operator==(const PausedQuery& other) const { return _query_id == other._query_id; }
+
+private:
+    std::string _query_id;
+};
+
 class TaskScheduler {
 public:
     TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, std::string name,
@@ -72,7 +99,7 @@ private:
     std::string _name;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
-    std::map<WorkloadGroupPtr, std::set<std::shared_ptr<QueryContext>>> _paused_queries_list;
+    std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list;
     std::mutex _paused_queries_lock;
     std::condition_variable _paused_queries_cv;
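
A note on the new paused-query bookkeeping: _paused_queries_list now stores PausedQuery values ordered by the printed query id, so std::set deduplicates a query that pauses repeatedly, and enqueue_at/elapsed_time() give the handler a pause age to compare against the 2000 ms guard above. Below is a minimal, self-contained sketch of that mechanism; StubQueryContext and the main() driver are illustrative stand-ins, not Doris code.

    #include <chrono>
    #include <iostream>
    #include <memory>
    #include <set>
    #include <string>
    #include <utility>

    // Stand-in for doris::QueryContext; only the printed id matters here.
    struct StubQueryContext {
        std::string id;
    };

    struct PausedQuery {
        std::shared_ptr<StubQueryContext> query_ctx;
        std::chrono::system_clock::time_point enqueue_at;

        explicit PausedQuery(std::shared_ptr<StubQueryContext> ctx)
                : query_ctx(std::move(ctx)),
                  enqueue_at(std::chrono::system_clock::now()),
                  _query_id(query_ctx->id) {}

        // Pause age in milliseconds; the handler refuses to cancel a query
        // that has been parked for less than 2000 ms.
        int64_t elapsed_time() const {
            auto now = std::chrono::system_clock::now();
            return std::chrono::duration_cast<std::chrono::milliseconds>(now - enqueue_at)
                    .count();
        }

        std::string query_id() const { return _query_id; }

        // Ordering by query id is what lets std::set::emplace report
        // `inserted == false` for a query that is already parked.
        bool operator<(const PausedQuery& other) const { return _query_id < other._query_id; }

    private:
        std::string _query_id;
    };

    int main() {
        std::set<PausedQuery> paused;
        auto q = std::make_shared<StubQueryContext>();
        q->id = "q1";

        auto [it, inserted] = paused.emplace(q);
        std::cout << "first insert: " << inserted << '\n';   // prints 1

        auto [it2, inserted2] = paused.emplace(q);           // same id, deduplicated
        std::cout << "second insert: " << inserted2 << '\n'; // prints 0

        std::cout << it->query_id() << " paused for " << it->elapsed_time() << " ms\n";
    }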
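
And a sketch of the reworked reservation path in PipelineTask::execute: when try_reserve_memory fails, the task no longer merely records has_enough_memory = false; it blocks its _memory_sufficient_dependency, registers itself via add_paused_task, asks the sink to revoke (spill) memory, and retries. The toy Arbiter and Sink types below are stand-ins for the memory arbitrator and the sink operator, under the simplifying assumption that revoked memory returns to the budget immediately.

    #include <cstddef>
    #include <iostream>

    // Stand-in for the process memory arbitrator (try_reserve_memory).
    struct Arbiter {
        size_t budget;
        bool try_reserve(size_t n) {
            if (n > budget) return false;
            budget -= n;
            return true;
        }
    };

    // Stand-in for a spillable sink operator.
    struct Sink {
        size_t revocable = 0;
        // revoke_memory: spill buffered data, returning its memory to the budget.
        void revoke_memory(Arbiter& a) {
            a.budget += revocable;
            revocable = 0;
        }
    };

    int main() {
        Arbiter arbiter {100};
        Sink sink;
        sink.revocable = 80;
        const size_t reserve_size = 150;

        for (int attempt = 0; attempt < 3; ++attempt) {
            if (arbiter.try_reserve(reserve_size)) {
                std::cout << "attempt " << attempt << ": reserved, sink runs\n";
                break;
            }
            // Reservation failed: in the commit this is where the task blocks
            // _memory_sufficient_dependency, calls add_paused_task(this),
            // revokes sink memory, and `continue`s the execute loop.
            std::cout << "attempt " << attempt << ": pause, revoke "
                      << sink.revocable << " bytes\n";
            sink.revoke_memory(arbiter);
        }
    }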