This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit f368c9d023e1b952d38a13e4c31c9506f39015da Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Thu Oct 17 14:08:23 2024 +0800 refactor SpillRunnable (#41991) Issue Number: close #xxx <!--Describe your changes.--> --- be/src/pipeline/dependency.cpp | 1 - be/src/pipeline/dependency.h | 8 +- be/src/pipeline/exec/operator.h | 16 +- .../exec/partitioned_aggregation_sink_operator.cpp | 62 ++++--- .../partitioned_aggregation_source_operator.cpp | 70 ++++---- .../exec/partitioned_aggregation_source_operator.h | 1 - .../exec/partitioned_hash_join_probe_operator.cpp | 168 +++++++------------ .../exec/partitioned_hash_join_probe_operator.h | 6 - .../exec/partitioned_hash_join_sink_operator.cpp | 179 +++++++-------------- .../exec/partitioned_hash_join_sink_operator.h | 13 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 78 ++++----- .../pipeline/exec/spill_sort_source_operator.cpp | 81 +++++----- be/src/pipeline/exec/spill_sort_source_operator.h | 1 - be/src/pipeline/exec/spill_utils.h | 129 ++++++++++++--- be/src/vec/spill/spill_stream.cpp | 1 + be/src/vec/spill/spill_stream.h | 4 +- be/src/vec/spill/spill_writer.cpp | 11 +- be/src/vec/spill/spill_writer.h | 6 +- 18 files changed, 384 insertions(+), 451 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index d95c74615c1..9ad0ff1b57f 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -343,7 +343,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id, std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), profile)); spill_streams_.emplace_back(spilling_stream_); spill_stream = spilling_stream_; - spill_stream->set_write_counters(profile); return Status::OK(); } void AggSpillPartition::close() { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index f0e14329698..b9cea1cc678 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -426,16 +426,18 @@ private: struct BasicSpillSharedState { virtual ~BasicSpillSharedState() = default; + AtomicStatus _spill_status; + // These two counters are shared to spill source operators as the initial value // of 'SpillWriteFileCurrentSize' and 'SpillWriteFileCurrentCount'. // Total bytes of spill data written to disk file(after serialized) - RuntimeProfile::Counter* _spill_write_file_data_size = nullptr; + RuntimeProfile::Counter* _spill_write_file_total_size = nullptr; RuntimeProfile::Counter* _spill_file_total_count = nullptr; void setup_shared_profile(RuntimeProfile* sink_profile) { _spill_file_total_count = ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1); - _spill_write_file_data_size = + _spill_write_file_total_size = ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalSize", TUnit::BYTES, 1); } @@ -463,7 +465,6 @@ struct PartitionedAggSharedState : public BasicSharedState, size_t partition_count_bits; size_t partition_count; size_t max_partition_index; - Status sink_status; bool is_spilled = false; std::atomic_bool is_closed = false; std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions; @@ -544,7 +545,6 @@ struct SpillSortSharedState : public BasicSharedState, bool enable_spill = false; bool is_spilled = false; std::atomic_bool is_closed = false; - Status sink_status; std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; std::deque<vectorized::SpillStreamSPtr> sorted_streams; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index f70051f091c..f1d1716aeb6 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -319,7 +319,7 @@ public: ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); _spill_write_block_data_size = ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1); - _spill_write_file_data_size = + _spill_write_file_total_size = ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1); _spill_write_rows_count = ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1); @@ -366,14 +366,16 @@ public: if (_copy_shared_spill_profile) { _copy_shared_spill_profile = false; const auto* spill_shared_state = (const BasicSpillSharedState*)Base::_shared_state; - COUNTER_SET(_spill_file_current_size, - spill_shared_state->_spill_write_file_data_size->value()); - COUNTER_SET(_spill_file_current_count, - spill_shared_state->_spill_file_total_count->value()); + COUNTER_UPDATE(_spill_file_current_size, + spill_shared_state->_spill_write_file_total_size->value()); + COUNTER_UPDATE(_spill_file_current_count, + spill_shared_state->_spill_file_total_count->value()); Base::_shared_state->update_spill_stream_profiles(Base::profile()); } } + std::atomic_int _spilling_task_count {0}; + // Total time of spill, including spill task scheduling time, // serialize block time, write disk file time, // and read disk file time, deserialize block time etc. @@ -397,7 +399,7 @@ public: // Total bytes of spill data in Block format(in memory format) RuntimeProfile::Counter* _spill_write_block_data_size = nullptr; // Total bytes of spill data written to disk file(after serialized) - RuntimeProfile::Counter* _spill_write_file_data_size = nullptr; + RuntimeProfile::Counter* _spill_write_file_total_size = nullptr; RuntimeProfile::Counter* _spill_write_rows_count = nullptr; RuntimeProfile::Counter* _spill_file_total_count = nullptr; RuntimeProfile::Counter* _spill_file_current_count = nullptr; @@ -755,6 +757,8 @@ public: COUNTER_SET(_spill_min_rows_of_partition, min_rows); } + std::atomic_int _spilling_task_count {0}; + std::vector<int64_t> _rows_in_partitions; // Total time of spill, including spill task scheduling time, diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index f5c09459f85..cd4eba2abfe 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -178,7 +178,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status); + if (!local_state._shared_state->_spill_status.ok()) { + return local_state._shared_state->_spill_status.status(); + } local_state._eos = eos; auto* runtime_state = local_state._runtime_state.get(); DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::sink", { @@ -219,7 +221,7 @@ Status PartitionedAggSinkOperatorX::revoke_memory( size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); - if (!local_state.Base::_shared_state->sink_status.ok()) { + if (!local_state._shared_state->_spill_status.ok()) { return UINT64_MAX; } auto* runtime_state = local_state._runtime_state.get(); @@ -270,7 +272,9 @@ Status PartitionedAggSinkLocalState::revoke_memory( << Base::_parent->node_id() << " revoke_memory, size: " << _parent->revocable_mem_size(state) << ", eos: " << _eos; - RETURN_IF_ERROR(Base::_shared_state->sink_status); + if (!_shared_state->_spill_status.ok()) { + return _shared_state->_spill_status.status(); + } if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; profile()->add_info_string("Spilled", "true"); @@ -292,8 +296,6 @@ Status PartitionedAggSinkLocalState::revoke_memory( auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func", { status = Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_agg_sink revoke_memory submit_func failed"); @@ -301,32 +303,28 @@ Status PartitionedAggSinkLocalState::revoke_memory( }); state->get_query_ctx()->increase_revoking_tasks_count(); - auto spill_runnable = std::make_shared<SpillRunnable>( - state, _profile, true, _shared_state->shared_from_this(), - [this, &parent, state, query_id, size_to_revoke, spill_context, submit_timer] { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_write_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_write_timer); + MonotonicStopWatch submit_timer; + submit_timer.start(); + _spilling_task_count = 1; + auto spill_runnable = std::make_shared<SpillRunnable>( + state, spill_context, _spilling_task_count, _profile, submit_timer, + _shared_state->shared_from_this(), Base::_spill_dependency, true, true, + [this, &parent, state, query_id, size_to_revoke, spill_context] { + Status status; DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", { - auto st = Status::InternalError( + status = Status::InternalError( "fault_inject partitioned_agg_sink " "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st); - return st; + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + return status; }); Defer defer {[&]() { - if (!_shared_state->sink_status.ok() || state->is_cancelled()) { - if (!_shared_state->sink_status.ok()) { - LOG(WARNING) - << "query " << print_id(query_id) << " agg node " - << Base::_parent->node_id() - << " revoke_memory error: " << Base::_shared_state->sink_status; + if (!status.ok() || state->is_cancelled()) { + if (!status.ok()) { + LOG(WARNING) << "query " << print_id(query_id) << " agg node " + << Base::_parent->node_id() + << " revoke_memory error: " << status; } _shared_state->close(); } else { @@ -340,15 +338,10 @@ Status PartitionedAggSinkLocalState::revoke_memory( _finish_dependency->set_ready(); } state->get_query_ctx()->decrease_revoking_tasks_count(); - Base::_spill_dependency->Dependency::set_ready(); - - if (spill_context) { - spill_context->on_task_finished(); - } }}; auto* runtime_state = _runtime_state.get(); auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state); - Base::_shared_state->sink_status = std::visit( + status = std::visit( vectorized::Overload { [&](std::monostate& arg) -> Status { return Status::InternalError("Unit hash table"); @@ -359,10 +352,9 @@ Status PartitionedAggSinkLocalState::revoke_memory( state, agg_method, hash_table, size_to_revoke, _eos)); }}, agg_data->method_variant); - RETURN_IF_ERROR(Base::_shared_state->sink_status); - Base::_shared_state->sink_status = - parent._agg_sink_operator->reset_hash_table(runtime_state); - return Base::_shared_state->sink_status; + RETURN_IF_ERROR(status); + status = parent._agg_sink_operator->reset_hash_table(runtime_state); + return status; }); return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index f80110e5326..04042c5841a 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -142,21 +142,24 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: bool* eos) { auto& local_state = get_local_state(state); local_state.copy_shared_spill_profile(); + Status status; Defer defer {[&]() { - if (!local_state._status.ok() || *eos) { + if (!status.ok() || *eos) { local_state._shared_state->close(); } }}; local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._status); + status = local_state._shared_state->_spill_status.status(); + RETURN_IF_ERROR(status); if (local_state._shared_state->is_spilled && local_state._need_to_merge_data_for_current_partition) { if (local_state._blocks.empty() && !local_state._current_partition_eos) { bool has_recovering_data = false; - RETURN_IF_ERROR(local_state.recover_blocks_from_disk(state, has_recovering_data)); + status = local_state.recover_blocks_from_disk(state, has_recovering_data); + RETURN_IF_ERROR(status); *eos = !has_recovering_data; return Status::OK(); } else if (!local_state._blocks.empty()) { @@ -165,8 +168,9 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: auto block = std::move(local_state._blocks.front()); merged_rows += block.rows(); local_state._blocks.erase(local_state._blocks.begin()); - RETURN_IF_ERROR(_agg_source_operator->merge_with_serialized_key_helper<false>( - local_state._runtime_state.get(), &block)); + status = _agg_source_operator->merge_with_serialized_key_helper<false>( + local_state._runtime_state.get(), &block); + RETURN_IF_ERROR(status); } local_state._estimate_memory_usage += _agg_source_operator->get_estimated_memory_size_for_merging( @@ -183,8 +187,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: // not spilled in sink or current partition still has data auto* runtime_state = local_state._runtime_state.get(); local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once(); - local_state._status = _agg_source_operator->get_block(runtime_state, block, eos); - RETURN_IF_ERROR(local_state._status); + status = _agg_source_operator->get_block(runtime_state, block, eos); + RETURN_IF_ERROR(status); if (local_state._runtime_state) { auto* source_local_state = local_state._runtime_state->get_local_state(_agg_source_operator->operator_id()); @@ -195,7 +199,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: !local_state._shared_state->spill_partitions.empty()) { local_state._current_partition_eos = false; local_state._need_to_merge_data_for_current_partition = true; - RETURN_IF_ERROR(local_state._shared_state->in_mem_shared_state->reset_hash_table()); + status = local_state._shared_state->in_mem_shared_state->reset_hash_table(); + RETURN_IF_ERROR(status); *eos = false; } } @@ -240,15 +245,15 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b has_data = true; auto spill_func = [this, state, query_id] { + Status status; Defer defer {[&]() { - if (!_status.ok() || state->is_cancelled()) { - if (!_status.ok()) { + if (!status.ok() || state->is_cancelled()) { + if (!status.ok()) { LOG(WARNING) << "query " << print_id(query_id) << " agg node " - << _parent->node_id() << " recover agg data error: " << _status; + << _parent->node_id() << " recover agg data error: " << status; } _shared_state->close(); } - _spill_dependency->Dependency::set_ready(); }}; bool has_agg_data = false; size_t accumulated_blocks_size = 0; @@ -262,15 +267,15 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data", { - _status = Status::Error<INTERNAL_ERROR>( + status = Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_agg_source " "recover_spill_data failed"); }); - if (_status.ok()) { - _status = stream->read_next_block_sync(&block, &eos); + if (status.ok()) { + status = stream->read_next_block_sync(&block, &eos); } } - RETURN_IF_ERROR(_status); + RETURN_IF_ERROR(status); if (!block.empty()) { has_agg_data = true; @@ -292,34 +297,20 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b } } } - return _status; + return status; }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - auto exception_catch_func = [spill_func, query_id, submit_timer, this]() { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_read_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_recover_time); - + auto exception_catch_func = [spill_func, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_source " "merge spill data canceled"); ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st); - return; + return st; }); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); - - if (!status.ok()) { - _status = status; - } + return status; }; DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", { @@ -327,9 +318,14 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b "fault_inject partitioned_agg_source submit_func failed"); }); _spill_dependency->block(); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + _spilling_task_count = 1; return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( - std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false, - _shared_state->shared_from_this(), - exception_catch_func)); + std::make_shared<SpillRunnable>(state, nullptr, _spilling_task_count, + _runtime_profile.get(), submit_timer, + _shared_state->shared_from_this(), _spill_dependency, + false, false, exception_catch_func)); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index c2a8c71e672..f51695b982b 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -53,7 +53,6 @@ protected: std::unique_ptr<RuntimeState> _runtime_state; bool _opened = false; - Status _status; std::unique_ptr<std::promise<Status>> _spill_merge_promise; std::future<Status> _spill_merge_future; bool _current_partition_eos = true; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 6df2a7ca4ab..f1ca6f8d7de 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -23,6 +23,7 @@ #include <algorithm> #include <utility> +#include "common/exception.h" #include "common/logging.h" #include "common/status.h" #include "pipeline/pipeline_task.h" @@ -205,7 +206,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( state, spilling_stream, print_id(state->query_id()), "hash_probe", _parent->node_id(), std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _runtime_profile.get())); - spilling_stream->set_write_counters(_runtime_profile.get()); } auto merged_block = vectorized::MutableBlock::create_unique(blocks[0].clone_empty()); @@ -243,36 +243,17 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( return Status::OK(); }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto exception_catch_func = [query_id, spill_func, spill_context, submit_timer, this]() { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_write_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_write_timer); - + auto exception_catch_func = [query_id, spill_func, spill_context]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " - "spill_probe_blocks canceled")); - return; + auto status = Status::InternalError( + "fault_inject partitioned_hash_join_probe " + "spill_probe_blocks canceled"); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + return status; }); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); - - if (!status.ok()) { - _spill_status_ok = false; - _spill_status = std::move(status); - } - _spill_dependency->set_ready(); - if (spill_context) { - spill_context->on_non_sink_task_finished(); - } + return status; }; if (spill_context) { @@ -284,9 +265,14 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( "fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed"); }); - auto spill_runnable = std::make_shared<SpillRunnable>(state, _runtime_profile.get(), true, - _shared_state->shared_from_this(), - exception_catch_func); + MonotonicStopWatch submit_timer; + submit_timer.start(); + + _spilling_task_count = 1; + auto spill_runnable = std::make_shared<SpillRunnable>( + state, spill_context, _spilling_task_count, _runtime_profile.get(), submit_timer, + _shared_state->shared_from_this(), _spill_dependency, false, true, + exception_catch_func); return spill_io_pool->submit(std::move(spill_runnable)); } @@ -314,39 +300,26 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim } spilled_stream->set_read_counters(profile()); - std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = - _shared_state->shared_from_this(); - auto query_id = state->query_id(); - auto read_func = [this, query_id, state, spilled_stream = spilled_stream, shared_state_holder, - partition_index] { - auto shared_state_sptr = shared_state_holder.lock(); - if (!shared_state_sptr || state->is_cancelled()) { - LOG(INFO) << "query: " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return; - } - + auto read_func = [this, query_id, state, spilled_stream = spilled_stream, partition_index] { SCOPED_TIMER(_recovery_build_timer); bool eos = false; VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index << ", recoverying build data"; + Status status; while (!eos) { vectorized::Block block; - Status st; DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks", { - st = Status::Error<INTERNAL_ERROR>( + status = Status::Error<INTERNAL_ERROR>( "fault_inject partitioned_hash_join_probe recover_build_blocks failed"); }); - if (st.ok()) { - st = spilled_stream->read_next_block_sync(&block, &eos); + if (status.ok()) { + status = spilled_stream->read_next_block_sync(&block, &eos); } - if (!st.ok()) { - _spill_status_ok = false; - _spill_status = std::move(st); + if (!status.ok()) { break; } COUNTER_UPDATE(_recovery_build_rows, block.rows()); @@ -365,10 +338,8 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim _recovered_build_block = vectorized::MutableBlock::create_unique(std::move(block)); } else { DCHECK_EQ(_recovered_build_block->columns(), block.columns()); - st = _recovered_build_block->merge(std::move(block)); - if (!st.ok()) { - _spill_status_ok = false; - _spill_status = std::move(st); + status = _recovered_build_block->merge(std::move(block)); + if (!status.ok()) { break; } } @@ -381,43 +352,29 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim if (eos) { ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); - shared_state_sptr->spilled_streams[partition_index].reset(); + _shared_state->spilled_streams[partition_index].reset(); VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index; } + return status; }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto exception_catch_func = [read_func, query_id, submit_timer, this]() { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_read_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_recover_time); - + auto exception_catch_func = [read_func, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " - "recover_build_blocks canceled")); - return; + auto status = Status::InternalError( + "fault_inject partitioned_hash_join_probe " + "recover_build_blocks canceled"); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + return status; }); auto status = [&]() { - RETURN_IF_CATCH_EXCEPTION(read_func()); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func()); return Status::OK(); }(); - if (!status.ok()) { - _spill_status_ok = false; - _spill_status = std::move(status); - } - _spill_dependency->set_ready(); + return status; }; auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); @@ -440,9 +397,15 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim "fault_inject partitioned_hash_join_probe " "recovery_build_blocks submit_func failed"); }); - auto spill_runnable = std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false, - _shared_state->shared_from_this(), - exception_catch_func); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + + _spilling_task_count = 1; + auto spill_runnable = std::make_shared<SpillRunnable>( + state, nullptr, _spilling_task_count, _runtime_profile.get(), submit_timer, + _shared_state->shared_from_this(), _spill_dependency, false, false, + exception_catch_func); VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index << " recover_build_blocks_from_disk submit func"; @@ -500,8 +463,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim while (!eos && !_state->is_cancelled() && st.ok()) { st = spilled_stream->read_next_block_sync(&block, &eos); if (!st.ok()) { - _spill_status_ok = false; - _spill_status = std::move(st); + break; } else { COUNTER_UPDATE(_recovery_probe_rows, block.rows()); COUNTER_UPDATE(_recovery_probe_blocks, 1); @@ -519,37 +481,24 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); } + return st; }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - auto exception_catch_func = [read_func, query_id, submit_timer, this]() { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_read_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_recover_time); - + auto exception_catch_func = [read_func, query_id]() { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " - "recover_probe_blocks canceled")); - return; + auto status = Status::InternalError( + "fault_inject partitioned_hash_join_probe " + "recover_probe_blocks canceled"); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + return status; }); auto status = [&]() { - RETURN_IF_CATCH_EXCEPTION(read_func()); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func()); return Status::OK(); }(); - if (!status.ok()) { - _spill_status_ok = false; - _spill_status = std::move(status); - } - _spill_dependency->set_ready(); + return status; }; auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); @@ -562,8 +511,12 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim "fault_inject partitioned_hash_join_probe " "recovery_probe_blocks submit_func failed"); }); + MonotonicStopWatch submit_timer; + submit_timer.start(); + _spilling_task_count = 1; return spill_io_pool->submit(std::make_shared<SpillRunnable>( - state, _runtime_profile.get(), false, _shared_state->shared_from_this(), + state, nullptr, _spilling_task_count, _runtime_profile.get(), submit_timer, + _shared_state->shared_from_this(), _spill_dependency, false, false, exception_catch_func)); } @@ -759,9 +712,8 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) const { auto& local_state = get_local_state(state); - if (!local_state._spill_status_ok) { - DCHECK_NE(local_state._spill_status.code(), 0); - return local_state._spill_status; + if (!local_state._shared_state->_spill_status.ok()) { + return local_state._shared_state->_spill_status.status(); } const auto partition_index = local_state._partition_cursor; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index e76cfdeb2a7..f020c0a832a 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -74,12 +74,6 @@ private: std::unique_ptr<vectorized::Block> _child_block; bool _child_eos {false}; - std::mutex _spill_lock; - Status _spill_status; - - std::atomic<int> _spilling_task_count {0}; - std::atomic<bool> _spill_status_ok {true}; - std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks; std::unique_ptr<vectorized::MutableBlock> _recovered_build_block; std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 474f03363f2..d3d010e0d7c 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -73,7 +73,6 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { state, spilling_stream, print_id(state->query_id()), fmt::format("hash_build_sink_{}", i), _parent->node_id(), std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _profile)); - spilling_stream->set_write_counters(_profile); } return p._partitioner->clone(state, _partitioner); } @@ -203,21 +202,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( std::for_each(partitions_indexes.begin(), partitions_indexes.end(), [](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); }); - auto flush_rows = [&state, this](std::unique_ptr<vectorized::MutableBlock>& partition_block, - vectorized::SpillStreamSPtr& spilling_stream) { - auto block = partition_block->to_block(); - auto status = spilling_stream->spill_block(state, block, false); - - if (!status.ok()) { - std::unique_lock<std::mutex> lock(_spill_lock); - _spill_status = status; - _spill_status_ok = false; - _spill_dependency->set_ready(); - return false; - } - return true; - }; - size_t total_rows = build_block.rows(); size_t offset = 1; while (offset < total_rows) { @@ -261,22 +245,14 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( int64_t old_mem = partition_block->allocated_bytes(); { SCOPED_TIMER(_partition_shuffle_timer); - Status st = partition_block->add_rows(&sub_block, begin, end); - if (!st.ok()) { - std::unique_lock<std::mutex> lock(_spill_lock); - _spill_status = st; - _spill_status_ok = false; - _spill_dependency->set_ready(); - return; - } + RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end)); partitions_indexes[partition_idx].clear(); } int64_t new_mem = partition_block->allocated_bytes(); if (partition_block->rows() >= reserved_size || is_last_block) { - if (!flush_rows(partition_block, spilling_stream)) { - return; - } + auto block = partition_block->to_block(); + RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false)); partition_block = vectorized::MutableBlock::create_unique(build_block.clone_empty()); COUNTER_UPDATE(_memory_used_counter, -new_mem); @@ -287,40 +263,20 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } } - _spill_dependency->set_ready(); + return Status::OK(); }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - auto exception_catch_func = [spill_func, spill_context, submit_timer, this]() mutable { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_write_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_write_timer); - - auto status = [&]() { - RETURN_IF_CATCH_EXCEPTION(spill_func()); - return Status::OK(); - }(); - - if (!status.ok()) { - std::unique_lock<std::mutex> lock(_spill_lock); - _spill_status = status; - _spill_status_ok = false; - _spill_dependency->set_ready(); - } - - if (spill_context) { - spill_context->on_task_finished(); - } + auto exception_catch_func = [spill_func]() mutable { + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }(); + return status; }; + MonotonicStopWatch submit_timer; + submit_timer.start(); + _spilling_task_count = 1; auto spill_runnable = std::make_shared<SpillRunnable>( - state, _profile, true, _shared_state->shared_from_this(), exception_catch_func); + state, spill_context, _spilling_task_count, _profile, submit_timer, + _shared_state->shared_from_this(), _spill_dependency, true, true, exception_catch_func); auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); @@ -340,7 +296,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << state->task_id() << " hash join sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; - DCHECK_EQ(_spilling_streams_count, 0); + DCHECK_EQ(_spilling_task_count, 0); CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); if (!_shared_state->need_to_spill) { @@ -349,20 +305,40 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( return _revoke_unpartitioned_block(state, spill_context); } - _spilling_streams_count = _shared_state->partitioned_build_blocks.size(); + _spilling_task_count = _shared_state->partitioned_build_blocks.size(); auto query_id = state->query_id(); auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); + auto spill_fin_cb = [this, state, query_id, spill_context]() { + Status status; + if (_child_eos) { + VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink " + << _parent->node_id() << " set_ready_to_read" + << ", task id: " << state->task_id(); + std::for_each(_shared_state->partitioned_build_blocks.begin(), + _shared_state->partitioned_build_blocks.end(), [&](auto& block) { + if (block) { + COUNTER_UPDATE(_in_mem_rows_counter, block->rows()); + } + }); + auto st = _finish_spilling(); + if (status.ok()) { + status = st; + } + _dependency->set_ready_to_read(); + } + return status; + }; for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; auto& mutable_block = _shared_state->partitioned_build_blocks[i]; if (!mutable_block || mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - --_spilling_streams_count; + --_spilling_task_count; continue; } @@ -380,54 +356,40 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( // so that when a stream finished, it should desc -1 state->get_query_ctx()->increase_revoking_tasks_count(); auto spill_runnable = std::make_shared<SpillRunnable>( - state, _profile, true, _shared_state->shared_from_this(), - [this, query_id, spilling_stream, i, submit_timer, spill_context] { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_write_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_write_timer); - + state, spill_context, _spilling_task_count, _profile, submit_timer, + _shared_state->shared_from_this(), _spill_dependency, true, true, + [this, query_id, spilling_stream, i, spill_context] { DBUG_EXECUTE_IF( "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_id, Status::InternalError( - "fault_inject partitioned_hash_join_sink " - "revoke_memory canceled")); - return; + auto status = Status::InternalError( + "fault_inject partitioned_hash_join_sink " + "revoke_memory canceled"); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, + status); + return status; }); SCOPED_TIMER(_spill_build_timer); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION( - _spill_to_disk(i, spilling_stream, spill_context)); - return Status::OK(); + return _spill_to_disk(i, spilling_stream, spill_context)); }(); _state->get_query_ctx()->decrease_revoking_tasks_count(); - - if (!status.ok()) { - std::unique_lock<std::mutex> lock(_spill_lock); - _spill_dependency->set_ready(); - _spill_status_ok = false; - _spill_status = std::move(status); - } - }); + return status; + }, + spill_fin_cb); if (st.ok()) { st = spill_io_pool->submit(std::move(spill_runnable)); } if (!st.ok()) { - --_spilling_streams_count; + --_spilling_task_count; return st; } } - std::unique_lock<std::mutex> lock(_spill_lock); - if (_spilling_streams_count > 0) { + if (_spilling_task_count > 0) { _spill_dependency->block(); } else if (_child_eos) { VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink " @@ -502,50 +464,24 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, return Status::OK(); } -void PartitionedHashJoinSinkLocalState::_spill_to_disk( +Status PartitionedHashJoinSinkLocalState::_spill_to_disk( uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream, const std::shared_ptr<SpillContext>& spill_context) { auto& partitioned_block = _shared_state->partitioned_build_blocks[partition_index]; - if (_spill_status_ok) { + Status status = _shared_state->_spill_status.status(); + if (status.ok()) { auto block = partitioned_block->to_block(); int64_t block_mem_usage = block.allocated_bytes(); Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(), -block_mem_usage); }}; partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty()); - auto st = spilling_stream->spill_block(state(), block, false); - if (!st.ok()) { - _spill_status_ok = false; - std::lock_guard<std::mutex> l(_spill_status_lock); - _spill_status = st; - } + status = spilling_stream->spill_block(state(), block, false); } VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " << _state->task_id() << ", join sink " << _parent->node_id() << " revoke done"; - auto num = _spilling_streams_count.fetch_sub(1); - DCHECK_GE(_spilling_streams_count, 0); - - if (num == 1) { - std::unique_lock<std::mutex> lock(_spill_lock); - _spill_dependency->set_ready(); - if (_child_eos) { - VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink " - << _parent->node_id() << " set_ready_to_read" - << ", task id: " << state()->task_id(); - std::for_each(_shared_state->partitioned_build_blocks.begin(), - _shared_state->partitioned_build_blocks.end(), [&](auto& block) { - if (block) { - COUNTER_UPDATE(_in_mem_rows_counter, block->rows()); - } - }); - _spill_status = _finish_spilling(); - _dependency->set_ready_to_read(); - } - if (spill_context) { - spill_context->on_task_finished(); - } - } + return status; } PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX( @@ -635,9 +571,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr); local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - if (!local_state._spill_status_ok) { - DCHECK_NE(local_state._spill_status.code(), 0); - return local_state._spill_status; + if (!local_state._shared_state->_spill_status.ok()) { + return local_state._shared_state->_spill_status.status(); } local_state._child_eos = eos; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index f96f54dc4db..e6993faa462 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -53,9 +53,9 @@ protected: PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {} - void _spill_to_disk(uint32_t partition_index, - const vectorized::SpillStreamSPtr& spilling_stream, - const std::shared_ptr<SpillContext>& spill_context); + Status _spill_to_disk(uint32_t partition_index, + const vectorized::SpillStreamSPtr& spilling_stream, + const std::shared_ptr<SpillContext>& spill_context); Status _partition_block(RuntimeState* state, vectorized::Block* in_block, size_t begin, size_t end); @@ -67,17 +67,10 @@ protected: friend class PartitionedHashJoinSinkOperatorX; - std::atomic_int _spilling_streams_count {0}; - std::atomic<bool> _spill_status_ok {true}; - std::mutex _spill_lock; - vectorized::Block _pending_block; bool _child_eos {false}; - Status _spill_status; - std::mutex _spill_status_lock; - std::unique_ptr<vectorized::PartitionerBase> _partitioner; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 5d8355b865d..48a1b2b6ec3 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -144,7 +144,7 @@ Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state, size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); - if (!local_state.Base::_shared_state->sink_status.ok()) { + if (!local_state._shared_state->_spill_status.ok()) { return UINT64_MAX; } return _sort_sink_operator->get_revocable_mem_size(local_state._runtime_state.get()); @@ -155,7 +155,9 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc auto& local_state = get_local_state(state); local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status); + if (!local_state._shared_state->_spill_status.ok()) { + return local_state._shared_state->_spill_status.status(); + } COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { local_state._shared_state->update_spill_block_batch_row_count(in_block); @@ -198,14 +200,15 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << Base::_parent->node_id() << " revoke_memory" << ", eos: " << _eos; - RETURN_IF_ERROR(Base::_shared_state->sink_status); + if (!_shared_state->_spill_status.ok()) { + return _shared_state->_spill_status.status(); + } auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, _spilling_stream, print_id(state->query_id()), "sort", _parent->node_id(), _shared_state->spill_block_batch_row_count, SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); RETURN_IF_ERROR(status); - _spilling_stream->set_write_counters(_profile); _shared_state->sorted_streams.emplace_back(_spilling_stream); @@ -218,12 +221,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, auto query_id = state->query_id(); auto spill_func = [this, state, query_id, &parent] { + Status status; Defer defer {[&]() { - if (!_shared_state->sink_status.ok() || state->is_cancelled()) { - if (!_shared_state->sink_status.ok()) { + if (!status.ok() || state->is_cancelled()) { + if (!status.ok()) { LOG(WARNING) << "query " << print_id(query_id) << " sort node " - << _parent->node_id() - << " revoke memory error: " << _shared_state->sink_status; + << _parent->node_id() << " revoke memory error: " << status; } _shared_state->close(); } else { @@ -231,7 +234,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, << " revoke memory finish"; } - if (!_shared_state->sink_status.ok()) { + if (!status.ok()) { _shared_state->close(); } @@ -240,14 +243,11 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, if (_eos) { _dependency->set_ready_to_read(); _finish_dependency->set_ready(); - } else { - _spill_dependency->Dependency::set_ready(); } }}; - _shared_state->sink_status = - parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); - RETURN_IF_ERROR(_shared_state->sink_status); + status = parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); + RETURN_IF_ERROR(status); auto* sink_local_state = _runtime_state->get_sink_local_state(); update_profile(sink_local_state->profile()); @@ -257,13 +257,13 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, while (!eos && !state->is_cancelled()) { { SCOPED_TIMER(_spill_merge_sort_timer); - _shared_state->sink_status = parent._sort_sink_operator->merge_sort_read_for_spill( + status = parent._sort_sink_operator->merge_sort_read_for_spill( _runtime_state.get(), &block, _shared_state->spill_block_batch_row_count, &eos); } - RETURN_IF_ERROR(_shared_state->sink_status); - _shared_state->sink_status = _spilling_stream->spill_block(state, block, eos); - RETURN_IF_ERROR(_shared_state->sink_status); + RETURN_IF_ERROR(status); + status = _spilling_stream->spill_block(state, block, eos); + RETURN_IF_ERROR(status); block.clear_column_data(); } parent._sort_sink_operator->reset(_runtime_state.get()); @@ -271,33 +271,18 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, return Status::OK(); }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto exception_catch_func = [this, query_id, spill_context, submit_timer, spill_func]() { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_write_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_write_timer); - + auto exception_catch_func = [query_id, spill_context, spill_func]() { DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_id, Status::InternalError("fault_inject spill_sort_sink " - "revoke_memory canceled")); - return; + auto status = Status::InternalError( + "fault_inject spill_sort_sink " + "revoke_memory canceled"); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + return status; }); - _shared_state->sink_status = [&]() { - RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); - }(); + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); - if (spill_context) { - spill_context->on_task_finished(); - } + return status; }; DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", { @@ -307,10 +292,15 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, }); if (status.ok()) { state->get_query_ctx()->increase_revoking_tasks_count(); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + _spilling_task_count = 1; status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( - std::make_shared<SpillRunnable>(state, _profile, true, - _shared_state->shared_from_this(), - exception_catch_func)); + std::make_shared<SpillRunnable>( + state, spill_context, _spilling_task_count, _profile, submit_timer, + _shared_state->shared_from_this(), _spill_dependency, true, true, + exception_catch_func)); } if (!status.ok()) { if (!_eos) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 5cc124caaea..2308c49d893 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -81,11 +81,12 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto spill_func = [this, state, query_id, &parent] { SCOPED_TIMER(_spill_merge_sort_timer); + Status status; Defer defer {[&]() { - if (!_status.ok() || state->is_cancelled()) { - if (!_status.ok()) { + if (!status.ok() || state->is_cancelled()) { + if (!status.ok()) { LOG(WARNING) << "query " << print_id(query_id) << " sort node " - << _parent->node_id() << " merge spill data error: " << _status; + << _parent->node_id() << " merge spill data error: " << status; } _shared_state->close(); for (auto& stream : _current_merging_streams) { @@ -96,7 +97,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill data finish"; } - _spill_dependency->Dependency::set_ready(); }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; @@ -108,11 +108,11 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat << ", curren merge max stream count: " << max_stream_count; { SCOPED_TIMER(Base::_spill_recover_time); - _status = _create_intermediate_merger( + status = _create_intermediate_merger( max_stream_count, parent._sort_source_operator->get_sort_description(_runtime_state.get())); } - RETURN_IF_ERROR(_status); + RETURN_IF_ERROR(status); // all the remaining streams can be merged in a run if (_shared_state->sorted_streams.empty()) { @@ -120,12 +120,11 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } { - _status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, tmp_stream, print_id(state->query_id()), "sort", _parent->node_id(), _shared_state->spill_block_batch_row_count, SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); - RETURN_IF_ERROR(_status); - tmp_stream->set_write_counters(profile()); + RETURN_IF_ERROR(status); _shared_state->sorted_streams.emplace_back(tmp_stream); @@ -135,24 +134,24 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat { SCOPED_TIMER(Base::_spill_recover_time); DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", { - _status = Status::Error<INTERNAL_ERROR>( + status = Status::Error<INTERNAL_ERROR>( "fault_inject spill_sort_source " "recover_spill_data failed"); }); - if (_status.ok()) { - _status = _merger->get_next(&merge_sorted_block, &eos); + if (status.ok()) { + status = _merger->get_next(&merge_sorted_block, &eos); } } - RETURN_IF_ERROR(_status); - _status = tmp_stream->spill_block(state, merge_sorted_block, eos); - if (_status.ok()) { + RETURN_IF_ERROR(status); + status = tmp_stream->spill_block(state, merge_sorted_block, eos); + if (status.ok()) { DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", { - _status = Status::Error<INTERNAL_ERROR>( + status = Status::Error<INTERNAL_ERROR>( "fault_inject spill_sort_source " "spill_merged_data failed"); }); } - RETURN_IF_ERROR(_status); + RETURN_IF_ERROR(status); } } for (auto& stream : _current_merging_streams) { @@ -163,20 +162,9 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat return Status::OK(); }; - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto exception_catch_func = [this, spill_func, submit_timer]() { - auto submit_elapsed_time = submit_timer.elapsed_time(); - _spill_read_wait_in_queue_timer->update(submit_elapsed_time); - exec_time_counter()->update(submit_elapsed_time); - _spill_total_timer->update(submit_elapsed_time); - - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_spill_total_timer); - SCOPED_TIMER(_spill_recover_time); - - _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + auto exception_catch_func = [spill_func]() { + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + return status; }; DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func", { @@ -184,10 +172,15 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat "fault_inject spill_sort_source " "merge_sort_spill_data submit_func failed"); }); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + _spilling_task_count = 1; return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( - std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false, - _shared_state->shared_from_this(), - exception_catch_func)); + std::make_shared<SpillRunnable>(state, nullptr, _spilling_task_count, + _runtime_profile.get(), submit_timer, + _shared_state->shared_from_this(), _spill_dependency, + false, false, exception_catch_func)); } Status SpillSortLocalState::_create_intermediate_merger( @@ -261,8 +254,9 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc bool* eos) { auto& local_state = get_local_state(state); local_state.copy_shared_spill_profile(); + Status status; Defer defer {[&]() { - if (!local_state._status.ok() || *eos) { + if (!status.ok() || *eos) { local_state._shared_state->close(); for (auto& stream : local_state._current_merging_streams) { (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); @@ -272,20 +266,21 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc }}; local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._status); + status = local_state._shared_state->_spill_status.status(); + RETURN_IF_ERROR(status); if (local_state._shared_state->is_spilled) { if (!local_state._merger) { - local_state._status = local_state.initiate_merge_sort_spill_streams(state); - return local_state._status; + status = local_state.initiate_merge_sort_spill_streams(state); + return status; } else { - local_state._status = local_state._merger->get_next(block, eos); - RETURN_IF_ERROR(local_state._status); + SCOPED_TIMER(local_state._spill_total_timer); + status = local_state._merger->get_next(block, eos); + RETURN_IF_ERROR(status); } } else { - local_state._status = - _sort_source_operator->get_block(local_state._runtime_state.get(), block, eos); - RETURN_IF_ERROR(local_state._status); + status = _sort_source_operator->get_block(local_state._runtime_state.get(), block, eos); + RETURN_IF_ERROR(status); } local_state.reached_limit(block, eos); return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index 7536dd15e92..a7b8e8efde8 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -54,7 +54,6 @@ protected: std::unique_ptr<RuntimeState> _runtime_state; bool _opened = false; - Status _status; int64_t _external_sort_bytes_threshold = 134217728; // 128M std::vector<vectorized::SpillStreamSPtr> _current_merging_streams; diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index 2ea5cedcdb0..687779badbb 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -79,21 +79,40 @@ private: class SpillRunnable : public Runnable { public: - SpillRunnable(RuntimeState* state, RuntimeProfile* profile, bool is_write, - const std::shared_ptr<BasicSharedState>& shared_state, std::function<void()> func) - : _state(state), + SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context, + std::atomic_int& spilling_task_count, RuntimeProfile* profile, + MonotonicStopWatch submit_timer, + const std::shared_ptr<BasicSpillSharedState>& shared_state, + std::shared_ptr<Dependency> spill_dependency, bool is_sink, bool is_write, + std::function<Status()> spill_exec_func, + std::function<Status()> spill_fin_cb = {}) + : _is_sink(is_sink), _is_write(is_write), + _state(state), + _spill_context(std::move(spill_context)), + _spilling_task_count(spilling_task_count), + _spill_dependency(std::move(spill_dependency)), + _submit_timer(submit_timer), _task_context_holder(state->get_task_execution_context()), _shared_state_holder(shared_state), - _func(std::move(func)) { - write_wait_in_queue_task_count = profile->get_counter("SpillWriteTaskWaitInQueueCount"); - writing_task_count = profile->get_counter("SpillWriteTaskCount"); - read_wait_in_queue_task_count = profile->get_counter("SpillReadTaskWaitInQueueCount"); - reading_task_count = profile->get_counter("SpillReadTaskCount"); + _spill_exec_func(std::move(spill_exec_func)), + _spill_fin_cb(std::move(spill_fin_cb)) { + _exec_timer = profile->get_counter("ExecTime"); + _spill_total_timer = profile->get_counter("SpillTotalTime"); + + _spill_write_timer = profile->get_counter("SpillWriteTime"); + _spill_write_wait_in_queue_timer = profile->get_counter("SpillWriteTaskWaitInQueueTime"); + _write_wait_in_queue_task_count = profile->get_counter("SpillWriteTaskWaitInQueueCount"); + _writing_task_count = profile->get_counter("SpillWriteTaskCount"); + + _spill_revover_timer = profile->get_counter("SpillRecoverTime"); + _spill_read_wait_in_queue_timer = profile->get_counter("SpillReadTaskWaitInQueueTime"); + _read_wait_in_queue_task_count = profile->get_counter("SpillReadTaskWaitInQueueCount"); + _reading_task_count = profile->get_counter("SpillReadTaskCount"); if (is_write) { - COUNTER_UPDATE(write_wait_in_queue_task_count, 1); + COUNTER_UPDATE(_write_wait_in_queue_task_count, 1); } else { - COUNTER_UPDATE(read_wait_in_queue_task_count, 1); + COUNTER_UPDATE(_read_wait_in_queue_task_count, 1); } } @@ -106,22 +125,46 @@ public: if (!task_context_holder) { return; } + + auto submit_elapsed_time = _submit_timer.elapsed_time(); + if (_is_write) { + _spill_write_wait_in_queue_timer->update(submit_elapsed_time); + } else { + _spill_read_wait_in_queue_timer->update(submit_elapsed_time); + } + _exec_timer->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(_exec_timer); + SCOPED_TIMER(_spill_total_timer); + + std::shared_ptr<ScopedTimer<MonotonicStopWatch>> write_or_read_timer; if (_is_write) { - COUNTER_UPDATE(write_wait_in_queue_task_count, -1); - COUNTER_UPDATE(writing_task_count, 1); + write_or_read_timer = + std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_write_timer); + COUNTER_UPDATE(_write_wait_in_queue_task_count, -1); + COUNTER_UPDATE(_writing_task_count, 1); } else { - COUNTER_UPDATE(read_wait_in_queue_task_count, -1); - COUNTER_UPDATE(reading_task_count, 1); + write_or_read_timer = + std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_revover_timer); + COUNTER_UPDATE(_read_wait_in_queue_task_count, -1); + COUNTER_UPDATE(_reading_task_count, 1); } SCOPED_ATTACH_TASK(_state); Defer defer([&] { if (_is_write) { - COUNTER_UPDATE(writing_task_count, -1); + COUNTER_UPDATE(_writing_task_count, -1); } else { - COUNTER_UPDATE(reading_task_count, -1); + COUNTER_UPDATE(_reading_task_count, -1); + } + { + std::function<Status()> tmp; + std::swap(tmp, _spill_exec_func); + } + { + std::function<Status()> tmp; + std::swap(tmp, _spill_fin_cb); } - std::function<void()> tmp; - std::swap(tmp, _func); }); auto shared_state_holder = _shared_state_holder.lock(); @@ -132,19 +175,53 @@ public: if (_state->is_cancelled()) { return; } - _func(); + shared_state_holder->_spill_status.update(_spill_exec_func()); + + auto num = _spilling_task_count.fetch_sub(1); + DCHECK_GE(_spilling_task_count, 0); + + if (num == 1) { + if (_spill_fin_cb) { + shared_state_holder->_spill_status.update(_spill_fin_cb()); + } + if (_spill_context) { + if (_is_sink) { + _spill_context->on_task_finished(); + } else { + _spill_context->on_non_sink_task_finished(); + } + } + _spill_dependency->set_ready(); + } } private: - RuntimeState* _state; + bool _is_sink; bool _is_write; - RuntimeProfile::Counter* write_wait_in_queue_task_count = nullptr; - RuntimeProfile::Counter* writing_task_count = nullptr; - RuntimeProfile::Counter* read_wait_in_queue_task_count = nullptr; - RuntimeProfile::Counter* reading_task_count = nullptr; + RuntimeState* _state; + std::shared_ptr<SpillContext> _spill_context; + std::atomic_int& _spilling_task_count; + std::shared_ptr<Dependency> _spill_dependency; + + MonotonicStopWatch _submit_timer; + + RuntimeProfile::Counter* _exec_timer = nullptr; + RuntimeProfile::Counter* _spill_total_timer; + + RuntimeProfile::Counter* _spill_write_timer; + RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr; + RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* _writing_task_count = nullptr; + + RuntimeProfile::Counter* _spill_revover_timer; + RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr; + RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* _reading_task_count = nullptr; + std::weak_ptr<TaskExecutionContext> _task_context_holder; - std::weak_ptr<BasicSharedState> _shared_state_holder; - std::function<void()> _func; + std::weak_ptr<BasicSpillSharedState> _shared_state_holder; + std::function<Status()> _spill_exec_func; + std::function<Status()> _spill_fin_cb; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index ce07b4d331c..000617067f4 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -97,6 +97,7 @@ void SpillStream::gc() { Status SpillStream::prepare() { writer_ = std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_, data_dir_, spill_dir_); + _set_write_counters(profile_); reader_ = std::make_unique<SpillReader>(stream_id_, writer_->get_file_path()); diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 6779234279c..9682130aad0 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -63,8 +63,6 @@ public: Status read_next_block_sync(Block* block, bool* eos); - void set_write_counters(RuntimeProfile* profile) { writer_->set_counters(profile); } - void set_read_counters(RuntimeProfile* profile) { reader_->set_counters(profile); } void update_shared_profiles(RuntimeProfile* source_op_profile); @@ -76,6 +74,8 @@ private: Status prepare(); + void _set_write_counters(RuntimeProfile* profile) { writer_->set_counters(profile); } + RuntimeState* state_ = nullptr; int64_t stream_id_; SpillDataDir* data_dir_ = nullptr; diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index 26d8f06a623..f6e16518043 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -51,8 +51,10 @@ Status SpillWriter::close() { } total_written_bytes_ += meta_.size(); - COUNTER_UPDATE(_write_file_data_bytes_counter, meta_.size()); - + COUNTER_UPDATE(_write_file_total_size, meta_.size()); + if (_write_file_current_size) { + COUNTER_UPDATE(_write_file_current_size, meta_.size()); + } data_dir_->update_spill_data_usage(meta_.size()); RETURN_IF_ERROR(file_writer_->close()); @@ -146,7 +148,10 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { max_sub_block_size_ = std::max(max_sub_block_size_, (size_t)buff_size); meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); - COUNTER_UPDATE(_write_file_data_bytes_counter, buff_size); + COUNTER_UPDATE(_write_file_total_size, buff_size); + if (_write_file_current_size) { + COUNTER_UPDATE(_write_file_current_size, buff_size); + } COUNTER_UPDATE(_write_block_counter, 1); total_written_bytes_ += buff_size; ++written_blocks_; diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index 1212d7652a8..23a8e3030e2 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -61,7 +61,8 @@ public: _serialize_timer = profile->get_counter("SpillWriteSerializeBlockTime"); _write_block_counter = profile->get_counter("SpillWriteBlockCount"); _write_block_bytes_counter = profile->get_counter("SpillWriteBlockDataSize"); - _write_file_data_bytes_counter = profile->get_counter("SpillWriteFileTotalSize"); + _write_file_total_size = profile->get_counter("SpillWriteFileTotalSize"); + _write_file_current_size = profile->get_counter("SpillWriteFileCurrentSize"); _write_rows_counter = profile->get_counter("SpillWriteRows"); } @@ -86,7 +87,8 @@ private: RuntimeProfile::Counter* _serialize_timer = nullptr; RuntimeProfile::Counter* _write_block_counter = nullptr; RuntimeProfile::Counter* _write_block_bytes_counter = nullptr; - RuntimeProfile::Counter* _write_file_data_bytes_counter = nullptr; + RuntimeProfile::Counter* _write_file_total_size = nullptr; + RuntimeProfile::Counter* _write_file_current_size = nullptr; RuntimeProfile::Counter* _write_rows_counter = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org