This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2e40e395844cf716a91f09c4be9bcf7b583aefbd Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Apr 3 15:19:04 2024 +0800 [chore](spill) add timers for performance tuning (#33185) --- .../exec/partitioned_aggregation_sink_operator.cpp | 5 +- .../partitioned_aggregation_source_operator.cpp | 7 +- .../exec/partitioned_hash_join_probe_operator.cpp | 100 ++++++++++++--------- .../exec/partitioned_hash_join_probe_operator.h | 24 +++-- .../exec/partitioned_hash_join_sink_operator.cpp | 53 +++++------ .../exec/partitioned_hash_join_sink_operator.h | 12 +-- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 10 ++- .../pipeline/exec/spill_sort_source_operator.cpp | 9 +- be/src/pipeline/pipeline_x/operator.h | 6 ++ 9 files changed, 137 insertions(+), 89 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 64996724e15..3dea330c117 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -252,13 +252,16 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + MonotonicStopWatch submit_timer; + submit_timer.start(); status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( - [this, &parent, state, execution_context] { + [this, &parent, state, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; return Status::Cancelled("Cancelled"); } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_ATTACH_TASK(state); SCOPED_TIMER(Base::_spill_timer); Defer defer {[&]() { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 5db80788f41..5680b75c87e 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -197,9 +197,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + RETURN_IF_ERROR( ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( - [this, state, execution_context] { + [this, state, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; @@ -207,6 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime return Status::Cancelled("Cancelled"); } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_status.ok()) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index c23e12c3705..1a05b78b052 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -25,10 +25,11 @@ namespace doris::pipeline { PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) - : JoinProbeLocalState(state, parent) {} + : PipelineXSpillLocalState(state, parent), + _child_block(vectorized::Block::create_unique()) {} Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { - RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); + RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info)); 
_internal_runtime_profile.reset(new RuntimeProfile("internal_profile")); auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>(); @@ -38,45 +39,32 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI RETURN_IF_ERROR(_partitioner->init(p._probe_exprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._child_x->row_desc())); - _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "SpillAndPartition"); - _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "SpillAndPartition"); - _partition_shuffle_timer = - ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "SpillAndPartition"); - _spill_build_rows = - ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, "SpillAndPartition"); - _recovery_build_rows = - ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, "SpillAndPartition"); - _spill_probe_rows = - ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, "SpillAndPartition"); - _recovery_probe_rows = - ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, "SpillAndPartition"); - _spill_build_blocks = - ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, "SpillAndPartition"); + _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition"); + _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "Partition"); + _partition_shuffle_timer = ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "Partition"); + _spill_build_rows = ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, "Spill"); + _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1); + _recovery_build_rows = ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, "Spill"); + _recovery_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryBuildTime", "Spill", 1); + _spill_probe_rows = ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, "Spill"); + _recovery_probe_rows = ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, "Spill"); + 
_spill_build_blocks = ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, "Spill"); _recovery_build_blocks = - ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT, "SpillAndPartition"); - _spill_probe_blocks = - ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT, "SpillAndPartition"); + ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT, "Spill"); + _spill_probe_blocks = ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT, "Spill"); + _spill_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", "Spill", 1); _recovery_probe_blocks = - ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "SpillAndPartition"); + ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "Spill"); + _recovery_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryProbeTime", "Spill", 1); - _spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL( - Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition", 1); - _spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", - "SpillAndPartition", 1); + _spill_serialize_block_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", "Spill", 1); + _spill_write_disk_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", "Spill", 1); _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", - TUnit::BYTES, "SpillAndPartition", 1); + TUnit::BYTES, "Spill", 1); _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", - TUnit::UNIT, "SpillAndPartition", 1); - _spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime", - "SpillAndPartition", 1); - _spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", - "SpillAndPartition", 1); - _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", - TUnit::BYTES, 
"SpillAndPartition", 1); - _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", - "SpillAndPartition", 1); - _spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", - "SpillAndPartition", 1); + TUnit::UNIT, "Spill", 1); // Build phase _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase"); @@ -109,6 +97,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _process_other_join_conjunct_timer = ADD_CHILD_TIMER(profile(), "OtherJoinConjunctTime", "ProbePhase"); _init_probe_side_timer = ADD_CHILD_TIMER(profile(), "InitProbeSideTime", "ProbePhase"); + _probe_timer = ADD_CHILD_TIMER(profile(), "ProbeTime", "ProbePhase"); + _join_filter_timer = ADD_CHILD_TIMER(profile(), "JoinFilterTimer", "ProbePhase"); + _build_output_block_timer = ADD_CHILD_TIMER(profile(), "BuildOutputBlock", "ProbePhase"); + _probe_rows_counter = ADD_CHILD_COUNTER(profile(), "ProbeRows", TUnit::UNIT, "ProbePhase"); return Status::OK(); } #define UPDATE_PROFILE(counter, name) \ @@ -149,7 +141,7 @@ void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch #undef UPDATE_PROFILE Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) { - RETURN_IF_ERROR(PipelineXLocalStateBase::open(state)); + RETURN_IF_ERROR(PipelineXSpillLocalState::open(state)); return _partitioner->open(state); } Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { @@ -157,7 +149,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { return Status::OK(); } dec_running_big_mem_op_num(state); - RETURN_IF_ERROR(JoinProbeLocalState::close(state)); + RETURN_IF_ERROR(PipelineXSpillLocalState::close(state)); return Status::OK(); } @@ -187,13 +179,17 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state build_spilling_stream->get_spill_root_dir()); auto execution_context = state->get_task_execution_context(); 
_shared_state_holder = _shared_state->shared_from_this(); + MonotonicStopWatch submit_timer; + submit_timer.start(); return spill_io_pool->submit_func( - [execution_context, state, &build_spilling_stream, &mutable_block, this] { + [execution_context, state, &build_spilling_stream, &mutable_block, submit_timer, this] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; return; } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_build_timer); (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); if (_spill_status_ok) { @@ -248,14 +244,18 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat if (!blocks.empty()) { auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + MonotonicStopWatch submit_timer; + submit_timer.start(); return spill_io_pool->submit_func( - [execution_context, state, &blocks, spilling_stream, this] { + [execution_context, state, &blocks, spilling_stream, submit_timer, this] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; _dependency->set_ready(); return; } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_probe_timer); SCOPED_ATTACH_TASK(state); COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); while (!blocks.empty() && !state->is_cancelled()) { @@ -329,12 +329,19 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); - auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context] { + + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto read_func = [this, state, 
&spilled_stream, &mutable_block, execution_context, + submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; return; } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_recovery_build_timer); Defer defer([this] { --_spilling_task_count; }); (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); @@ -403,12 +410,19 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti /// TODO: maybe recovery more blocks each time. auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); - auto read_func = [this, execution_context, state, &spilled_stream, &blocks] { + + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto read_func = [this, execution_context, state, &spilled_stream, &blocks, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; return; } + + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_recovery_probe_timer); Defer defer([this] { --_spilling_task_count; }); (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); @@ -827,4 +841,4 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori return Status::OK(); } -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 8270817758d..143576e1b86 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -38,16 +38,13 @@ using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>; class 
PartitionedHashJoinProbeOperatorX; class PartitionedHashJoinProbeLocalState final - : public JoinProbeLocalState<PartitionedHashJoinSharedState, - PartitionedHashJoinProbeLocalState> { + : public PipelineXSpillLocalState<PartitionedHashJoinSharedState> { public: using Parent = PartitionedHashJoinProbeOperatorX; ENABLE_FACTORY_CREATOR(PartitionedHashJoinProbeLocalState); PartitionedHashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent); ~PartitionedHashJoinProbeLocalState() override = default; - void add_tuple_is_null_column(vectorized::Block* block) override {} - Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; @@ -68,9 +65,15 @@ public: friend class PartitionedHashJoinProbeOperatorX; private: + template <typename LocalStateType> + friend class StatefulOperatorX; + std::shared_ptr<BasicSharedState> _in_mem_shared_state_sptr; uint32_t _partition_cursor {0}; + std::unique_ptr<vectorized::Block> _child_block; + bool _child_eos {false}; + std::mutex _spill_lock; Status _spill_status; @@ -98,22 +101,21 @@ private: RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; RuntimeProfile::Counter* _spill_build_rows = nullptr; RuntimeProfile::Counter* _spill_build_blocks = nullptr; + RuntimeProfile::Counter* _spill_build_timer = nullptr; RuntimeProfile::Counter* _recovery_build_rows = nullptr; RuntimeProfile::Counter* _recovery_build_blocks = nullptr; + RuntimeProfile::Counter* _recovery_build_timer = nullptr; RuntimeProfile::Counter* _spill_probe_rows = nullptr; RuntimeProfile::Counter* _spill_probe_blocks = nullptr; + RuntimeProfile::Counter* _spill_probe_timer = nullptr; RuntimeProfile::Counter* _recovery_probe_rows = nullptr; RuntimeProfile::Counter* _recovery_probe_blocks = nullptr; + RuntimeProfile::Counter* _recovery_probe_timer = nullptr; - RuntimeProfile::Counter* _spill_read_data_time = nullptr; - RuntimeProfile::Counter* _spill_deserialize_time 
= nullptr; - RuntimeProfile::Counter* _spill_read_bytes = nullptr; RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; RuntimeProfile::Counter* _spill_data_size = nullptr; RuntimeProfile::Counter* _spill_block_count = nullptr; - RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; - RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; RuntimeProfile::Counter* _build_phase_label = nullptr; RuntimeProfile::Counter* _build_rows_counter = nullptr; @@ -137,6 +139,10 @@ private: RuntimeProfile::Counter* _init_probe_side_timer = nullptr; RuntimeProfile::Counter* _build_side_output_timer = nullptr; RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr; + RuntimeProfile::Counter* _probe_timer = nullptr; + RuntimeProfile::Counter* _probe_rows_counter = nullptr; + RuntimeProfile::Counter* _join_filter_timer = nullptr; + RuntimeProfile::Counter* _build_output_block_timer = nullptr; }; class PartitionedHashJoinProbeOperatorX final diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index f38354d5de2..8b9accd30ad 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -25,7 +25,7 @@ namespace doris::pipeline { Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { - RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); + RETURN_IF_ERROR(PipelineXSpillSinkLocalState::init(state, info)); auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); _shared_state->partitioned_build_blocks.resize(p._partition_count); _shared_state->spilled_streams.resize(p._partition_count); @@ -33,30 +33,26 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _partitioner = std::make_unique<PartitionerType>(p._partition_count); 
RETURN_IF_ERROR(_partitioner->init(p._build_exprs)); - _partition_timer = ADD_TIMER(profile(), "PartitionTime"); - _partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime"); - - _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillSerializeBlockTime", 1); - _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteDiskTime", 1); - _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", TUnit::BYTES, 1); - _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); - _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteWaitIOTime", 1); + _partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime", "Spill", 1); + _partition_shuffle_timer = + ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime", "Spill", 1); + _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1); return _partitioner->prepare(state, p._child_x->row_desc()); } Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { - RETURN_IF_ERROR(PipelineXSinkLocalState::open(state)); + RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state)); return _partitioner->open(state); } Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) { - SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter()); - SCOPED_TIMER(PipelineXSinkLocalState::_close_timer); - if (PipelineXSinkLocalState::_closed) { + SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter()); + SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer); + if (PipelineXSpillSinkLocalState::_closed) { return Status::OK(); } dec_running_big_mem_op_num(state); - return PipelineXSinkLocalState::close(state, exec_status); + return PipelineXSpillSinkLocalState::close(state, exec_status); } Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { @@ -90,16 +86,23 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { DCHECK(spill_io_pool != nullptr); auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); - auto st = spill_io_pool->submit_func([this, execution_context, state, spilling_stream, i] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; - return; - } - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - _spill_to_disk(i, spilling_stream); - }); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto st = spill_io_pool->submit_func( + [this, execution_context, state, spilling_stream, i, submit_timer] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_build_timer); + (void)state; // avoid ut compile error + SCOPED_ATTACH_TASK(state); + _spill_to_disk(i, spilling_stream); + }); if (!st.ok()) { --_spilling_streams_count; @@ -274,4 +277,4 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) { return local_state.revoke_memory(state); } -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index e364e225f66..96e751360d4 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -39,7 +39,7 @@ using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>; class PartitionedHashJoinSinkOperatorX; class PartitionedHashJoinSinkLocalState - : public 
PipelineXSinkLocalState<PartitionedHashJoinSharedState> { + : public PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState> { public: using Parent = PartitionedHashJoinSinkOperatorX; ENABLE_FACTORY_CREATOR(PartitionedHashJoinSinkLocalState); @@ -51,7 +51,7 @@ public: protected: PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : PipelineXSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {} + : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {} void _spill_to_disk(uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream); @@ -76,11 +76,7 @@ protected: RuntimeProfile::Counter* _partition_timer = nullptr; RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; - RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; - RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; - RuntimeProfile::Counter* _spill_data_size = nullptr; - RuntimeProfile::Counter* _spill_block_count = nullptr; - RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; + RuntimeProfile::Counter* _spill_build_timer = nullptr; }; class PartitionedHashJoinSinkOperatorX @@ -139,4 +135,4 @@ private: }; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 662e195f3e5..523ff2cfaaf 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -55,6 +55,9 @@ void SpillSortSinkLocalState::_init_counters() { _spill_merge_sort_timer = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", "Spill", 1); + + _spill_wait_in_queue_timer = + ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1); } #define UPDATE_PROFILE(counter, name) \ do { \ @@ -227,17 +230,22 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { auto 
execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + + MonotonicStopWatch submit_timer; + submit_timer.start(); + status = ExecEnv::GetInstance() ->spill_stream_mgr() ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) - ->submit_func([this, state, &parent, execution_context] { + ->submit_func([this, state, &parent, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; return Status::OK(); } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_shared_state->sink_status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index c021687e1df..c53c057088c 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -43,6 +43,8 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { TUnit::BYTES, "Spill", 1); _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, "Spill", 1); + _spill_wait_in_queue_timer = + ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1); return Status::OK(); } @@ -82,13 +84,18 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); - auto spill_func = [this, state, &parent, execution_context] { + + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto spill_func = [this, state, &parent, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; 
return Status::OK(); } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_spill_merge_sort_timer); SCOPED_ATTACH_TASK(state); Defer defer {[&]() { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 20fa46a5bf9..45e42390bc5 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -455,6 +455,8 @@ public: ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", "Spill", 1); _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", TUnit::BYTES, "Spill", 1); + _spill_wait_in_queue_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1); _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1); _spill_read_wait_io_timer = @@ -469,6 +471,7 @@ public: RuntimeProfile::Counter* _spill_read_bytes; RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; + RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr; }; class DataSinkOperatorXBase; @@ -776,6 +779,8 @@ public: TUnit::BYTES, "Spill", 1); _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, "Spill", 1); + _spill_wait_in_queue_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1); _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1); _spill_read_wait_io_timer = @@ -789,6 +794,7 @@ public: RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; RuntimeProfile::Counter* _spill_data_size = nullptr; RuntimeProfile::Counter* _spill_block_count = nullptr; + RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr; RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; }; 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org