This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new f3994437b6f [spill](counters) adjust spill counter level f3994437b6f is described below commit f3994437b6f047abf7384e46148e285d3d58c896 Author: jacktengg <18241664+jackte...@users.noreply.github.com> AuthorDate: Tue Sep 24 16:19:29 2024 +0800 [spill](counters) adjust spill counter level --- be/src/agent/workload_group_listener.cpp | 2 + be/src/pipeline/exec/operator.h | 48 +++++++++------------- .../exec/partitioned_aggregation_sink_operator.cpp | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 45 ++++++++++---------- .../exec/partitioned_hash_join_probe_operator.h | 1 - .../exec/partitioned_hash_join_sink_operator.cpp | 12 +++--- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 9 ++-- .../pipeline/exec/spill_sort_source_operator.cpp | 20 ++++----- be/src/pipeline/exec/spill_sort_source_operator.h | 1 - .../workload_group/workload_group_manager.cpp | 2 +- 10 files changed, 62 insertions(+), 80 deletions(-) diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 05e72c4038a..acf5f799c81 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -17,6 +17,8 @@ #include "agent/workload_group_listener.h" +#include <thrift/protocol/TDebugProtocol.h> + #include "runtime/workload_group/workload_group.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 17edfc668f5..3a644eb4f02 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -291,25 +291,21 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override { RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info)); - _spill_counters = ADD_LABEL_COUNTER_WITH_LEVEL(Base::profile(), "Spill", 1); - _spill_recover_time = - 
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", "Spill", 1); - _spill_read_data_time = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime", "Spill", 1); - _spill_deserialize_time = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", "Spill", 1); - _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", - TUnit::BYTES, "Spill", 1); + _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1); + _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1); + _spill_read_data_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime", 1); + _spill_deserialize_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", 1); + _spill_read_bytes = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", TUnit::BYTES, 1); _spill_wait_in_queue_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 1); _spill_write_wait_io_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1); - _spill_read_wait_io_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1); + _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1); return Status::OK(); } - RuntimeProfile::Counter* _spill_counters = nullptr; + RuntimeProfile::Counter* _spill_timer = nullptr; RuntimeProfile::Counter* _spill_recover_time; RuntimeProfile::Counter* _spill_read_data_time; RuntimeProfile::Counter* _spill_deserialize_time; @@ -594,22 +590,19 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override { RETURN_IF_ERROR(Base::init(state, info)); - _spill_counters = ADD_LABEL_COUNTER_WITH_LEVEL(Base::profile(), "Spill", 1); - _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", "Spill", 
1); + _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1); _spill_serialize_block_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", "Spill", 1); - _spill_write_disk_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", "Spill", 1); - _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", - TUnit::BYTES, "Spill", 1); - _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", - TUnit::UNIT, "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); + _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); + _spill_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1); + _spill_block_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); _spill_wait_in_queue_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 1); _spill_write_wait_io_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1); - _spill_read_wait_io_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1); + _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1); return Status::OK(); } @@ -618,7 +611,6 @@ public: return dependencies; } - RuntimeProfile::Counter* _spill_counters = nullptr; RuntimeProfile::Counter* _spill_timer = nullptr; RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 17eeb8039df..39311a62dcc 100644 --- 
a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -105,7 +105,7 @@ void PartitionedAggSinkLocalState::_init_counters() { COUNTER_SET(_max_row_size_counter, (int64_t)0); _spill_serialize_hash_table_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeHashTableTime", "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeHashTableTime", 1); } #define UPDATE_PROFILE(counter, name) \ do { \ diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index c7be1c61d14..f95cca1f6af 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -45,32 +45,28 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI "HashJoinProbeSpillDependency", true); state->get_task()->add_spill_dependency(_spill_dependency.get()); - _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition"); - _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "Partition"); - _partition_shuffle_timer = ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "Partition"); - _spill_build_rows = ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, "Spill"); - _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1); - _recovery_build_rows = ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, "Spill"); - _recovery_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryBuildTime", "Spill", 1); - _spill_probe_rows = ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, "Spill"); - _recovery_probe_rows = ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, "Spill"); - _spill_build_blocks = ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, "Spill"); - _recovery_build_blocks = - ADD_CHILD_COUNTER(profile(), 
"RecoveryBuildBlocks", TUnit::UNIT, "Spill"); - _spill_probe_blocks = ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT, "Spill"); - _spill_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", "Spill", 1); - _recovery_probe_blocks = - ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "Spill"); - _recovery_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryProbeTime", "Spill", 1); + _partition_timer = ADD_TIMER(profile(), "SpillPartitionTime"); + _partition_shuffle_timer = ADD_TIMER(profile(), "SpillPartitionShuffleTime"); + _spill_build_rows = ADD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT); + _spill_build_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", 1); + _recovery_build_rows = ADD_COUNTER(profile(), "SpillRecoveryBuildRows", TUnit::UNIT); + _recovery_build_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryBuildTime", 1); + _spill_probe_rows = ADD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT); + _recovery_probe_rows = ADD_COUNTER(profile(), "SpillRecoveryProbeRows", TUnit::UNIT); + _spill_build_blocks = ADD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT); + _recovery_build_blocks = ADD_COUNTER(profile(), "SpillRecoveryBuildBlocks", TUnit::UNIT); + _spill_probe_blocks = ADD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT); + _spill_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", 1); + _recovery_probe_blocks = ADD_COUNTER(profile(), "SpillRecoveryProbeBlocks", TUnit::UNIT); + _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryProbeTime", 1); _spill_serialize_block_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", "Spill", 1); - _spill_write_disk_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", "Spill", 1); - _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", - TUnit::BYTES, "Spill", 1); - _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount", - TUnit::UNIT, "Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); + _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); + _spill_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1); + _spill_block_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); // Build phase _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase"); @@ -216,6 +212,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat }; auto exception_catch_func = [query_id, spill_func, this]() { + SCOPED_TIMER(_spill_timer); DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 7a4ba1ed50b..b611fb661af 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -92,7 +92,6 @@ private: std::shared_ptr<Dependency> _spill_dependency; - RuntimeProfile::Counter* _spill_and_partition_label = nullptr; RuntimeProfile::Counter* _partition_timer = nullptr; RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; RuntimeProfile::Counter* _spill_build_rows = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index bb0efb17cb9..f2375632d2a 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -47,12 +47,10 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _internal_runtime_profile.reset(new 
RuntimeProfile("internal_profile")); - _partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime", "Spill", 1); - _partition_shuffle_timer = - ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime", "Spill", 1); - _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1); - _in_mem_rows_counter = - ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "InMemRow", TUnit::UNIT, "Spill", 1); + _partition_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionTime", 1); + _partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionShuffleTime", 1); + _spill_build_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", 1); + _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1); return Status::OK(); } @@ -246,6 +244,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta }; auto exception_catch_func = [spill_func, this]() mutable { + SCOPED_TIMER(_spill_timer); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(spill_func()); return Status::OK(); @@ -319,6 +318,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { auto spill_runnable = std::make_shared<SpillRunnable>( state, _shared_state->shared_from_this(), [this, state, query_id, spilling_stream, i, submit_timer] { + SCOPED_TIMER(_spill_timer); DBUG_EXECUTE_IF( "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 5a37ac2f5ab..e55982bcb3b 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -55,14 +55,11 @@ void SpillSortSinkLocalState::_init_counters() { _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime"); _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime"); - _sort_blocks_memory_usage = - 
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); + _sort_blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, 1); - _spill_merge_sort_timer = - ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", "Spill", 1); + _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", 1); - _spill_wait_in_queue_timer = - ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1); + _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", 1); } #define UPDATE_PROFILE(counter, name) \ do { \ diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 8e601a5c7a5..fe1356381b2 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -44,19 +44,15 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { state->get_task()->add_spill_dependency(_spill_dependency.get()); _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); - _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillMergeSortTime", "Spill", 1); - _spill_merge_sort_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillMergeSortTime", "Spill", 1); + _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillMergeSortTime", 1); _spill_serialize_block_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", "Spill", 1); - _spill_write_disk_timer = - ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", "Spill", 1); - _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", - TUnit::BYTES, "Spill", 1); - _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", - TUnit::UNIT, "Spill", 1); - _spill_wait_in_queue_timer = - ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", 
"Spill", 1); + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); + _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); + _spill_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1); + _spill_block_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); + _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", 1); return Status::OK(); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index e372a9039bf..ca984e352fc 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -64,7 +64,6 @@ protected: std::unique_ptr<RuntimeProfile> _internal_runtime_profile; // counters for spill merge sort - RuntimeProfile::Counter* _spill_timer = nullptr; RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr; RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 0dceaac7b3d..290d1fe1d5b 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -355,8 +355,8 @@ void WorkloadGroupMgr::handle_paused_queries() { if (queries_list.empty()) { LOG(INFO) << "wg: " << wg->debug_string() << " has no paused query, update it to memory sufficent"; - it = _paused_queries_list.erase(it); wg->update_memory_sufficent(true); + it = _paused_queries_list.erase(it); continue; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org