This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5759d685f6761ffb1a77749dac6b5970e08a726c Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Fri Sep 6 11:30:07 2024 +0800 [opt](spill) Avoid occupying a large amount of memory in join build side --- be/src/pipeline/exec/operator.h | 19 +++++--- .../exec/partitioned_aggregation_sink_operator.cpp | 6 +++ .../partitioned_aggregation_source_operator.cpp | 7 ++- .../exec/partitioned_aggregation_source_operator.h | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 53 ++++++++++++++++++---- .../exec/partitioned_hash_join_probe_operator.h | 9 +++- .../exec/partitioned_hash_join_sink_operator.cpp | 26 ++++++----- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 5 ++ .../pipeline/exec/spill_sort_source_operator.cpp | 9 ++-- be/src/pipeline/exec/spill_sort_source_operator.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/pipeline/pipeline_task.cpp | 25 ++++++---- be/src/pipeline/pipeline_task.h | 21 +++++++-- be/src/pipeline/task_scheduler.cpp | 38 +++++++++------- be/src/runtime/runtime_state.h | 6 --- be/src/vec/spill/spill_stream.h | 3 +- 16 files changed, 161 insertions(+), 72 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 14cd56f5751..d111a2a7e24 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -417,11 +417,7 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status open(RuntimeState* state) override { - _spill_dependency = state->get_spill_dependency(); - DCHECK(_spill_dependency != nullptr); - return Status::OK(); - } + Status open(RuntimeState* state) override { return Status::OK(); } Status close(RuntimeState* state, Status exec_status) override; @@ -449,7 +445,7 @@ public: protected: Dependency* _dependency = nullptr; - Dependency* _spill_dependency = nullptr; + std::shared_ptr<Dependency> _spill_dependency; SharedStateType* _shared_state = nullptr; private: @@ -734,6 +730,17 @@ public: } } + size_t revocable_mem_size(RuntimeState* state) const override { + return (_child_x and !is_source()) ? _child_x->revocable_mem_size(state) : 0; + } + + Status revoke_memory(RuntimeState* state) override { + if (_child_x and !is_source()) { + return _child_x->revoke_memory(state); + } + return Status::OK(); + } + virtual std::string debug_string(int indentation_level = 0) const; virtual std::string debug_string(RuntimeState* state, int indentation_level = 0) const; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 314806529b7..5c26cfb6b97 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -22,7 +22,9 @@ #include "aggregation_sink_operator.h" #include "common/status.h" +#include "pipeline/dependency.h" #include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "vec/spill/spill_stream_manager.h" @@ -58,6 +60,10 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column()); } + _spill_dependency = Dependency::create_shared(parent.operator_id(), parent.node_id(), + "AggSinkSpillDependency", true); + state->get_task()->add_spill_dependency(_spill_dependency.get()); + _finish_dependency->block(); return Status::OK(); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index bdbd395ee99..fa41723beba 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -24,6 +24,7 @@ #include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "util/runtime_profile.h" #include "vec/spill/spill_stream_manager.h" @@ -38,6 +39,10 @@ Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _init_counters(); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "AggSourceSpillDependency", true); + state->get_task()->add_spill_dependency(_spill_dependency.get()); + return Status::OK(); } @@ -48,8 +53,6 @@ Status PartitionedAggLocalState::open(RuntimeState* state) { return Status::OK(); } _opened = true; - _spill_dependency = state->get_spill_dependency(); - DCHECK(_spill_dependency != nullptr); RETURN_IF_ERROR(setup_in_memory_agg_op(state)); return Status::OK(); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index c09046d840a..3505cf7eed8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -59,7 +59,7 @@ protected: bool _current_partition_eos = true; bool _is_merging = false; - Dependency* _spill_dependency {nullptr}; + std::shared_ptr<Dependency> _spill_dependency; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; RuntimeProfile::Counter* _get_results_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index d91b424440a..51b6e143b3c 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -41,6 +41,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _partitioned_blocks.resize(p._partition_count); _probe_spilling_streams.resize(p._partition_count); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "HashJoinProbeSpillDependency", true); + state->get_task()->add_spill_dependency(_spill_dependency.get()); + _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition"); _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "Partition"); _partition_shuffle_timer = ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "Partition"); @@ -144,8 +148,6 @@ void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(PipelineXSpillLocalState::open(state)); - _spill_dependency = state->get_spill_dependency(); - DCHECK(_spill_dependency != nullptr); return _parent->cast<PartitionedHashJoinProbeOperatorX>()._partitioner->clone(state, _partitioner); } @@ -160,13 +162,16 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { return Status::OK(); } -Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state) { +Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state, bool force) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto query_id = state->query_id(); + const auto spill_size_threshold = force ? vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + : vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM; + MonotonicStopWatch submit_timer; submit_timer.start(); - auto spill_func = [query_id, state, submit_timer, this] { + auto spill_func = [query_id, state, submit_timer, spill_size_threshold, this] { _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_spill_probe_timer); @@ -175,8 +180,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat ++partition_index) { auto& blocks = _probe_blocks[partition_index]; auto& partitioned_block = _partitioned_blocks[partition_index]; - if (partitioned_block && partitioned_block->allocated_bytes() >= - vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + if (partitioned_block && partitioned_block->allocated_bytes() >= spill_size_threshold) { blocks.emplace_back(partitioned_block->to_block()); partitioned_block.reset(); } @@ -756,6 +760,22 @@ bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); + if (local_state._child_eos) { + return 0; + } + + auto revocable_size = _revocable_mem_size(state, true); + if (_child_x) { + revocable_size += _child_x->revocable_mem_size(state); + } + return revocable_size; +} + +size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* state, + bool force) const { + const auto spill_size_threshold = force ? vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + : vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM; + auto& local_state = get_local_state(state); size_t mem_size = 0; auto& probe_blocks = local_state._probe_blocks; for (uint32_t i = 0; i < _partition_count; ++i) { @@ -766,7 +786,7 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state auto& partitioned_block = local_state._partitioned_blocks[i]; if (partitioned_block) { auto block_bytes = partitioned_block->allocated_bytes(); - if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + if (block_bytes >= spill_size_threshold) { mem_size += block_bytes; } } @@ -774,6 +794,23 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state return mem_size; } +Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) { + auto& local_state = get_local_state(state); + VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() + << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos; + + if (local_state._child_eos) { + return Status::OK(); + } + + RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true)); + + if (_child_x) { + return _child_x->revoke_memory(state); + } + return Status::OK(); +} + Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() @@ -786,7 +823,7 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const { auto& local_state = get_local_state(state); if (local_state._shared_state->need_to_spill) { - const auto revocable_size = revocable_mem_size(state); + const auto revocable_size = _revocable_mem_size(state); const auto min_revocable_size = state->min_revocable_mem(); return revocable_size > min_revocable_size; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index f1b635208eb..7a4ba1ed50b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -18,6 +18,7 @@ #pragma once #include <cstdint> +#include <memory> #include "common/status.h" #include "operator.h" @@ -46,7 +47,7 @@ public: Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status spill_probe_blocks(RuntimeState* state); + Status spill_probe_blocks(RuntimeState* state, bool force = false); Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data); @@ -89,7 +90,7 @@ private: bool _need_to_setup_internal_operators {true}; - Dependency* _spill_dependency {nullptr}; + std::shared_ptr<Dependency> _spill_dependency; RuntimeProfile::Counter* _spill_and_partition_label = nullptr; RuntimeProfile::Counter* _partition_timer = nullptr; @@ -186,9 +187,13 @@ public: return _inner_probe_operator->require_data_distribution(); } + Status revoke_memory(RuntimeState* state) override; + private: Status _revoke_memory(RuntimeState* state); + size_t _revocable_mem_size(RuntimeState* state, bool force = false) const; + friend class PartitionedHashJoinProbeLocalState; [[nodiscard]] Status _setup_internal_operators(PartitionedHashJoinProbeLocalState& local_state, diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index e6a14aaf603..a073d922769 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -23,6 +23,7 @@ #include "common/logging.h" #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "util/mem_info.h" #include "util/runtime_profile.h" @@ -39,6 +40,10 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _shared_state->partitioned_build_blocks.resize(p._partition_count); _shared_state->spilled_streams.resize(p._partition_count); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "HashJoinBuildSpillDependency", true); + state->get_task()->add_spill_dependency(_spill_dependency.get()); + _internal_runtime_profile.reset(new RuntimeProfile("internal_profile")); _partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime", "Spill", 1); @@ -548,6 +553,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B << ", task id: " << state->task_id() << ", nonspill build usage: " << _inner_sink_operator->get_memory_usage( local_state._shared_state->inner_runtime_state.get()); + } else { + return revoke_memory(state); } std::for_each(local_state._shared_state->partitioned_build_blocks.begin(), @@ -565,6 +572,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (need_to_spill) { RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows)); + if (eos) { + return revoke_memory(state); + } } else { if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { RETURN_IF_ERROR(_setup_internal_operator(state)); @@ -576,18 +586,12 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B }); RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); - } - if (eos) { - LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id() << ", need spil: " << need_to_spill; - std::for_each(local_state._shared_state->partitioned_build_blocks.begin(), - local_state._shared_state->partitioned_build_blocks.end(), [&](auto& block) { - if (block) { - COUNTER_UPDATE(local_state._in_mem_rows_counter, block->rows()); - } - }); - local_state._dependency->set_ready_to_read(); + if (eos) { + LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read" + << ", task id: " << state->task_id(); + local_state._dependency->set_ready_to_read(); + } } return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index c82404afb03..9dcb66240df 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -19,6 +19,7 @@ #include "pipeline/exec/sort_sink_operator.h" #include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "vec/spill/spill_stream_manager.h" @@ -38,6 +39,10 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* state, _init_counters(); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "SortSinkSpillDependency", true); + state->get_task()->add_spill_dependency(_spill_dependency.get()); + RETURN_IF_ERROR(setup_in_memory_sort_op(state)); Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill(); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index c3f9f633cd3..601188ae02e 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -21,6 +21,7 @@ #include "common/status.h" #include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "sort_source_operator.h" #include "util/runtime_profile.h" @@ -37,6 +38,11 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); + + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "SortSourceSpillDependency", true); + state->get_task()->add_spill_dependency(_spill_dependency.get()); + _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillMergeSortTime", "Spill", 1); _spill_merge_sort_timer = @@ -61,9 +67,6 @@ Status SpillSortLocalState::open(RuntimeState* state) { return Status::OK(); } - _spill_dependency = state->get_spill_dependency(); - DCHECK(_spill_dependency != nullptr); - RETURN_IF_ERROR(setup_in_memory_sort_op(state)); return Base::open(state); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index 5674e18ef69..e372a9039bf 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -60,7 +60,7 @@ protected: std::vector<vectorized::SpillStreamSPtr> _current_merging_streams; std::unique_ptr<vectorized::VSortedRunMerger> _merger; - Dependency* _spill_dependency {nullptr}; + std::shared_ptr<Dependency> _spill_dependency; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; // counters for spill merge sort diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d4a300e58a2..5d1b6aaaa1b 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1412,7 +1412,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo auto tnode_ = tnode; /// TODO: support rf in partitioned hash join tnode_.runtime_filters.clear(); - const uint32_t partition_count = 128; + const uint32_t partition_count = 32; auto inner_probe_operator = std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs); auto inner_sink_operator = std::make_shared<HashJoinBuildSinkOperatorX>( diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index c34c40580a9..e27d73be62e 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -72,9 +72,6 @@ PipelineTask::PipelineTask( state->get_query_ctx()->get_memory_sufficient_dependency()) { _pipeline_task_watcher.start(); - _spill_dependency = Dependency::create_shared(-1, -1, "PipelineTaskSpillDependency", true); - - _state->set_spill_dependency(_spill_dependency.get()); auto shared_state = _sink->create_shared_state(); if (shared_state) { _sink_shared_state = shared_state; @@ -250,10 +247,12 @@ bool PipelineTask::_wait_to_start() { } bool PipelineTask::_is_blocked() { - _blocked_dep = _spill_dependency->is_blocked_by(this); - if (_blocked_dep != nullptr) { - _blocked_dep->start_watcher(); - return true; + for (auto* spill_dependency : _spill_dependencies) { + _blocked_dep = spill_dependency->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return true; + } } _blocked_dep = _memory_sufficient_dependency->is_blocked_by(this); @@ -528,11 +527,19 @@ std::string PipelineTask::debug_string() { } size_t PipelineTask::get_revocable_size() const { - return (_running || _eos) ? 0 : _sink->revocable_mem_size(_state); + if (_running || _eos) { + return 0; + } + + auto revocable_size = _root->revocable_mem_size(_state); + revocable_size += _sink->revocable_mem_size(_state); + + return revocable_size; } Status PipelineTask::revoke_memory() { - return _sink->revoke_memory(_state); + RETURN_IF_ERROR(_sink->revoke_memory(_state)); + return _root->revoke_memory(_state); } void PipelineTask::wake_up() { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 79da888cda6..943366b4b70 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -141,7 +141,10 @@ public: if (!_finalized) { _execution_dep->set_always_ready(); _memory_sufficient_dependency->set_always_ready(); - _spill_dependency->set_always_ready(); + for (auto* dep : _spill_dependencies) { + dep->set_always_ready(); + } + for (auto* dep : _filter_dependencies) { dep->set_always_ready(); } @@ -198,7 +201,14 @@ public: void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } bool is_running() { return _running.load(); } - bool is_revoking() { return _spill_dependency->is_blocked_by(nullptr) != nullptr; } + bool is_revoking() { + for (auto* dep : _spill_dependencies) { + if (dep->is_blocked_by(nullptr) != nullptr) { + return true; + } + } + return false; + } bool set_running(bool running) { return _running.exchange(running); } bool is_exceed_debug_timeout() { @@ -240,6 +250,10 @@ public: [[nodiscard]] size_t get_revocable_size() const; [[nodiscard]] Status revoke_memory(); + void add_spill_dependency(Dependency* dependency) { + _spill_dependencies.emplace_back(dependency); + } + private: friend class RuntimeFilterDependency; bool _is_blocked(); @@ -298,6 +312,7 @@ private: // `_read_dependencies` is stored as same order as `_operators` std::vector<std::vector<Dependency*>> _read_dependencies; + std::vector<Dependency*> _spill_dependencies; std::vector<Dependency*> _write_dependencies; std::vector<Dependency*> _finish_dependencies; std::vector<Dependency*> _filter_dependencies; @@ -317,8 +332,6 @@ private: Dependency* _memory_sufficient_dependency = nullptr; - std::shared_ptr<Dependency> _spill_dependency; - std::atomic<bool> _finalized {false}; std::mutex _dependency_lock; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 0aabeac4933..a4379f73b64 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -331,26 +331,30 @@ void TaskScheduler::_paused_queries_handler() { bool new_is_high_wartermark = false; const auto query_id = print_id(max_memory_usage_query->query_id()); wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark); - if (!new_is_low_wartermark || it_to_remove->elapsed_time() < 2000) { - LOG(INFO) << "memory insufficient and cannot find revocable query, " - "the max usage query: " + if (new_is_high_wartermark) { + if (it_to_remove->elapsed_time() < 2000) { + LOG(INFO) << "memory insufficient and cannot find revocable query, " + "the max usage query: " + << query_id << ", usage: " << max_memory_usage + << ", elapsed: " << it_to_remove->elapsed_time() + << ", wg info: " << wg->debug_string(); + continue; + } + max_memory_usage_query->cancel(Status::InternalError( + "memory insufficient and cannot find revocable query, cancel " + "the " + "biggest usage({}) query({})", + max_memory_usage, query_id)); + queries_list.erase(it_to_remove); + + } else { + LOG(INFO) << "non high water mark, resume " + "the query: " << query_id << ", usage: " << max_memory_usage - << ", elapsed: " << it_to_remove->elapsed_time() << ", wg info: " << wg->debug_string(); - continue; + max_memory_usage_query->set_memory_sufficient(true); + queries_list.erase(it_to_remove); } - - LOG(INFO) << "memory insufficient and cannot find revocable query, " - "cancel " - "the query: " - << query_id << ", usage: " << max_memory_usage - << ", wg info: " << wg->debug_string(); - // Should use memory exceed error code, so that FE may do retry for this error - max_memory_usage_query->cancel(Status::MemoryLimitExceeded( - "memory insufficient and cannot find revocable query, cancel " - "the " - "biggest usage({}) query({})", - max_memory_usage, query_id)); } } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 87c54564ae5..442268850bf 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -603,10 +603,6 @@ public: return _partial_update_auto_inc_column; } - void set_spill_dependency(pipeline::Dependency* dependency) { _spill_dependency = dependency; } - - pipeline::Dependency* get_spill_dependency() { return _spill_dependency; } - private: Status create_error_log_file(); @@ -702,8 +698,6 @@ private: int _task_id = -1; int _task_num = 0; - pipeline::Dependency* _spill_dependency; - std::vector<THivePartitionUpdate> _hive_partition_updates; std::vector<TIcebergCommitData> _iceberg_commit_datas; diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index ad30a0bbd1d..7a4bb4980b1 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -35,7 +35,8 @@ class SpillDataDir; class SpillStream { public: // to avoid too many small file writes - static constexpr int MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024; + static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024; + static constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024; SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir, std::string spill_dir, size_t batch_rows, size_t batch_bytes, RuntimeProfile* profile); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org