This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 2ae4c86c81e [fix](spill) Thread conflicts caused by non-sink operators spilling (#45796) 2ae4c86c81e is described below commit 2ae4c86c81eef96b3c035f9fd076921b563b87f1 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Mon Dec 23 17:40:09 2024 +0800 [fix](spill) Thread conflicts caused by non-sink operators spilling (#45796) --- be/src/pipeline/exec/operator.h | 8 ---- .../exec/partitioned_hash_join_probe_operator.cpp | 55 +++++++--------------- .../exec/partitioned_hash_join_probe_operator.h | 6 +-- be/src/pipeline/exec/spill_utils.h | 39 +++------------ be/src/pipeline/pipeline_task.cpp | 4 +- 5 files changed, 25 insertions(+), 87 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index af13ded196e..3267416e4b2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -857,14 +857,6 @@ public: return (_child and !is_source()) ? _child->revocable_mem_size(state) : 0; } - Status revoke_memory(RuntimeState* state, - const std::shared_ptr<SpillContext>& spill_context) override { - if (_child and !is_source()) { - return _child->revoke_memory(state, spill_context); - } - return Status::OK(); - } - // If this method is not overwrite by child, its default value is 1MB [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { return state->minimum_operator_memory_required_bytes(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index f6cea157cd5..f5568ba1022 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -172,8 +172,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { return Status::OK(); } -Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( - RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { +Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto query_id = state->query_id(); @@ -208,7 +207,9 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( std::numeric_limits<size_t>::max(), _runtime_profile.get())); } - auto merged_block = vectorized::MutableBlock::create_unique(blocks[0].clone_empty()); + auto merged_block = vectorized::MutableBlock::create_unique(std::move(blocks.back())); + blocks.pop_back(); + while (!blocks.empty() && !state->is_cancelled()) { auto block = std::move(blocks.back()); blocks.pop_back(); @@ -218,17 +219,9 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_probe spill_probe_blocks failed"); }); - - if (merged_block->allocated_bytes() >= - vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { - COUNTER_UPDATE(_spill_probe_rows, merged_block->rows()); - RETURN_IF_ERROR( - spilling_stream->spill_block(state, merged_block->to_block(), false)); - COUNTER_UPDATE(_spill_probe_blocks, 1); - } } - if (!merged_block->empty()) { + if (!merged_block->empty()) [[likely]] { COUNTER_UPDATE(_spill_probe_rows, merged_block->rows()); RETURN_IF_ERROR( spilling_stream->spill_block(state, merged_block->to_block(), false)); @@ -256,9 +249,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( return status; }; - if (spill_context) { - spill_context->on_non_sink_task_started(); - } _spill_dependency->block(); DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func", { return Status::Error<INTERNAL_ERROR>( @@ -266,8 +256,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( }); auto spill_runnable = std::make_shared<SpillNonSinkRunnable>( - state, spill_context, _spill_dependency, _runtime_profile.get(), - _shared_state->shared_from_this(), exception_catch_func); + state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), + exception_catch_func); return spill_io_pool->submit(std::move(spill_runnable)); } @@ -856,27 +846,6 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta return size_to_reserve; } -Status PartitionedHashJoinProbeOperatorX::revoke_memory( - RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { - auto& local_state = get_local_state(state); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() - << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos; - - if (local_state._child_eos) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() - << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos - << ", will not revoke size: " << revocable_mem_size(state); - return Status::OK(); - } - - RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context)); - - if (_child) { - return _child->revoke_memory(state, spill_context); - } - return Status::OK(); -} - Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() @@ -891,7 +860,15 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat if (local_state._shared_state->need_to_spill) { const auto revocable_size = _revocable_mem_size(state); const auto min_revocable_size = state->min_revocable_mem(); - return revocable_size > min_revocable_size; + + if (state->get_query_ctx()->low_memory_mode()) { + return revocable_size > + std::min<int64_t>(min_revocable_size, + static_cast<int64_t>( + vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM)); + } else { + return vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM; + } } return false; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 7b77e1e6e3f..40c6b0fcef5 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -47,8 +47,7 @@ public: Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status spill_probe_blocks(RuntimeState* state, - const std::shared_ptr<SpillContext>& spill_context = nullptr); + Status spill_probe_blocks(RuntimeState* state); Status recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data); @@ -181,9 +180,6 @@ public: return _inner_probe_operator->require_data_distribution(); } - Status revoke_memory(RuntimeState* state, - const std::shared_ptr<SpillContext>& spill_context) override; - private: Status _revoke_memory(RuntimeState* state); diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index 84a3f8c2e29..c0bb47960c5 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -50,36 +50,18 @@ struct SpillContext { ~SpillContext() { if (running_tasks_count.load() != 0) { - LOG_EVERY_T(WARNING, 60) << "Query: " << print_id(query_id) - << " not all spill tasks finished, remaining tasks: " - << running_tasks_count.load(); - } - - if (_running_non_sink_tasks_count.load() != 0) { - LOG_EVERY_T(WARNING, 60) - << "Query: " << print_id(query_id) - << " not all spill tasks(non sink tasks) finished, remaining tasks: " - << _running_non_sink_tasks_count.load(); + LOG(WARNING) << "Query: " << print_id(query_id) + << " not all spill tasks finished, remaining tasks: " + << running_tasks_count.load(); } } void on_task_finished() { auto count = running_tasks_count.fetch_sub(1); - if (count == 1 && _running_non_sink_tasks_count.load() == 0) { - all_tasks_finished_callback(this); - } - } - - void on_non_sink_task_started() { _running_non_sink_tasks_count.fetch_add(1); } - void on_non_sink_task_finished() { - const auto count = _running_non_sink_tasks_count.fetch_sub(1); - if (count == 1 && running_tasks_count.load() == 0) { + if (count == 1) { all_tasks_finished_callback(this); } } - -private: - std::atomic_int _running_non_sink_tasks_count {0}; }; class SpillRunnable : public Runnable { @@ -233,20 +215,13 @@ public: class SpillNonSinkRunnable : public SpillRunnable { public: - SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context, - std::shared_ptr<Dependency> spill_dependency, RuntimeProfile* profile, + SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<Dependency> spill_dependency, + RuntimeProfile* profile, const std::shared_ptr<BasicSpillSharedState>& shared_state, std::function<Status()> spill_exec_func, std::function<Status()> spill_fin_cb = {}) - : SpillRunnable(state, spill_context, spill_dependency, profile, shared_state, true, + : SpillRunnable(state, nullptr, spill_dependency, profile, shared_state, true, spill_exec_func, spill_fin_cb) {} - -protected: - void _on_task_finished() override { - if (_spill_context) { - _spill_context->on_non_sink_task_finished(); - } - } }; class SpillRecoverRunnable : public SpillRunnable { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9d284b31861..0ddc329da3b 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -619,7 +619,7 @@ size_t PipelineTask::get_revocable_size() const { return 0; } - return _sink->revocable_mem_size(_state) + _root->revocable_mem_size(_state); + return _sink->revocable_mem_size(_state); } Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) { @@ -632,8 +632,6 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co return Status::OK(); } - RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context)); - const auto revocable_size = _sink->revocable_mem_size(_state); if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org