This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8fb501b09ef [refactor](spill) unify the entry point of spill tasks (#37020) 8fb501b09ef is described below commit 8fb501b09efd77a299ab3405ff25f7a652f14b7a Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Mon Jul 1 09:52:32 2024 +0800 [refactor](spill) unify the entry point of spill tasks (#37020) --- .../exec/partitioned_aggregation_sink_operator.cpp | 30 ++---- .../partitioned_aggregation_source_operator.cpp | 29 ++---- .../exec/partitioned_aggregation_source_operator.h | 1 - .../exec/partitioned_hash_join_probe_operator.cpp | 76 +++----------- .../exec/partitioned_hash_join_probe_operator.h | 4 +- .../exec/partitioned_hash_join_sink_operator.cpp | 112 +++++++-------------- .../exec/partitioned_hash_join_sink_operator.h | 3 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 30 +----- .../pipeline/exec/spill_sort_source_operator.cpp | 27 +---- be/src/pipeline/exec/spill_sort_source_operator.h | 1 - be/src/pipeline/exec/spill_utils.h | 76 ++++++++++++++ 11 files changed, 152 insertions(+), 237 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index b833289e0e0..4399f3c7045 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -22,6 +22,7 @@ #include "aggregation_sink_operator.h" #include "common/status.h" +#include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" #include "vec/spill/spill_stream_manager.h" @@ -253,14 +254,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { } }}; - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<PartitionedAggSharedState> shared_state_holder = - _shared_state->shared_from_this(); auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); @@ -269,20 +263,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { "fault_inject partitioned_agg_sink revoke_memory submit_func failed"); return status; }); - status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - [this, &parent, state, query_id, mem_tracker, shared_state_holder, execution_context, - submit_timer] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return Status::Cancelled("Cancelled"); - } + + auto spill_runnable = std::make_shared<SpillRunnable>( + state, _shared_state->shared_from_this(), + [this, &parent, state, query_id, submit_timer] { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_sink " @@ -332,7 +316,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { parent._agg_sink_operator->reset_hash_table(runtime_state); return Base::_shared_state->sink_status; }); - return status; + + return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( + std::move(spill_runnable)); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index fd609d95eef..a8c4e7b0bcc 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -23,6 +23,7 @@ #include "common/exception.h" #include "common/status.h" #include "pipeline/exec/operator.h" +#include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" #include "util/runtime_profile.h" #include "vec/spill/spill_stream_manager.h" @@ -204,18 +205,11 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table()); _dependency->Dependency::block(); - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<PartitionedAggSharedState> shared_state_holder = - _shared_state->shared_from_this(); auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); - auto spill_func = [this, state, query_id, execution_context, submit_timer] { + auto spill_func = [this, state, query_id, submit_timer] { _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { @@ -276,19 +270,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime return _status; }; - auto exception_catch_func = [spill_func, query_id, mem_tracker, shared_state_holder, - execution_context, this]() { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } + auto exception_catch_func = [spill_func, query_id, this]() { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_source " @@ -308,7 +290,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_agg_source submit_func failed"); }); - return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - exception_catch_func); + return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( + std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + exception_catch_func)); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index a847b7fcf88..994290a15bb 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -94,7 +94,6 @@ public: private: friend class PartitionedAggLocalState; - Status _initiate_merge_spill_partition_agg_data(RuntimeState* state); std::unique_ptr<AggSourceOperatorX> _agg_source_operator; }; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 261979e3261..1ff927bcc6d 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -158,15 +158,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = - _shared_state->shared_from_this(); - auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); @@ -216,19 +208,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat return Status::OK(); }; - auto exception_catch_func = [query_id, mem_tracker, shared_state_holder, execution_context, - spill_func, this]() { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query: " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } + auto exception_catch_func = [query_id, spill_func, this]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " @@ -250,7 +230,10 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat return Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed"); }); - return spill_io_pool->submit_func(exception_catch_func); + + auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + exception_catch_func); + return spill_io_pool->submit(std::move(spill_runnable)); } Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_index) { @@ -288,15 +271,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti return Status::OK(); } - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = _shared_state->shared_from_this(); auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); @@ -362,19 +340,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti _dependency->set_ready(); }; - auto exception_catch_func = [read_func, query_id, mem_tracker, shared_state_holder, - execution_context, state, this]() { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { - LOG(INFO) << "query: " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } + auto exception_catch_func = [read_func, query_id, this]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " @@ -403,7 +369,9 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti "fault_inject partitioned_hash_join_probe " "recovery_build_blocks submit_func failed"); }); - return spill_io_pool->submit_func(exception_catch_func); + auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + exception_catch_func); + return spill_io_pool->submit(std::move(spill_runnable)); } std::string PartitionedHashJoinProbeLocalState::debug_string(int indentation_level) const { @@ -426,14 +394,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti auto& blocks = _probe_blocks[partition_index]; - /// TODO: maybe recovery more blocks each time. - auto execution_context = state->get_task_execution_context(); - std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = - _shared_state->shared_from_this(); - auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; - MonotonicStopWatch submit_timer; submit_timer.start(); @@ -470,19 +431,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti _dependency->set_ready(); }; - auto exception_catch_func = [read_func, mem_tracker, shared_state_holder, execution_context, - query_id, this]() { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query: " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } + auto exception_catch_func = [read_func, query_id, this]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " @@ -511,7 +460,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti "fault_inject partitioned_hash_join_probe " "recovery_probe_blocks submit_func failed"); }); - return spill_io_pool->submit_func(exception_catch_func); + return spill_io_pool->submit(std::make_shared<SpillRunnable>( + state, _shared_state->shared_from_this(), exception_catch_func)); } PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool* pool, @@ -538,7 +488,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) { _probe_exprs.emplace_back(conjunct.left); } - _partitioner = std::make_unique<PartitionerType>(_partition_count); + _partitioner = std::make_unique<SpillPartitionerType>(_partition_count); RETURN_IF_ERROR(_partitioner->init(_probe_exprs)); return Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 1fbed85a123..6ee718a3354 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -24,15 +24,13 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" -#include "vec/runtime/partitioner.h" +#include "pipeline/exec/spill_utils.h" namespace doris { class RuntimeState; namespace pipeline { -using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>; - class PartitionedHashJoinProbeOperatorX; class PartitionedHashJoinProbeLocalState final diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index cb104cfc7cd..4aa0bd42a84 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -118,16 +118,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta return Status::OK(); } - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = - _shared_state->shared_from_this(); - auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; - auto spill_func = [state, num_slots, - this](std::vector<vectorized::Block>& build_blocks) mutable { + auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable { auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); auto& partitioned_blocks = _shared_state->partitioned_build_blocks; std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count); @@ -211,28 +202,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _dependency->set_ready(); }; - auto exception_catch_func = [build_blocks = std::move(build_blocks), spill_func, - shared_state_holder, execution_context, state, query_id, - mem_tracker, this]() mutable { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - Defer defer {[&]() { - // need to reset build_block here, or else build_block will be destructed - // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure - build_blocks.clear(); - }}; - - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { - LOG(INFO) << "execution_context released, maybe query was canceled."; - return; - } - + auto exception_catch_func = [spill_func, this]() mutable { auto status = [&]() { - RETURN_IF_CATCH_EXCEPTION(spill_func(build_blocks)); + RETURN_IF_CATCH_EXCEPTION(spill_func()); return Status::OK(); }(); @@ -243,6 +215,10 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _dependency->set_ready(); } }; + + auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + exception_catch_func); + auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); _dependency->block(); @@ -252,7 +228,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta "fault_inject partitioned_hash_join_sink " "revoke_unpartitioned_block submit_func failed"); }); - return thread_pool->submit_func(exception_catch_func); + return thread_pool->submit(std::move(spill_runnable)); } Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { @@ -268,15 +244,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { _spilling_streams_count = _shared_state->partitioned_build_blocks.size(); - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = - _shared_state->shared_from_this(); - auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; @@ -302,41 +270,35 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { st = Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_sink revoke_memory submit_func failed"); }); - if (st.ok()) { - st = spill_io_pool->submit_func([this, query_id, mem_tracker, shared_state_holder, - execution_context, spilling_stream, i, submit_timer] { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; - return; - } - DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_id, - Status::InternalError("fault_inject partitioned_hash_join_sink " - "revoke_memory canceled")); - return; + + auto spill_runnable = std::make_shared<SpillRunnable>( + state, _shared_state->shared_from_this(), + [this, query_id, spilling_stream, i, submit_timer] { + DBUG_EXECUTE_IF( + "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + query_id, Status::InternalError( + "fault_inject partitioned_hash_join_sink " + "revoke_memory canceled")); + return; + }); + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_build_timer); + + auto status = [&]() { + RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream)); + return Status::OK(); + }(); + + if (!status.OK()) { + std::unique_lock<std::mutex> lock(_spill_lock); + _dependency->set_ready(); + _spill_status_ok = false; + _spill_status = std::move(status); + } }); - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(_spill_build_timer); - - auto status = [&]() { - RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream)); - return Status::OK(); - }(); - - if (!status.OK()) { - std::unique_lock<std::mutex> lock(_spill_lock); - _dependency->set_ready(); - _spill_status_ok = false; - _spill_status = std::move(status); - } - }); + if (st.ok()) { + st = spill_io_pool->submit(std::move(spill_runnable)); } if (!st.ok()) { @@ -452,7 +414,7 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta _build_exprs.emplace_back(eq_join_conjunct.right); partition_exprs.emplace_back(eq_join_conjunct.right); } - _partitioner = std::make_unique<PartitionerType>(_partition_count); + _partitioner = std::make_unique<SpillPartitionerType>(_partition_count); RETURN_IF_ERROR(_partitioner->init(_build_exprs)); return Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 5cd3ff21187..1592c29cdb0 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -24,6 +24,7 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" +#include "pipeline/exec/spill_utils.h" #include "vec/runtime/partitioner.h" namespace doris { @@ -31,8 +32,6 @@ class RuntimeState; namespace pipeline { -using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>; - class PartitionedHashJoinSinkOperatorX; class PartitionedHashJoinSinkLocalState diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index d15b936b4c6..b7fae82ca54 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -18,6 +18,7 @@ #include "spill_sort_sink_operator.h" #include "pipeline/exec/sort_sink_operator.h" +#include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" #include "vec/spill/spill_stream_manager.h" @@ -229,16 +230,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { if (!_eos) { Base::_dependency->Dependency::block(); } - - auto execution_context = state->get_task_execution_context(); - - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<SpillSortSharedState> shared_state_holder = _shared_state->shared_from_this(); - auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); @@ -300,19 +292,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { return Status::OK(); }; - auto exception_catch_func = [this, query_id, mem_tracker, shared_state_holder, - execution_context, spill_func]() { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } + auto exception_catch_func = [this, query_id, spill_func]() { DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject spill_sort_sink " @@ -331,9 +311,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { "revoke_memory submit_func failed"); }); if (status.ok()) { - status = - ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - exception_catch_func); + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( + std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + exception_catch_func)); } if (!status.ok()) { if (!_eos) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 7d62a18461b..b322f33caa2 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -18,6 +18,7 @@ #include "spill_sort_source_operator.h" #include "common/status.h" +#include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" #include "sort_source_operator.h" #include "util/runtime_profile.h" @@ -80,13 +81,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat << " merge spill data"; _dependency->Dependency::block(); - auto execution_context = state->get_task_execution_context(); - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::weak_ptr<SpillSortSharedState> shared_state_holder = _shared_state->shared_from_this(); auto query_id = state->query_id(); - auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); @@ -180,20 +175,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat return Status::OK(); }; - auto exception_catch_func = [this, query_id, mem_tracker, shared_state_holder, - execution_context, spill_func]() { - SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); - std::shared_ptr<TaskExecutionContext> execution_context_lock; - auto shared_state_sptr = shared_state_holder.lock(); - if (shared_state_sptr) { - execution_context_lock = execution_context.lock(); - } - if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } - + auto exception_catch_func = [this, spill_func]() { _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); }; @@ -202,8 +184,9 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat "fault_inject spill_sort_source " "merge_sort_spill_data submit_func failed"); }); - return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - exception_catch_func); + return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( + std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + exception_catch_func)); } Status SpillSortLocalState::_create_intermediate_merger( diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index ffe8e8a6898..09367415d91 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -90,7 +90,6 @@ public: private: friend class SpillSortLocalState; - Status _initiate_merge_spill_partition_agg_data(RuntimeState* state); std::unique_ptr<SortSourceOperatorX> _sort_source_operator; }; diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h new file mode 100644 index 00000000000..f2f19512cbd --- /dev/null +++ b/be/src/pipeline/exec/spill_utils.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/query_context.h" +#include "runtime/runtime_state.h" +#include "runtime/task_execution_context.h" +#include "runtime/thread_context.h" +#include "util/threadpool.h" +#include "vec/runtime/partitioner.h" + +namespace doris::pipeline { +using SpillPartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>; + +class SpillRunnable : public Runnable { +public: + SpillRunnable(RuntimeState* state, const std::shared_ptr<BasicSharedState>& shared_state, + std::function<void()> func) + : _state(state), + _mem_tracker(state->get_query_ctx()->query_mem_tracker), + _task_id(state->query_id()), + _task_context_holder(state->get_task_execution_context()), + _shared_state_holder(shared_state), + _func(std::move(func)) {} + + ~SpillRunnable() override = default; + + void run() override { + SCOPED_ATTACH_TASK_WITH_ID(_mem_tracker, _task_id); + Defer defer([&] { + std::function<void()> tmp; + std::swap(tmp, _func); + }); + + auto task_context_holder = _task_context_holder.lock(); + if (!task_context_holder) { + return; + } + + auto shared_state_holder = _shared_state_holder.lock(); + if (!shared_state_holder) { + return; + } + + if (_state->is_cancelled()) { + return; + } + _func(); + } + +private: + RuntimeState* _state; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; + TUniqueId _task_id; + std::weak_ptr<TaskExecutionContext> _task_context_holder; + std::weak_ptr<BasicSharedState> _shared_state_holder; + std::function<void()> _func; +}; + +} // namespace doris::pipeline \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org