This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new cfb01d4bf07 [opt](spill) update counter from inner operators (#47704) cfb01d4bf07 is described below commit cfb01d4bf0743b7ef74f0421b78bd08cbe501fa3 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Tue Feb 11 21:53:46 2025 +0800 [opt](spill) update counter from inner operators (#47704) --- .../exec/partitioned_aggregation_sink_operator.cpp | 68 ++++------ .../exec/partitioned_aggregation_sink_operator.h | 15 +- .../partitioned_aggregation_source_operator.cpp | 85 +++++------- .../exec/partitioned_aggregation_source_operator.h | 20 +-- .../exec/partitioned_hash_join_probe_operator.cpp | 151 +++++++-------------- .../exec/partitioned_hash_join_probe_operator.h | 37 +---- .../exec/partitioned_hash_join_sink_operator.cpp | 26 ++++ .../exec/partitioned_hash_join_sink_operator.h | 1 + be/src/pipeline/exec/spill_sort_sink_operator.cpp | 22 +-- be/src/pipeline/exec/spill_sort_sink_operator.h | 3 - be/src/pipeline/exec/spill_utils.h | 27 ++++ 11 files changed, 177 insertions(+), 278 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index e38f8090227..090bb6845ca 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -17,6 +17,8 @@ #include "partitioned_aggregation_sink_operator.h" +#include <gen_cpp/Types_types.h> + #include <cstdint> #include <limits> #include <memory> @@ -89,55 +91,29 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat void PartitionedAggSinkLocalState::_init_counters() { _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); - _hash_table_memory_usage = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); - _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); - - _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); - _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); - _merge_timer = ADD_TIMER(Base::profile(), "MergeTime"); - _expr_timer = ADD_TIMER(Base::profile(), "ExprTime"); - _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime"); - _deserialize_data_timer = ADD_TIMER(Base::profile(), "DeserializeAndMergeTime"); - _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); - _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); - _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); - _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT); - _memory_usage_container = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageContainer", TUnit::BYTES, 1); - _memory_usage_arena = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageArena", TUnit::BYTES, 1); _memory_usage_reserved = ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageReserved", TUnit::BYTES, 1); - COUNTER_SET(_max_row_size_counter, (int64_t)0); _spill_serialize_hash_table_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeHashTableTime", 1); } -#define UPDATE_PROFILE(counter, name) \ - do { \ - auto* child_counter = child_profile->get_counter(name); \ - if (child_counter != nullptr) { \ - COUNTER_SET(counter, child_counter->value()); \ - } \ - } while (false) +#define UPDATE_PROFILE(name) \ + update_profile_from_inner_profile<spilled>(name, _profile, child_profile) +template <bool spilled> void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable"); - UPDATE_PROFILE(_serialize_key_arena_memory_usage, "MemoryUsageSerializeKeyArena"); - UPDATE_PROFILE(_build_timer, "BuildTime"); - UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime"); - UPDATE_PROFILE(_merge_timer, "MergeTime"); - UPDATE_PROFILE(_expr_timer, "MergeTime"); - UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime"); - UPDATE_PROFILE(_deserialize_data_timer, "DeserializeAndMergeTime"); - UPDATE_PROFILE(_hash_table_compute_timer, "HashTableComputeTime"); - UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime"); - UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount"); - UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes"); - UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer"); - UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena"); + UPDATE_PROFILE("MemoryUsageHashTable"); + UPDATE_PROFILE("MemoryUsageSerializeKeyArena"); + UPDATE_PROFILE("BuildTime"); + UPDATE_PROFILE("SerializeKeyTime"); + UPDATE_PROFILE("MergeTime"); + UPDATE_PROFILE("SerializeDataTime"); + UPDATE_PROFILE("DeserializeAndMergeTime"); + UPDATE_PROFILE("HashTableComputeTime"); + UPDATE_PROFILE("HashTableEmplaceTime"); + UPDATE_PROFILE("HashTableInputCount"); + UPDATE_PROFILE("MemoryUsageContainer"); + UPDATE_PROFILE("MemoryUsageArena"); update_max_min_rows_counter(); } @@ -208,10 +184,12 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: return revoke_memory(state, nullptr); } } - if (local_state._runtime_state) { + + if (!local_state._shared_state->is_spilled) { auto* sink_local_state = local_state._runtime_state->get_sink_local_state(); - local_state.update_profile(sink_local_state->profile()); + local_state.update_profile<false>(sink_local_state->profile()); } + return Status::OK(); } Status PartitionedAggSinkOperatorX::revoke_memory( @@ -272,9 +250,13 @@ Status PartitionedAggSinkLocalState::revoke_memory( print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos, _shared_state->is_spilled, PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); + auto* sink_local_state = _runtime_state->get_sink_local_state(); if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; profile()->add_info_string("Spilled", "true"); + update_profile<false>(sink_local_state->profile()); + } else { + update_profile<true>(sink_local_state->profile()); } // TODO: spill thread may set_ready before the task::execute thread put the task to blocked state diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index b0315e31f5d..db1f2f86400 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -51,6 +51,7 @@ public: Status setup_in_memory_agg_op(RuntimeState* state); + template <bool spilled> void update_profile(RuntimeProfile* child_profile); template <typename KeyType> @@ -280,21 +281,7 @@ public: vectorized::Block value_block_; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; - RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; - RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; - RuntimeProfile::Counter* _hash_table_input_counter = nullptr; - RuntimeProfile::Counter* _build_timer = nullptr; - RuntimeProfile::Counter* _expr_timer = nullptr; - RuntimeProfile::Counter* _serialize_key_timer = nullptr; - RuntimeProfile::Counter* _merge_timer = nullptr; - RuntimeProfile::Counter* _serialize_data_timer = nullptr; - RuntimeProfile::Counter* _deserialize_data_timer = nullptr; - RuntimeProfile::Counter* _max_row_size_counter = nullptr; - RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; - RuntimeProfile::Counter* _memory_usage_container = nullptr; - RuntimeProfile::Counter* _memory_usage_arena = nullptr; RuntimeProfile::Counter* _memory_usage_reserved = nullptr; - RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr; }; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index edee6a66a41..5a68c69a7e2 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -43,7 +43,7 @@ Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& info) RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); - _init_counters(); + _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "AggSourceSpillDependency", true); state->get_task()->add_spill_dependency(_spill_dependency.get()); @@ -62,49 +62,28 @@ Status PartitionedAggLocalState::open(RuntimeState* state) { return Status::OK(); } -void PartitionedAggLocalState::_init_counters() { - _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); - _get_results_timer = ADD_TIMER(profile(), "GetResultsTime"); - _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime"); - _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime"); - _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); - _serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime"); - _hash_table_size_counter = ADD_COUNTER_WITH_LEVEL(profile(), "HashTableSize", TUnit::UNIT, 1); - - _merge_timer = ADD_TIMER(profile(), "MergeTime"); - _deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime"); - _hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime"); - _hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime"); - _hash_table_input_counter = - ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount", TUnit::UNIT, 1); - _hash_table_memory_usage = - ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); - - _memory_usage_container = - ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageContainer", TUnit::BYTES, 1); - _memory_usage_arena = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageArena", TUnit::BYTES, 1); -} - -#define UPDATE_PROFILE(counter, name) \ - do { \ - auto* child_counter = child_profile->get_counter(name); \ - if (child_counter != nullptr) { \ - COUNTER_SET(counter, child_counter->value()); \ - } \ - } while (false) +#define UPDATE_COUNTER_FROM_INNER(name) \ + update_profile_from_inner_profile<spilled>(name, _runtime_profile.get(), child_profile) +template <bool spilled> void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_get_results_timer, "GetResultsTime"); - UPDATE_PROFILE(_serialize_result_timer, "SerializeResultTime"); - UPDATE_PROFILE(_hash_table_iterate_timer, "HashTableIterateTime"); - UPDATE_PROFILE(_insert_keys_to_column_timer, "InsertKeysToColumnTime"); - UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime"); - UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize"); - UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage"); - UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer"); - UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena"); + UPDATE_COUNTER_FROM_INNER("GetResultsTime"); + UPDATE_COUNTER_FROM_INNER("HashTableIterateTime"); + UPDATE_COUNTER_FROM_INNER("InsertKeysToColumnTime"); + UPDATE_COUNTER_FROM_INNER("InsertValuesToColumnTime"); + UPDATE_COUNTER_FROM_INNER("MergeTime"); + UPDATE_COUNTER_FROM_INNER("DeserializeAndMergeTime"); + UPDATE_COUNTER_FROM_INNER("HashTableComputeTime"); + UPDATE_COUNTER_FROM_INNER("HashTableEmplaceTime"); + UPDATE_COUNTER_FROM_INNER("HashTableInputCount"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageHashTable"); + UPDATE_COUNTER_FROM_INNER("HashTableSize"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageContainer"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageArena"); } +#undef UPDATE_COUNTER_FROM_INNER + Status PartitionedAggLocalState::close(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); @@ -188,20 +167,26 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: auto* runtime_state = local_state._runtime_state.get(); local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once(); status = _agg_source_operator->get_block(runtime_state, block, eos); - RETURN_IF_ERROR(status); - if (local_state._runtime_state) { + if (!local_state._shared_state->is_spilled) { auto* source_local_state = local_state._runtime_state->get_local_state(_agg_source_operator->operator_id()); - local_state.update_profile(source_local_state->profile()); + local_state.update_profile<false>(source_local_state->profile()); } + + RETURN_IF_ERROR(status); if (*eos) { - if (local_state._shared_state->is_spilled && - !local_state._shared_state->spill_partitions.empty()) { - local_state._current_partition_eos = false; - local_state._need_to_merge_data_for_current_partition = true; - status = local_state._shared_state->in_mem_shared_state->reset_hash_table(); - RETURN_IF_ERROR(status); - *eos = false; + if (local_state._shared_state->is_spilled) { + auto* source_local_state = local_state._runtime_state->get_local_state( + _agg_source_operator->operator_id()); + local_state.update_profile<true>(source_local_state->profile()); + + if (!local_state._shared_state->spill_partitions.empty()) { + local_state._current_partition_eos = false; + local_state._need_to_merge_data_for_current_partition = true; + status = local_state._shared_state->in_mem_shared_state->reset_hash_table(); + RETURN_IF_ERROR(status); + *eos = false; + } } } local_state.reached_limit(block, eos); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index c990e3d90ea..bc800f02fb5 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -45,11 +45,10 @@ public: Status recover_blocks_from_disk(RuntimeState* state, bool& has_data); Status setup_in_memory_agg_op(RuntimeState* state); + template <bool spilled> void update_profile(RuntimeProfile* child_profile); protected: - void _init_counters(); - friend class PartitionedAggSourceOperatorX; std::unique_ptr<RuntimeState> _runtime_state; @@ -63,23 +62,8 @@ protected: std::vector<vectorized::Block> _blocks; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; - RuntimeProfile::Counter* _get_results_timer = nullptr; - RuntimeProfile::Counter* _serialize_result_timer = nullptr; - RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr; - RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; - RuntimeProfile::Counter* _serialize_data_timer = nullptr; - RuntimeProfile::Counter* _hash_table_size_counter = nullptr; - - RuntimeProfile::Counter* _merge_timer = nullptr; - RuntimeProfile::Counter* _deserialize_data_timer = nullptr; - RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; - RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; - RuntimeProfile::Counter* _hash_table_input_counter = nullptr; - RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; - RuntimeProfile::Counter* _memory_usage_container = nullptr; - RuntimeProfile::Counter* _memory_usage_arena = nullptr; - RuntimeProfile::Counter* _memory_usage_reserved = nullptr; }; + class AggSourceOperatorX; class PartitionedAggSourceOperatorX : public OperatorX<PartitionedAggLocalState> { public: diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index adb686ffa21..b33a5fb9ca8 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -74,89 +74,65 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryProbeTime", 1); _get_child_next_timer = ADD_TIMER_WITH_LEVEL(profile(), "GetChildNextTime", 1); + _probe_blocks_bytes = + ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBloksBytesInMem", TUnit::BYTES, 1); _memory_usage_reserved = - ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::UNIT, 1); - - _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBlocksBytes", TUnit::BYTES, 1); - - // Build phase - _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase"); - _build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", TUnit::UNIT, "BuildPhase"); - _publish_runtime_filter_timer = - ADD_CHILD_TIMER(profile(), "PublishRuntimeFilterTime", "BuildPhase"); - _runtime_filter_compute_timer = - ADD_CHILD_TIMER(profile(), "RuntimeFilterComputeTime", "BuildPhase"); - _build_table_timer = ADD_CHILD_TIMER(profile(), "BuildTableTime", "BuildPhase"); - _build_side_merge_block_timer = - ADD_CHILD_TIMER(profile(), "BuildSideMergeBlockTime", "BuildPhase"); - _build_table_insert_timer = ADD_CHILD_TIMER(profile(), "BuildTableInsertTime", "BuildPhase"); - _build_expr_call_timer = ADD_CHILD_TIMER(profile(), "BuildExprCallTime", "BuildPhase"); - _build_side_compute_hash_timer = - ADD_CHILD_TIMER(profile(), "BuildSideHashComputingTime", "BuildPhase"); - - _hash_table_memory_usage = - ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); - - _allocate_resource_timer = ADD_CHILD_TIMER(profile(), "AllocateResourceTime", "BuildPhase"); - - // Probe phase - _probe_phase_label = ADD_LABEL_COUNTER(profile(), "ProbePhase"); - _probe_next_timer = ADD_CHILD_TIMER(profile(), "ProbeFindNextTime", "ProbePhase"); - _probe_expr_call_timer = ADD_CHILD_TIMER(profile(), "ProbeExprCallTime", "ProbePhase"); - _search_hashtable_timer = - ADD_CHILD_TIMER(profile(), "ProbeWhenSearchHashTableTime", "ProbePhase"); - _build_side_output_timer = - ADD_CHILD_TIMER(profile(), "ProbeWhenBuildSideOutputTime", "ProbePhase"); - _probe_side_output_timer = - ADD_CHILD_TIMER(profile(), "ProbeWhenProbeSideOutputTime", "ProbePhase"); - _probe_process_hashtable_timer = - ADD_CHILD_TIMER(profile(), "ProbeWhenProcessHashTableTime", "ProbePhase"); - _process_other_join_conjunct_timer = - ADD_CHILD_TIMER(profile(), "OtherJoinConjunctTime", "ProbePhase"); - _init_probe_side_timer = ADD_CHILD_TIMER(profile(), "InitProbeSideTime", "ProbePhase"); - _probe_timer = ADD_CHILD_TIMER(profile(), "ProbeTime", "ProbePhase"); - _join_filter_timer = ADD_CHILD_TIMER(profile(), "JoinFilterTimer", "ProbePhase"); - _build_output_block_timer = ADD_CHILD_TIMER(profile(), "BuildOutputBlock", "ProbePhase"); - _probe_rows_counter = ADD_CHILD_COUNTER(profile(), "ProbeRows", TUnit::UNIT, "ProbePhase"); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1); return Status::OK(); } -#define UPDATE_PROFILE(counter, name) \ - do { \ - auto* child_counter = child_profile->get_counter(name); \ - if (child_counter != nullptr) { \ - COUNTER_UPDATE(counter, child_counter->value()); \ - } \ - } while (false) +#define UPDATE_COUNTER_FROM_INNER(name) \ + update_profile_from_inner_profile<spilled>(name, _runtime_profile.get(), child_profile) + +template <bool spilled> void PartitionedHashJoinProbeLocalState::update_build_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_build_rows_counter, "BuildRows"); - UPDATE_PROFILE(_publish_runtime_filter_timer, "PublishRuntimeFilterTime"); - UPDATE_PROFILE(_runtime_filter_compute_timer, "RuntimeFilterComputeTime"); - UPDATE_PROFILE(_build_table_timer, "BuildTableTime"); - UPDATE_PROFILE(_build_side_merge_block_timer, "BuildSideMergeBlockTime"); - UPDATE_PROFILE(_build_table_insert_timer, "BuildTableInsertTime"); - UPDATE_PROFILE(_build_expr_call_timer, "BuildExprCallTime"); - UPDATE_PROFILE(_build_side_compute_hash_timer, "BuildSideHashComputingTime"); - UPDATE_PROFILE(_allocate_resource_timer, "AllocateResourceTime"); + UPDATE_COUNTER_FROM_INNER("PublishRuntimeFilterTime"); + UPDATE_COUNTER_FROM_INNER("BuildRuntimeFilterTime"); + UPDATE_COUNTER_FROM_INNER("BuildHashTableTime"); + UPDATE_COUNTER_FROM_INNER("MergeBuildBlockTime"); + UPDATE_COUNTER_FROM_INNER("BuildTableInsertTime"); + UPDATE_COUNTER_FROM_INNER("BuildExprCallTime"); + UPDATE_COUNTER_FROM_INNER("RuntimeFilterInitTime"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildBlocks"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageHashTable"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildKeyArena"); } +template <bool spilled> void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_probe_timer, "ProbeTime"); - UPDATE_PROFILE(_join_filter_timer, "JoinFilterTimer"); - UPDATE_PROFILE(_build_output_block_timer, "BuildOutputBlock"); - UPDATE_PROFILE(_probe_rows_counter, "ProbeRows"); - UPDATE_PROFILE(_probe_next_timer, "ProbeFindNextTime"); - UPDATE_PROFILE(_probe_expr_call_timer, "ProbeExprCallTime"); - UPDATE_PROFILE(_search_hashtable_timer, "ProbeWhenSearchHashTableTime"); - UPDATE_PROFILE(_build_side_output_timer, "ProbeWhenBuildSideOutputTime"); - UPDATE_PROFILE(_probe_side_output_timer, "ProbeWhenProbeSideOutputTime"); - UPDATE_PROFILE(_probe_process_hashtable_timer, "ProbeWhenProcessHashTableTime"); - UPDATE_PROFILE(_process_other_join_conjunct_timer, "OtherJoinConjunctTime"); - UPDATE_PROFILE(_init_probe_side_timer, "InitProbeSideTime"); + UPDATE_COUNTER_FROM_INNER("ProbeTime"); + UPDATE_COUNTER_FROM_INNER("JoinFilterTimer"); + UPDATE_COUNTER_FROM_INNER("BuildOutputBlock"); + UPDATE_COUNTER_FROM_INNER("ProbeRows"); + UPDATE_COUNTER_FROM_INNER("ProbeFindNextTime"); + UPDATE_COUNTER_FROM_INNER("ProbeExprCallTime"); + UPDATE_COUNTER_FROM_INNER("ProbeWhenSearchHashTableTime"); + UPDATE_COUNTER_FROM_INNER("ProbeWhenBuildSideOutputTime"); + UPDATE_COUNTER_FROM_INNER("ProbeWhenProbeSideOutputTime"); + UPDATE_COUNTER_FROM_INNER("ProbeWhenProcessHashTableTime"); + UPDATE_COUNTER_FROM_INNER("OtherJoinConjunctTime"); + UPDATE_COUNTER_FROM_INNER("InitProbeSideTime"); } #undef UPDATE_PROFILE +void PartitionedHashJoinProbeLocalState::update_profile_from_inner() { + auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>(); + if (_shared_state->inner_runtime_state) { + auto* sink_local_state = _shared_state->inner_runtime_state->get_sink_local_state(); + auto* probe_local_state = _shared_state->inner_runtime_state->get_local_state( + p._inner_probe_operator->operator_id()); + + if (_shared_state->need_to_spill) { + update_build_profile<true>(sink_local_state->profile()); + update_probe_profile<true>(probe_local_state->profile()); + } else { + update_build_profile<false>(sink_local_state->profile()); + update_probe_profile<false>(probe_local_state->profile()); + } + } +} + Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(PipelineXSpillLocalState::open(state)); return _parent->cast<PartitionedHashJoinProbeOperatorX>()._partitioner->clone(state, @@ -607,21 +583,11 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill DCHECK(local_state._shared_state->inner_runtime_state); local_state._in_mem_shared_state_sptr = std::move(local_state._shared_state->inner_shared_state); - - auto* sink_state = local_state._shared_state->inner_runtime_state->get_sink_local_state(); - if (sink_state != nullptr) { - COUNTER_SET(local_state._hash_table_memory_usage, - sink_state->profile()->get_counter("MemoryUsageHashTable")->value()); - } return Status::OK(); } Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) const { - if (local_state._shared_state->inner_runtime_state) { - _update_profile_from_internal_states(local_state); - } - local_state._shared_state->inner_runtime_state = RuntimeState::create_unique( state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); @@ -682,9 +648,6 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( block.rows(), _inner_sink_operator->get_memory_usage( local_state._shared_state->inner_runtime_state.get())); - - COUNTER_SET(local_state._hash_table_memory_usage, - sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value()); return Status::OK(); } @@ -762,6 +725,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, print_id(state->query_id()), node_id(), state->task_id(), local_state._partition_cursor); local_state._partition_cursor++; + local_state.update_profile_from_inner(); if (local_state._partition_cursor == _partition_count) { *eos = true; } else { @@ -876,18 +840,6 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat return false; } -void PartitionedHashJoinProbeOperatorX::_update_profile_from_internal_states( - PartitionedHashJoinProbeLocalState& local_state) const { - if (local_state._shared_state->inner_runtime_state) { - auto* sink_local_state = - local_state._shared_state->inner_runtime_state->get_sink_local_state(); - local_state.update_build_profile(sink_local_state->profile()); - auto* probe_local_state = local_state._shared_state->inner_runtime_state->get_local_state( - _inner_probe_operator->operator_id()); - local_state.update_probe_profile(probe_local_state->profile()); - } -} - Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { *eos = false; @@ -946,16 +898,11 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori } else { RETURN_IF_ERROR(_inner_probe_operator->pull( local_state._shared_state->inner_runtime_state.get(), block, eos)); - if (*eos) { - _update_profile_from_internal_states(local_state); - } + local_state.update_profile_from_inner(); } local_state.add_num_rows_returned(block->rows()); COUNTER_UPDATE(local_state._blocks_returned_counter, 1); - if (*eos) { - _update_profile_from_internal_states(local_state); - } } return Status::OK(); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 226116ef7f4..cceac79b357 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -57,11 +57,16 @@ public: Status finish_spilling(uint32_t partition_index); + template <bool spilled> void update_build_profile(RuntimeProfile* child_profile); + + template <bool spilled> void update_probe_profile(RuntimeProfile* child_profile); std::string debug_string(int indentation_level = 0) const override; + void update_profile_from_inner(); + friend class PartitionedHashJoinProbeOperatorX; private: @@ -102,37 +107,8 @@ private: RuntimeProfile::Counter* _recovery_probe_blocks = nullptr; RuntimeProfile::Counter* _recovery_probe_timer = nullptr; - RuntimeProfile::Counter* _build_phase_label = nullptr; - RuntimeProfile::Counter* _build_rows_counter = nullptr; - RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; - RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; - - RuntimeProfile::Counter* _build_table_timer = nullptr; - RuntimeProfile::Counter* _build_expr_call_timer = nullptr; - RuntimeProfile::Counter* _build_table_insert_timer = nullptr; - RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr; - RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr; - - RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::Counter* _probe_blocks_bytes = nullptr; - - RuntimeProfile::Counter* _allocate_resource_timer = nullptr; - - RuntimeProfile::Counter* _probe_phase_label = nullptr; - RuntimeProfile::Counter* _probe_expr_call_timer = nullptr; - RuntimeProfile::Counter* _probe_next_timer = nullptr; - RuntimeProfile::Counter* _probe_side_output_timer = nullptr; - RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr; - RuntimeProfile::Counter* _search_hashtable_timer = nullptr; - RuntimeProfile::Counter* _init_probe_side_timer = nullptr; - RuntimeProfile::Counter* _build_side_output_timer = nullptr; - RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr; - RuntimeProfile::Counter* _probe_timer = nullptr; - RuntimeProfile::Counter* _probe_rows_counter = nullptr; - RuntimeProfile::Counter* _join_filter_timer = nullptr; - RuntimeProfile::Counter* _build_output_block_timer = nullptr; RuntimeProfile::Counter* _memory_usage_reserved = nullptr; - RuntimeProfile::Counter* _get_child_next_timer = nullptr; }; @@ -195,9 +171,6 @@ private: bool _should_revoke_memory(RuntimeState* state) const; - void _update_profile_from_internal_states( - PartitionedHashJoinProbeLocalState& local_state) const; - const TJoinDistributionType::type _join_distribution; std::shared_ptr<HashJoinBuildSinkOperatorX> _inner_sink_operator; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 2df474f42eb..3c94cc82119 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -553,6 +553,29 @@ Status PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* return Status::OK(); } +#define UPDATE_COUNTER_FROM_INNER(name) \ + update_profile_from_inner_profile<false>(name, _profile, inner_profile) + +void PartitionedHashJoinSinkLocalState::update_profile_from_inner() { + auto* sink_local_state = _shared_state->inner_runtime_state->get_sink_local_state(); + if (sink_local_state) { + auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(sink_local_state); + auto* inner_profile = inner_sink_state->profile(); + UPDATE_COUNTER_FROM_INNER("PublishRuntimeFilterTime"); + UPDATE_COUNTER_FROM_INNER("BuildRuntimeFilterTime"); + UPDATE_COUNTER_FROM_INNER("BuildHashTableTime"); + UPDATE_COUNTER_FROM_INNER("MergeBuildBlockTime"); + UPDATE_COUNTER_FROM_INNER("BuildTableInsertTime"); + UPDATE_COUNTER_FROM_INNER("BuildExprCallTime"); + UPDATE_COUNTER_FROM_INNER("RuntimeFilterInitTime"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildBlocks"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageHashTable"); + UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildKeyArena"); + } +} + +#undef UPDATE_COUNTER_FROM_INNER + // After building hash table it will not be able to spill later // even if memory is low, and will cause cancel of queries. // So make a check here, if build blocks mem usage is too high, @@ -613,6 +636,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); + local_state.update_profile_from_inner(); + LOG(INFO) << fmt::format( "Query:{}, hash join sink:{}, task:{}, eos, set_ready_to_read, nonspill " "memory usage:{}", @@ -660,6 +685,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); local_state.update_memory_usage(); + local_state.update_profile_from_inner(); if (eos) { LOG(INFO) << fmt::format( "Query:{}, hash join sink:{}, task:{}, eos, set_ready_to_read, nonspill memory " diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 492c98fa637..50c3a59f214 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -51,6 +51,7 @@ public: size_t revocable_mem_size(RuntimeState* state) const; [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); void update_memory_usage(); + void update_profile_from_inner(); Dependency* finishdependency() override; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 759b88e28bc..3aec42347ff 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -55,26 +55,17 @@ Status SpillSortSinkLocalState::open(RuntimeState* state) { void SpillSortSinkLocalState::_init_counters() { _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); - - _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime"); - _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime"); - _sort_blocks_memory_usage = - ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", 1); } -#define UPDATE_PROFILE(counter, name) \ - do { \ - auto* child_counter = child_profile->get_counter(name); \ - if (child_counter != nullptr) { \ - COUNTER_SET(counter, child_counter->value()); \ - } \ - } while (false) + +#define UPDATE_PROFILE(name) update_profile_from_inner_profile<true>(name, _profile, child_profile) void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime"); - UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime"); - UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks"); + UPDATE_PROFILE("PartialSortTime"); + UPDATE_PROFILE("MergeBlockTime"); + UPDATE_PROFILE("MemoryUsageSortBlocks"); } +#undef UPDATE_PROFILE Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { return Base::close(state, execsink_status); @@ -161,7 +152,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc RETURN_IF_ERROR(_sort_sink_operator->sink(local_state._runtime_state.get(), in_block, false)); int64_t data_size = local_state._shared_state->in_mem_shared_state->sorter->data_size(); - COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); COUNTER_SET(local_state._memory_used_counter, data_size); if (eos) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 3d6ccdcc4ce..e41a3083ac6 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -51,9 +51,6 @@ private: std::unique_ptr<RuntimeState> _runtime_state; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; - RuntimeProfile::Counter* _partial_sort_timer = nullptr; - RuntimeProfile::Counter* _merge_block_timer = nullptr; - RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr; RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr; diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index a3c51faca1f..ff40c4656dc 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/Metrics_types.h> #include <gen_cpp/Types_types.h> #include <glog/logging.h> @@ -263,5 +264,31 @@ private: RuntimeProfile::Counter* _reading_task_count = nullptr; }; +template <bool accumulating> +inline void update_profile_from_inner_profile(const std::string& name, + RuntimeProfile* runtime_profile, + RuntimeProfile* inner_profile) { + auto* inner_counter = inner_profile->get_counter(name); + DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found"; + if (inner_counter == nullptr) [[unlikely]] { + return; + } + auto* counter = runtime_profile->get_counter(name); + if (counter == nullptr) [[unlikely]] { + counter = runtime_profile->add_counter(name, inner_counter->type(), "", + inner_counter->level()); + } + if constexpr (accumulating) { + // Memory usage should not be accumulated. + if (counter->type() == TUnit::BYTES) { + counter->set(inner_counter->value()); + } else { + counter->update(inner_counter->value()); + } + } else { + counter->set(inner_counter->value()); + } +} + #include "common/compile_check_end.h" } // namespace doris::pipeline \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org