This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new cfb01d4bf07 [opt](spill) update counter from inner operators (#47704)
cfb01d4bf07 is described below

commit cfb01d4bf0743b7ef74f0421b78bd08cbe501fa3
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Tue Feb 11 21:53:46 2025 +0800

    [opt](spill) update counter from inner operators (#47704)
---
 .../exec/partitioned_aggregation_sink_operator.cpp |  68 ++++------
 .../exec/partitioned_aggregation_sink_operator.h   |  15 +-
 .../partitioned_aggregation_source_operator.cpp    |  85 +++++-------
 .../exec/partitioned_aggregation_source_operator.h |  20 +--
 .../exec/partitioned_hash_join_probe_operator.cpp  | 151 +++++++--------------
 .../exec/partitioned_hash_join_probe_operator.h    |  37 +----
 .../exec/partitioned_hash_join_sink_operator.cpp   |  26 ++++
 .../exec/partitioned_hash_join_sink_operator.h     |   1 +
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  22 +--
 be/src/pipeline/exec/spill_sort_sink_operator.h    |   3 -
 be/src/pipeline/exec/spill_utils.h                 |  27 ++++
 11 files changed, 177 insertions(+), 278 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index e38f8090227..090bb6845ca 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -17,6 +17,8 @@
 
 #include "partitioned_aggregation_sink_operator.h"
 
+#include <gen_cpp/Types_types.h>
+
 #include <cstdint>
 #include <limits>
 #include <memory>
@@ -89,55 +91,29 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
 void PartitionedAggSinkLocalState::_init_counters() {
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
 
-    _hash_table_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", 
TUnit::BYTES, 1);
-    _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
-            "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
-
-    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
-    _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
-    _merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
-    _expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
-    _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
-    _deserialize_data_timer = ADD_TIMER(Base::profile(), 
"DeserializeAndMergeTime");
-    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
-    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
-    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
-    _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", 
TUnit::UNIT);
-    _memory_usage_container =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageContainer", 
TUnit::BYTES, 1);
-    _memory_usage_arena =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageArena", 
TUnit::BYTES, 1);
     _memory_usage_reserved =
             ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageReserved", 
TUnit::BYTES, 1);
-    COUNTER_SET(_max_row_size_counter, (int64_t)0);
 
     _spill_serialize_hash_table_timer =
             ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillSerializeHashTableTime", 1);
 }
-#define UPDATE_PROFILE(counter, name)                           \
-    do {                                                        \
-        auto* child_counter = child_profile->get_counter(name); \
-        if (child_counter != nullptr) {                         \
-            COUNTER_SET(counter, child_counter->value());       \
-        }                                                       \
-    } while (false)
+#define UPDATE_PROFILE(name) \
+    update_profile_from_inner_profile<spilled>(name, _profile, child_profile)
 
+template <bool spilled>
 void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* 
child_profile) {
-    UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable");
-    UPDATE_PROFILE(_serialize_key_arena_memory_usage, 
"MemoryUsageSerializeKeyArena");
-    UPDATE_PROFILE(_build_timer, "BuildTime");
-    UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime");
-    UPDATE_PROFILE(_merge_timer, "MergeTime");
-    UPDATE_PROFILE(_expr_timer, "MergeTime");
-    UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime");
-    UPDATE_PROFILE(_deserialize_data_timer, "DeserializeAndMergeTime");
-    UPDATE_PROFILE(_hash_table_compute_timer, "HashTableComputeTime");
-    UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
-    UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
-    UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
-    UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer");
-    UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena");
+    UPDATE_PROFILE("MemoryUsageHashTable");
+    UPDATE_PROFILE("MemoryUsageSerializeKeyArena");
+    UPDATE_PROFILE("BuildTime");
+    UPDATE_PROFILE("SerializeKeyTime");
+    UPDATE_PROFILE("MergeTime");
+    UPDATE_PROFILE("SerializeDataTime");
+    UPDATE_PROFILE("DeserializeAndMergeTime");
+    UPDATE_PROFILE("HashTableComputeTime");
+    UPDATE_PROFILE("HashTableEmplaceTime");
+    UPDATE_PROFILE("HashTableInputCount");
+    UPDATE_PROFILE("MemoryUsageContainer");
+    UPDATE_PROFILE("MemoryUsageArena");
 
     update_max_min_rows_counter();
 }
@@ -208,10 +184,12 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
             return revoke_memory(state, nullptr);
         }
     }
-    if (local_state._runtime_state) {
+
+    if (!local_state._shared_state->is_spilled) {
         auto* sink_local_state = 
local_state._runtime_state->get_sink_local_state();
-        local_state.update_profile(sink_local_state->profile());
+        local_state.update_profile<false>(sink_local_state->profile());
     }
+
     return Status::OK();
 }
 Status PartitionedAggSinkOperatorX::revoke_memory(
@@ -272,9 +250,13 @@ Status PartitionedAggSinkLocalState::revoke_memory(
             print_id(state->query_id()), _parent->node_id(), state->task_id(), 
_eos,
             _shared_state->is_spilled,
             PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
+    auto* sink_local_state = _runtime_state->get_sink_local_state();
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
         profile()->add_info_string("Spilled", "true");
+        update_profile<false>(sink_local_state->profile());
+    } else {
+        update_profile<true>(sink_local_state->profile());
     }
 
     // TODO: spill thread may set_ready before the task::execute thread put 
the task to blocked state
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index b0315e31f5d..db1f2f86400 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -51,6 +51,7 @@ public:
 
     Status setup_in_memory_agg_op(RuntimeState* state);
 
+    template <bool spilled>
     void update_profile(RuntimeProfile* child_profile);
 
     template <typename KeyType>
@@ -280,21 +281,7 @@ public:
     vectorized::Block value_block_;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
-    RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
-    RuntimeProfile::Counter* _build_timer = nullptr;
-    RuntimeProfile::Counter* _expr_timer = nullptr;
-    RuntimeProfile::Counter* _serialize_key_timer = nullptr;
-    RuntimeProfile::Counter* _merge_timer = nullptr;
-    RuntimeProfile::Counter* _serialize_data_timer = nullptr;
-    RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
-    RuntimeProfile::Counter* _max_row_size_counter = nullptr;
-    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
-    RuntimeProfile::Counter* _memory_usage_container = nullptr;
-    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
     RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = 
nullptr;
 
     RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr;
 };
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index edee6a66a41..5a68c69a7e2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -43,7 +43,7 @@ Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& info)
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
-    _init_counters();
+    _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
     _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                   "AggSourceSpillDependency", 
true);
     state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -62,49 +62,28 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-void PartitionedAggLocalState::_init_counters() {
-    _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
-    _get_results_timer = ADD_TIMER(profile(), "GetResultsTime");
-    _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime");
-    _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
-    _insert_keys_to_column_timer = ADD_TIMER(profile(), 
"InsertKeysToColumnTime");
-    _serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime");
-    _hash_table_size_counter = ADD_COUNTER_WITH_LEVEL(profile(), 
"HashTableSize", TUnit::UNIT, 1);
-
-    _merge_timer = ADD_TIMER(profile(), "MergeTime");
-    _deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime");
-    _hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime");
-    _hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime");
-    _hash_table_input_counter =
-            ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount", 
TUnit::UNIT, 1);
-    _hash_table_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
-
-    _memory_usage_container =
-            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageContainer", 
TUnit::BYTES, 1);
-    _memory_usage_arena = ADD_COUNTER_WITH_LEVEL(profile(), 
"MemoryUsageArena", TUnit::BYTES, 1);
-}
-
-#define UPDATE_PROFILE(counter, name)                           \
-    do {                                                        \
-        auto* child_counter = child_profile->get_counter(name); \
-        if (child_counter != nullptr) {                         \
-            COUNTER_SET(counter, child_counter->value());       \
-        }                                                       \
-    } while (false)
+#define UPDATE_COUNTER_FROM_INNER(name) \
+    update_profile_from_inner_profile<spilled>(name, _runtime_profile.get(), 
child_profile)
 
+template <bool spilled>
 void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) {
-    UPDATE_PROFILE(_get_results_timer, "GetResultsTime");
-    UPDATE_PROFILE(_serialize_result_timer, "SerializeResultTime");
-    UPDATE_PROFILE(_hash_table_iterate_timer, "HashTableIterateTime");
-    UPDATE_PROFILE(_insert_keys_to_column_timer, "InsertKeysToColumnTime");
-    UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime");
-    UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize");
-    UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage");
-    UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer");
-    UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena");
+    UPDATE_COUNTER_FROM_INNER("GetResultsTime");
+    UPDATE_COUNTER_FROM_INNER("HashTableIterateTime");
+    UPDATE_COUNTER_FROM_INNER("InsertKeysToColumnTime");
+    UPDATE_COUNTER_FROM_INNER("InsertValuesToColumnTime");
+    UPDATE_COUNTER_FROM_INNER("MergeTime");
+    UPDATE_COUNTER_FROM_INNER("DeserializeAndMergeTime");
+    UPDATE_COUNTER_FROM_INNER("HashTableComputeTime");
+    UPDATE_COUNTER_FROM_INNER("HashTableEmplaceTime");
+    UPDATE_COUNTER_FROM_INNER("HashTableInputCount");
+    UPDATE_COUNTER_FROM_INNER("MemoryUsageHashTable");
+    UPDATE_COUNTER_FROM_INNER("HashTableSize");
+    UPDATE_COUNTER_FROM_INNER("MemoryUsageContainer");
+    UPDATE_COUNTER_FROM_INNER("MemoryUsageArena");
 }
 
+#undef UPDATE_COUNTER_FROM_INNER
+
 Status PartitionedAggLocalState::close(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_close_timer);
@@ -188,20 +167,26 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     auto* runtime_state = local_state._runtime_state.get();
     
local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once();
     status = _agg_source_operator->get_block(runtime_state, block, eos);
-    RETURN_IF_ERROR(status);
-    if (local_state._runtime_state) {
+    if (!local_state._shared_state->is_spilled) {
         auto* source_local_state =
                 
local_state._runtime_state->get_local_state(_agg_source_operator->operator_id());
-        local_state.update_profile(source_local_state->profile());
+        local_state.update_profile<false>(source_local_state->profile());
     }
+
+    RETURN_IF_ERROR(status);
     if (*eos) {
-        if (local_state._shared_state->is_spilled &&
-            !local_state._shared_state->spill_partitions.empty()) {
-            local_state._current_partition_eos = false;
-            local_state._need_to_merge_data_for_current_partition = true;
-            status = 
local_state._shared_state->in_mem_shared_state->reset_hash_table();
-            RETURN_IF_ERROR(status);
-            *eos = false;
+        if (local_state._shared_state->is_spilled) {
+            auto* source_local_state = 
local_state._runtime_state->get_local_state(
+                    _agg_source_operator->operator_id());
+            local_state.update_profile<true>(source_local_state->profile());
+
+            if (!local_state._shared_state->spill_partitions.empty()) {
+                local_state._current_partition_eos = false;
+                local_state._need_to_merge_data_for_current_partition = true;
+                status = 
local_state._shared_state->in_mem_shared_state->reset_hash_table();
+                RETURN_IF_ERROR(status);
+                *eos = false;
+            }
         }
     }
     local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index c990e3d90ea..bc800f02fb5 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -45,11 +45,10 @@ public:
     Status recover_blocks_from_disk(RuntimeState* state, bool& has_data);
     Status setup_in_memory_agg_op(RuntimeState* state);
 
+    template <bool spilled>
     void update_profile(RuntimeProfile* child_profile);
 
 protected:
-    void _init_counters();
-
     friend class PartitionedAggSourceOperatorX;
     std::unique_ptr<RuntimeState> _runtime_state;
 
@@ -63,23 +62,8 @@ protected:
     std::vector<vectorized::Block> _blocks;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
-    RuntimeProfile::Counter* _get_results_timer = nullptr;
-    RuntimeProfile::Counter* _serialize_result_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr;
-    RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
-    RuntimeProfile::Counter* _serialize_data_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
-
-    RuntimeProfile::Counter* _merge_timer = nullptr;
-    RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
-    RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
-    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
-    RuntimeProfile::Counter* _memory_usage_container = nullptr;
-    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
-    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 };
+
 class AggSourceOperatorX;
 class PartitionedAggSourceOperatorX : public 
OperatorX<PartitionedAggLocalState> {
 public:
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index adb686ffa21..b33a5fb9ca8 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -74,89 +74,65 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillRecoveryProbeTime", 1);
     _get_child_next_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"GetChildNextTime", 1);
 
+    _probe_blocks_bytes =
+            ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBloksBytesInMem", 
TUnit::BYTES, 1);
     _memory_usage_reserved =
-            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", 
TUnit::UNIT, 1);
-
-    _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), 
"ProbeBlocksBytes", TUnit::BYTES, 1);
-
-    // Build phase
-    _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
-    _build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", 
TUnit::UNIT, "BuildPhase");
-    _publish_runtime_filter_timer =
-            ADD_CHILD_TIMER(profile(), "PublishRuntimeFilterTime", 
"BuildPhase");
-    _runtime_filter_compute_timer =
-            ADD_CHILD_TIMER(profile(), "RuntimeFilterComputeTime", 
"BuildPhase");
-    _build_table_timer = ADD_CHILD_TIMER(profile(), "BuildTableTime", 
"BuildPhase");
-    _build_side_merge_block_timer =
-            ADD_CHILD_TIMER(profile(), "BuildSideMergeBlockTime", 
"BuildPhase");
-    _build_table_insert_timer = ADD_CHILD_TIMER(profile(), 
"BuildTableInsertTime", "BuildPhase");
-    _build_expr_call_timer = ADD_CHILD_TIMER(profile(), "BuildExprCallTime", 
"BuildPhase");
-    _build_side_compute_hash_timer =
-            ADD_CHILD_TIMER(profile(), "BuildSideHashComputingTime", 
"BuildPhase");
-
-    _hash_table_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", 
TUnit::BYTES, 1);
-
-    _allocate_resource_timer = ADD_CHILD_TIMER(profile(), 
"AllocateResourceTime", "BuildPhase");
-
-    // Probe phase
-    _probe_phase_label = ADD_LABEL_COUNTER(profile(), "ProbePhase");
-    _probe_next_timer = ADD_CHILD_TIMER(profile(), "ProbeFindNextTime", 
"ProbePhase");
-    _probe_expr_call_timer = ADD_CHILD_TIMER(profile(), "ProbeExprCallTime", 
"ProbePhase");
-    _search_hashtable_timer =
-            ADD_CHILD_TIMER(profile(), "ProbeWhenSearchHashTableTime", 
"ProbePhase");
-    _build_side_output_timer =
-            ADD_CHILD_TIMER(profile(), "ProbeWhenBuildSideOutputTime", 
"ProbePhase");
-    _probe_side_output_timer =
-            ADD_CHILD_TIMER(profile(), "ProbeWhenProbeSideOutputTime", 
"ProbePhase");
-    _probe_process_hashtable_timer =
-            ADD_CHILD_TIMER(profile(), "ProbeWhenProcessHashTableTime", 
"ProbePhase");
-    _process_other_join_conjunct_timer =
-            ADD_CHILD_TIMER(profile(), "OtherJoinConjunctTime", "ProbePhase");
-    _init_probe_side_timer = ADD_CHILD_TIMER(profile(), "InitProbeSideTime", 
"ProbePhase");
-    _probe_timer = ADD_CHILD_TIMER(profile(), "ProbeTime", "ProbePhase");
-    _join_filter_timer = ADD_CHILD_TIMER(profile(), "JoinFilterTimer", 
"ProbePhase");
-    _build_output_block_timer = ADD_CHILD_TIMER(profile(), "BuildOutputBlock", 
"ProbePhase");
-    _probe_rows_counter = ADD_CHILD_COUNTER(profile(), "ProbeRows", 
TUnit::UNIT, "ProbePhase");
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", 
TUnit::BYTES, 1);
     return Status::OK();
 }
-#define UPDATE_PROFILE(counter, name)                           \
-    do {                                                        \
-        auto* child_counter = child_profile->get_counter(name); \
-        if (child_counter != nullptr) {                         \
-            COUNTER_UPDATE(counter, child_counter->value());    \
-        }                                                       \
-    } while (false)
 
+#define UPDATE_COUNTER_FROM_INNER(name) \
+    update_profile_from_inner_profile<spilled>(name, _runtime_profile.get(), 
child_profile)
+
+template <bool spilled>
 void PartitionedHashJoinProbeLocalState::update_build_profile(RuntimeProfile* 
child_profile) {
-    UPDATE_PROFILE(_build_rows_counter, "BuildRows");
-    UPDATE_PROFILE(_publish_runtime_filter_timer, "PublishRuntimeFilterTime");
-    UPDATE_PROFILE(_runtime_filter_compute_timer, "RuntimeFilterComputeTime");
-    UPDATE_PROFILE(_build_table_timer, "BuildTableTime");
-    UPDATE_PROFILE(_build_side_merge_block_timer, "BuildSideMergeBlockTime");
-    UPDATE_PROFILE(_build_table_insert_timer, "BuildTableInsertTime");
-    UPDATE_PROFILE(_build_expr_call_timer, "BuildExprCallTime");
-    UPDATE_PROFILE(_build_side_compute_hash_timer, 
"BuildSideHashComputingTime");
-    UPDATE_PROFILE(_allocate_resource_timer, "AllocateResourceTime");
+    UPDATE_COUNTER_FROM_INNER("PublishRuntimeFilterTime");
+    UPDATE_COUNTER_FROM_INNER("BuildRuntimeFilterTime");
+    UPDATE_COUNTER_FROM_INNER("BuildHashTableTime");
+    UPDATE_COUNTER_FROM_INNER("MergeBuildBlockTime");
+    UPDATE_COUNTER_FROM_INNER("BuildTableInsertTime");
+    UPDATE_COUNTER_FROM_INNER("BuildExprCallTime");
+    UPDATE_COUNTER_FROM_INNER("RuntimeFilterInitTime");
+    UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildBlocks");
+    UPDATE_COUNTER_FROM_INNER("MemoryUsageHashTable");
+    UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildKeyArena");
 }
 
+template <bool spilled>
 void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* 
child_profile) {
-    UPDATE_PROFILE(_probe_timer, "ProbeTime");
-    UPDATE_PROFILE(_join_filter_timer, "JoinFilterTimer");
-    UPDATE_PROFILE(_build_output_block_timer, "BuildOutputBlock");
-    UPDATE_PROFILE(_probe_rows_counter, "ProbeRows");
-    UPDATE_PROFILE(_probe_next_timer, "ProbeFindNextTime");
-    UPDATE_PROFILE(_probe_expr_call_timer, "ProbeExprCallTime");
-    UPDATE_PROFILE(_search_hashtable_timer, "ProbeWhenSearchHashTableTime");
-    UPDATE_PROFILE(_build_side_output_timer, "ProbeWhenBuildSideOutputTime");
-    UPDATE_PROFILE(_probe_side_output_timer, "ProbeWhenProbeSideOutputTime");
-    UPDATE_PROFILE(_probe_process_hashtable_timer, 
"ProbeWhenProcessHashTableTime");
-    UPDATE_PROFILE(_process_other_join_conjunct_timer, 
"OtherJoinConjunctTime");
-    UPDATE_PROFILE(_init_probe_side_timer, "InitProbeSideTime");
+    UPDATE_COUNTER_FROM_INNER("ProbeTime");
+    UPDATE_COUNTER_FROM_INNER("JoinFilterTimer");
+    UPDATE_COUNTER_FROM_INNER("BuildOutputBlock");
+    UPDATE_COUNTER_FROM_INNER("ProbeRows");
+    UPDATE_COUNTER_FROM_INNER("ProbeFindNextTime");
+    UPDATE_COUNTER_FROM_INNER("ProbeExprCallTime");
+    UPDATE_COUNTER_FROM_INNER("ProbeWhenSearchHashTableTime");
+    UPDATE_COUNTER_FROM_INNER("ProbeWhenBuildSideOutputTime");
+    UPDATE_COUNTER_FROM_INNER("ProbeWhenProbeSideOutputTime");
+    UPDATE_COUNTER_FROM_INNER("ProbeWhenProcessHashTableTime");
+    UPDATE_COUNTER_FROM_INNER("OtherJoinConjunctTime");
+    UPDATE_COUNTER_FROM_INNER("InitProbeSideTime");
 }
 
 #undef UPDATE_PROFILE
 
+void PartitionedHashJoinProbeLocalState::update_profile_from_inner() {
+    auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+    if (_shared_state->inner_runtime_state) {
+        auto* sink_local_state = 
_shared_state->inner_runtime_state->get_sink_local_state();
+        auto* probe_local_state = 
_shared_state->inner_runtime_state->get_local_state(
+                p._inner_probe_operator->operator_id());
+
+        if (_shared_state->need_to_spill) {
+            update_build_profile<true>(sink_local_state->profile());
+            update_probe_profile<true>(probe_local_state->profile());
+        } else {
+            update_build_profile<false>(sink_local_state->profile());
+            update_probe_profile<false>(probe_local_state->profile());
+        }
+    }
+}
+
 Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(PipelineXSpillLocalState::open(state));
     return 
_parent->cast<PartitionedHashJoinProbeOperatorX>()._partitioner->clone(state,
@@ -607,21 +583,11 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill
     DCHECK(local_state._shared_state->inner_runtime_state);
     local_state._in_mem_shared_state_sptr =
             std::move(local_state._shared_state->inner_shared_state);
-
-    auto* sink_state = 
local_state._shared_state->inner_runtime_state->get_sink_local_state();
-    if (sink_state != nullptr) {
-        COUNTER_SET(local_state._hash_table_memory_usage,
-                    
sink_state->profile()->get_counter("MemoryUsageHashTable")->value());
-    }
     return Status::OK();
 }
 
 Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) 
const {
-    if (local_state._shared_state->inner_runtime_state) {
-        _update_profile_from_internal_states(local_state);
-    }
-
     local_state._shared_state->inner_runtime_state = 
RuntimeState::create_unique(
             state->fragment_instance_id(), state->query_id(), 
state->fragment_id(),
             state->query_options(), TQueryGlobals {}, state->exec_env(), 
state->get_query_ctx());
@@ -682,9 +648,6 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
             block.rows(),
             _inner_sink_operator->get_memory_usage(
                     local_state._shared_state->inner_runtime_state.get()));
-
-    COUNTER_SET(local_state._hash_table_memory_usage,
-                
sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
     return Status::OK();
 }
 
@@ -762,6 +725,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
                 print_id(state->query_id()), node_id(), state->task_id(),
                 local_state._partition_cursor);
         local_state._partition_cursor++;
+        local_state.update_profile_from_inner();
         if (local_state._partition_cursor == _partition_count) {
             *eos = true;
         } else {
@@ -876,18 +840,6 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
     return false;
 }
 
-void PartitionedHashJoinProbeOperatorX::_update_profile_from_internal_states(
-        PartitionedHashJoinProbeLocalState& local_state) const {
-    if (local_state._shared_state->inner_runtime_state) {
-        auto* sink_local_state =
-                
local_state._shared_state->inner_runtime_state->get_sink_local_state();
-        local_state.update_build_profile(sink_local_state->profile());
-        auto* probe_local_state = 
local_state._shared_state->inner_runtime_state->get_local_state(
-                _inner_probe_operator->operator_id());
-        local_state.update_probe_profile(probe_local_state->profile());
-    }
-}
-
 Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                     bool* eos) {
     *eos = false;
@@ -946,16 +898,11 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         } else {
             RETURN_IF_ERROR(_inner_probe_operator->pull(
                     local_state._shared_state->inner_runtime_state.get(), 
block, eos));
-            if (*eos) {
-                _update_profile_from_internal_states(local_state);
-            }
+            local_state.update_profile_from_inner();
         }
 
         local_state.add_num_rows_returned(block->rows());
         COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
-        if (*eos) {
-            _update_profile_from_internal_states(local_state);
-        }
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 226116ef7f4..cceac79b357 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -57,11 +57,16 @@ public:
 
     Status finish_spilling(uint32_t partition_index);
 
+    template <bool spilled>
     void update_build_profile(RuntimeProfile* child_profile);
+
+    template <bool spilled>
     void update_probe_profile(RuntimeProfile* child_profile);
 
     std::string debug_string(int indentation_level = 0) const override;
 
+    void update_profile_from_inner();
+
     friend class PartitionedHashJoinProbeOperatorX;
 
 private:
@@ -102,37 +107,8 @@ private:
     RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
     RuntimeProfile::Counter* _recovery_probe_timer = nullptr;
 
-    RuntimeProfile::Counter* _build_phase_label = nullptr;
-    RuntimeProfile::Counter* _build_rows_counter = nullptr;
-    RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
-    RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
-
-    RuntimeProfile::Counter* _build_table_timer = nullptr;
-    RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
-    RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
-    RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr;
-    RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr;
-
-    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _probe_blocks_bytes = nullptr;
-
-    RuntimeProfile::Counter* _allocate_resource_timer = nullptr;
-
-    RuntimeProfile::Counter* _probe_phase_label = nullptr;
-    RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
-    RuntimeProfile::Counter* _probe_next_timer = nullptr;
-    RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
-    RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;
-    RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
-    RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
-    RuntimeProfile::Counter* _build_side_output_timer = nullptr;
-    RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr;
-    RuntimeProfile::Counter* _probe_timer = nullptr;
-    RuntimeProfile::Counter* _probe_rows_counter = nullptr;
-    RuntimeProfile::Counter* _join_filter_timer = nullptr;
-    RuntimeProfile::Counter* _build_output_block_timer = nullptr;
     RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
-
     RuntimeProfile::Counter* _get_child_next_timer = nullptr;
 };
 
@@ -195,9 +171,6 @@ private:
 
     bool _should_revoke_memory(RuntimeState* state) const;
 
-    void _update_profile_from_internal_states(
-            PartitionedHashJoinProbeLocalState& local_state) const;
-
     const TJoinDistributionType::type _join_distribution;
 
     std::shared_ptr<HashJoinBuildSinkOperatorX> _inner_sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 2df474f42eb..3c94cc82119 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -553,6 +553,29 @@ Status 
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState*
     return Status::OK();
 }
 
+#define UPDATE_COUNTER_FROM_INNER(name) \
+    update_profile_from_inner_profile<false>(name, _profile, inner_profile)
+
+void PartitionedHashJoinSinkLocalState::update_profile_from_inner() {
+    auto* sink_local_state = 
_shared_state->inner_runtime_state->get_sink_local_state();
+    if (sink_local_state) {
+        auto* inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(sink_local_state);
+        auto* inner_profile = inner_sink_state->profile();
+        UPDATE_COUNTER_FROM_INNER("PublishRuntimeFilterTime");
+        UPDATE_COUNTER_FROM_INNER("BuildRuntimeFilterTime");
+        UPDATE_COUNTER_FROM_INNER("BuildHashTableTime");
+        UPDATE_COUNTER_FROM_INNER("MergeBuildBlockTime");
+        UPDATE_COUNTER_FROM_INNER("BuildTableInsertTime");
+        UPDATE_COUNTER_FROM_INNER("BuildExprCallTime");
+        UPDATE_COUNTER_FROM_INNER("RuntimeFilterInitTime");
+        UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildBlocks");
+        UPDATE_COUNTER_FROM_INNER("MemoryUsageHashTable");
+        UPDATE_COUNTER_FROM_INNER("MemoryUsageBuildKeyArena");
+    }
+}
+
+#undef UPDATE_COUNTER_FROM_INNER
+
 // After building hash table it will not be able to spill later
 // even if memory is low, and will cause cancel of queries.
 // So make a check here, if build blocks mem usage is too high,
@@ -613,6 +636,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
+                local_state.update_profile_from_inner();
+
                 LOG(INFO) << fmt::format(
                         "Query:{}, hash join sink:{}, task:{}, eos, 
set_ready_to_read, nonspill "
                         "memory usage:{}",
@@ -660,6 +685,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
         local_state.update_memory_usage();
+        local_state.update_profile_from_inner();
         if (eos) {
             LOG(INFO) << fmt::format(
                     "Query:{}, hash join sink:{}, task:{}, eos, 
set_ready_to_read, nonspill memory "
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 492c98fa637..50c3a59f214 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -51,6 +51,7 @@ public:
     size_t revocable_mem_size(RuntimeState* state) const;
     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
     void update_memory_usage();
+    void update_profile_from_inner();
 
     Dependency* finishdependency() override;
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 759b88e28bc..3aec42347ff 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -55,26 +55,17 @@ Status SpillSortSinkLocalState::open(RuntimeState* state) {
 
 void SpillSortSinkLocalState::_init_counters() {
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
-
-    _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime");
-    _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime");
-    _sort_blocks_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", 
TUnit::BYTES, 1);
     _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(_profile, 
"SpillMergeSortTime", 1);
 }
-#define UPDATE_PROFILE(counter, name)                           \
-    do {                                                        \
-        auto* child_counter = child_profile->get_counter(name); \
-        if (child_counter != nullptr) {                         \
-            COUNTER_SET(counter, child_counter->value());       \
-        }                                                       \
-    } while (false)
+
+#define UPDATE_PROFILE(name) update_profile_from_inner_profile<true>(name, 
_profile, child_profile)
 
 void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) {
-    UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime");
-    UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime");
-    UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks");
+    UPDATE_PROFILE("PartialSortTime");
+    UPDATE_PROFILE("MergeBlockTime");
+    UPDATE_PROFILE("MemoryUsageSortBlocks");
 }
+#undef UPDATE_PROFILE
 
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status 
execsink_status) {
     return Base::close(state, execsink_status);
@@ -161,7 +152,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     
RETURN_IF_ERROR(_sort_sink_operator->sink(local_state._runtime_state.get(), 
in_block, false));
 
     int64_t data_size = 
local_state._shared_state->in_mem_shared_state->sorter->data_size();
-    COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
     COUNTER_SET(local_state._memory_used_counter, data_size);
 
     if (eos) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 3d6ccdcc4ce..e41a3083ac6 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -51,9 +51,6 @@ private:
 
     std::unique_ptr<RuntimeState> _runtime_state;
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
-    RuntimeProfile::Counter* _partial_sort_timer = nullptr;
-    RuntimeProfile::Counter* _merge_block_timer = nullptr;
-    RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr;
 
     RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
 
diff --git a/be/src/pipeline/exec/spill_utils.h 
b/be/src/pipeline/exec/spill_utils.h
index a3c51faca1f..ff40c4656dc 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 
@@ -263,5 +264,31 @@ private:
     RuntimeProfile::Counter* _reading_task_count = nullptr;
 };
 
+template <bool accumulating>
+inline void update_profile_from_inner_profile(const std::string& name,
+                                              RuntimeProfile* runtime_profile,
+                                              RuntimeProfile* inner_profile) {
+    auto* inner_counter = inner_profile->get_counter(name);
+    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not 
found";
+    if (inner_counter == nullptr) [[unlikely]] {
+        return;
+    }
+    auto* counter = runtime_profile->get_counter(name);
+    if (counter == nullptr) [[unlikely]] {
+        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
+                                               inner_counter->level());
+    }
+    if constexpr (accumulating) {
+        // Memory usage should not be accumulated.
+        if (counter->type() == TUnit::BYTES) {
+            counter->set(inner_counter->value());
+        } else {
+            counter->update(inner_counter->value());
+        }
+    } else {
+        counter->set(inner_counter->value());
+    }
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to