This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ef8d9ad9a47369e88c26d6f7ce295740ff42c770
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Jan 31 14:08:08 2024 +0800

    [pipelinex](profile) improve memory counter of pipelineX (#30538)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp   |  7 +++----
 be/src/pipeline/exec/aggregation_sink_operator.h     |  1 -
 be/src/pipeline/exec/analytic_sink_operator.cpp      |  4 ++--
 be/src/pipeline/exec/analytic_sink_operator.h        |  1 -
 be/src/pipeline/exec/analytic_source_operator.cpp    |  3 +--
 be/src/pipeline/exec/analytic_source_operator.h      |  1 -
 be/src/pipeline/exec/exchange_sink_operator.cpp      |  1 -
 be/src/pipeline/exec/exchange_sink_operator.h        |  1 -
 be/src/pipeline/exec/hashjoin_build_sink.cpp         | 20 ++++++++++++--------
 be/src/pipeline/exec/hashjoin_build_sink.h           |  1 -
 be/src/pipeline/exec/hashjoin_probe_operator.cpp     |  4 +++-
 be/src/pipeline/exec/scan_operator.cpp               |  9 +++++----
 be/src/pipeline/exec/sort_sink_operator.cpp          | 11 ++++++++---
 be/src/pipeline/exec/sort_sink_operator.h            |  2 +-
 be/src/pipeline/pipeline_x/operator.cpp              |  8 ++++----
 be/src/util/runtime_profile.cpp                      |  5 +++--
 be/src/util/runtime_profile.h                        |  6 ++++--
 be/src/vec/common/hash_table/hash_map_context.h      | 14 ++++++++++++++
 be/src/vec/exec/join/process_hash_table_probe_impl.h |  2 ++
 be/src/vec/exec/join/vhash_join_node.h               |  6 ++++--
 20 files changed, 66 insertions(+), 41 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 5746551a29f..c5822591d4a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -88,11 +88,10 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
         RETURN_IF_ERROR(
                 p._probe_expr_ctxs[i]->clone(state, 
Base::_shared_state->probe_expr_ctxs[i]));
     }
-    _memory_usage_counter = ADD_LABEL_COUNTER(Base::profile(), "MemoryUsage");
-    _hash_table_memory_usage =
-            ADD_CHILD_COUNTER(Base::profile(), "HashTable", TUnit::BYTES, 
"MemoryUsage");
+    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
+                                                            TUnit::BYTES, 
"MemoryUsage", 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage");
+            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _build_table_convert_timer = ADD_TIMER(Base::profile(), 
"BuildConvertToPartitionedTime");
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index a7b30d46117..8bdd624ca48 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -308,7 +308,6 @@ protected:
     RuntimeProfile::Counter* _serialize_data_timer = nullptr;
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = 
nullptr;
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index d9923a68f24..e9cc0cb1d06 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -34,8 +34,8 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
     
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
 
-    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
-    _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", 
TUnit::BYTES, "MemoryUsage");
+    _blocks_memory_usage =
+            _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage", 1);
     _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
 
     size_t agg_size = p._agg_expr_ctxs.size();
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 064d68f189a..c291aa95226 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -84,7 +84,6 @@ private:
                                                      bool need_check_first = 
false);
     bool _whether_need_next_partition(vectorized::BlockRowPos& 
found_partition_end);
 
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
     RuntimeProfile::Counter* _evaluation_timer = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
 
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index a95c2a1225a..0642b4b76bd 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -167,9 +167,8 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     auto& p = _parent->cast<AnalyticSourceOperatorX>();
     _agg_functions_size = p._agg_functions.size();
 
-    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
     _blocks_memory_usage =
-            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage");
+            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, 
"MemoryUsage", 1);
     _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
 
     _agg_functions.resize(p._agg_functions.size());
diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index f4e2f10a719..b4acb2fbce5 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -121,7 +121,6 @@ private:
     std::unique_ptr<vectorized::Arena> _agg_arena_pool;
     std::vector<vectorized::AggFnEvaluator*> _agg_functions;
 
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
     RuntimeProfile::Counter* _evaluation_timer = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 1f9ba3b4203..9c26e975918 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -124,7 +124,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
             "");
     _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", 
TUnit::BYTES);
-    _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
     static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
     _wait_queue_timer =
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 3e6486e34fd..5ba8228e502 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -178,7 +178,6 @@ private:
     // Used to counter send bytes under local data exchange
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
 
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
     RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index ccf2f2a5894..5b6689e35a5 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -84,14 +84,12 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
         }
     }
 
-    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
-
     _build_blocks_memory_usage =
-            ADD_CHILD_COUNTER(profile(), "BuildBlocks", TUnit::BYTES, 
"MemoryUsage");
+            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", 
TUnit::BYTES, "MemoryUsage", 1);
     _hash_table_memory_usage =
-            ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, 
"MemoryUsage");
+            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, 
"MemoryUsage", 1);
     _build_arena_memory_usage =
-            profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, 
"MemoryUsage");
+            profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, 
"MemoryUsage", 1);
 
     // Build phase
     auto* record_profile = _should_build_hash_table ? profile() : 
faker_runtime_profile();
@@ -271,7 +269,9 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                                             
HashJoinBuildSinkLocalState>
                                   hash_table_build_process(rows, raw_ptrs, 
this,
                                                            
state->batch_size(), state);
-                          return hash_table_build_process.template run<
+                          auto old_hash_table_size = 
arg.hash_table->get_byte_size();
+                          auto old_key_size = arg.serialized_keys_size(true);
+                          auto st = hash_table_build_process.template run<
                                   JoinOpType::value, has_null_value,
                                   short_circuit_for_null_in_build_side, 
with_other_conjuncts>(
                                   arg,
@@ -279,6 +279,10 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                           ? &null_map_val->get_data()
                                           : nullptr,
                                   &_shared_state->_has_null_in_build_side);
+                          
_mem_tracker->consume(arg.hash_table->get_byte_size() -
+                                                old_hash_table_size);
+                          _mem_tracker->consume(arg.serialized_keys_size(true) 
- old_key_size);
+                          return st;
                       }},
             *_shared_state->hash_table_variants, 
_shared_state->join_op_variants,
             vectorized::make_bool_variant(_build_side_ignore_null),
@@ -469,6 +473,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
 
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
             
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
+            COUNTER_UPDATE(local_state._build_blocks_memory_usage, 
in_block->bytes());
+            local_state._mem_tracker->consume(in_block->bytes());
             if (local_state._build_side_mutable_block.rows() >
                 std::numeric_limits<uint32_t>::max()) {
                 return Status::NotSupported(
@@ -483,8 +489,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
         DCHECK(!local_state._build_side_mutable_block.empty());
         local_state._shared_state->build_block = 
std::make_shared<vectorized::Block>(
                 local_state._build_side_mutable_block.to_block());
-        COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                       (*local_state._shared_state->build_block).bytes());
 
         const bool use_global_rf =
                 
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 7521563ecb8..2acc25151ab 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -135,7 +135,6 @@ protected:
 
     RuntimeProfile::Counter* _allocate_resource_timer = nullptr;
 
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
     RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage = nullptr;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index aef2e011fa0..f7a06655b19 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -53,7 +53,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
     _construct_mutable_join_block();
     _probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
     _probe_arena_memory_usage =
-            profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, 
"MemoryUsage");
+            profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, 
"MemoryUsage", 1);
     // Probe phase
     _probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
     _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
@@ -320,6 +320,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
                                         mutable_join_block, &temp_block,
                                         local_state._probe_block.rows(), 
_is_mark_join,
                                         _have_other_join_conjunct);
+                                local_state._mem_tracker->set_consumption(
+                                        arg.serialized_keys_size(false));
                             } else {
                                 st = Status::InternalError("uninited hash 
table");
                             }
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 49f3bd2269c..88e6c880568 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -28,6 +28,7 @@
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/operator.h"
+#include "util/runtime_profile.h"
 #include "vec/exec/runtime_filter_consumer.h"
 #include "vec/exec/scan/pip_scanner_context.h"
 #include "vec/exec/scan/scanner_context.h"
@@ -1286,11 +1287,11 @@ Status ScanLocalState<Derived>::_init_profile() {
     _scanner_profile.reset(new RuntimeProfile("VScanner"));
     profile()->add_child(_scanner_profile.get(), true, nullptr);
 
-    _memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage");
-    _queued_blocks_memory_usage =
-            _scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", 
TUnit::BYTES, "MemoryUsage");
+    _memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, 
"MemoryUsage", 1);
+    _queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter(
+            "QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1);
     _free_blocks_memory_usage =
-            _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", 
TUnit::BYTES, "MemoryUsage");
+            _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", 
TUnit::BYTES, "MemoryUsage", 1);
     _newly_create_free_blocks_num =
             ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", 
TUnit::UNIT);
     // time of transfer thread to wait for block from scan thread
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index 56a81422484..465622be4aa 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -63,7 +63,8 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
 
     _profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true");
 
-    _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
+    _sort_blocks_memory_usage =
+            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, 
"MemoryUsage", 1);
     return Status::OK();
 }
 
@@ -149,16 +150,20 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (in_block->rows() > 0) {
         
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
+        
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
+        COUNTER_SET(local_state._sort_blocks_memory_usage,
+                    (int64_t)local_state._shared_state->sorter->data_size());
         RETURN_IF_CANCELLED(state);
 
         // update runtime predicate
         if (_use_topn_opt) {
             vectorized::Field new_top = 
local_state._shared_state->sorter->get_top_value();
             if (!new_top.is_null() && new_top != local_state.old_top) {
-                auto& sort_description = 
local_state._shared_state->sorter->get_sort_description();
+                const auto& sort_description =
+                        
local_state._shared_state->sorter->get_sort_description();
                 auto col = 
in_block->get_by_position(sort_description[0].column_number);
                 bool is_reverse = sort_description[0].direction < 0;
-                auto query_ctx = state->get_query_ctx();
+                auto* query_ctx = state->get_query_ctx();
                 RETURN_IF_ERROR(
                         query_ctx->get_runtime_predicate().update(new_top, 
col.name, is_reverse));
                 local_state.old_top = std::move(new_top);
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 7069183f3b2..d3d85a3e5c9 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -72,7 +72,7 @@ private:
     // Expressions and parameters used for build _sort_description
     vectorized::VSortExecExprs _vsort_exec_exprs;
 
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
+    RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr;
 
     // topn top value
     vectorized::Field old_top {vectorized::Field::Types::Null};
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index ef17ee70ed6..4be062b9e8c 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -403,9 +403,9 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
     _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
-    _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
+    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", 1);
     _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
+            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
     return Status::OK();
 }
 
@@ -462,9 +462,9 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
     _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
-    _memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
+    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, 
"MemoryUsage", 1);
     _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage");
+            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage", 1);
     return Status::OK();
 }
 
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 38464cda63e..f5ed7250618 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -364,7 +364,8 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) {
 
 #define ADD_COUNTER_IMPL(NAME, T)                                              
                    \
     RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, 
TUnit::type unit,             \
-                                            const std::string& 
parent_counter_name) {              \
+                                            const std::string& 
parent_counter_name,                \
+                                            int64_t level) {                   
                    \
         DCHECK_EQ(_is_averaged_profile, false);                                
                    \
         std::lock_guard<std::mutex> l(_counter_map_lock);                      
                    \
         if (_counter_map.find(name) != _counter_map.end()) {                   
                    \
@@ -372,7 +373,7 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) {
         }                                                                      
                    \
         DCHECK(parent_counter_name == ROOT_COUNTER ||                          
                    \
                _counter_map.find(parent_counter_name) != _counter_map.end());  
                    \
-        T* counter = _pool->add(new T(unit));                                  
                    \
+        T* counter = _pool->add(new T(unit, level));                           
                    \
         _counter_map[name] = counter;                                          
                    \
         std::set<std::string>* child_counters =                                
                    \
                 find_or_insert(&_child_counter_map, parent_counter_name, 
std::set<std::string>()); \
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index d5233d40f2c..4cc2c2617ec 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -132,7 +132,8 @@ public:
     /// as value()) and the current value.
     class HighWaterMarkCounter : public Counter {
     public:
-        HighWaterMarkCounter(TUnit::type unit) : Counter(unit), 
current_value_(0) {}
+        HighWaterMarkCounter(TUnit::type unit, int64_t level = 2)
+                : Counter(unit, 0, level), current_value_(0) {}
 
         virtual void add(int64_t delta) {
             current_value_.fetch_add(delta, std::memory_order_relaxed);
@@ -413,7 +414,8 @@ public:
     /// Adds a high water mark counter to the runtime profile. Otherwise, same 
behavior
     /// as AddCounter().
     HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, 
TUnit::type unit,
-                                                  const std::string& 
parent_counter_name = "");
+                                                  const std::string& 
parent_counter_name = "",
+                                                  int64_t level = 2);
 
     // Only for create MemTracker(using profile's counter to calc consumption)
     std::shared_ptr<HighWaterMarkCounter> AddSharedHighWaterMarkCounter(
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 41f3bd52efd..1536d48fe7a 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -69,6 +69,8 @@ struct MethodBaseInner {
                                       const uint8_t* null_map = nullptr, bool 
is_join = false,
                                       bool is_build = false, uint32_t 
bucket_size = 0) = 0;
 
+    virtual size_t serialized_keys_size(bool is_build) const { return 0; }
+
     void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const 
uint8_t* null_map) {
         bucket_nums.resize(num_rows);
 
@@ -243,6 +245,10 @@ struct MethodSerialized : public MethodBase<TData> {
         Base::keys = input_keys.data();
     }
 
+    size_t serialized_keys_size(bool is_build) const override {
+        return is_build ? build_arena.size() : Base::arena.size();
+    }
+
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                               const uint8_t* null_map = nullptr, bool is_join 
= false,
                               bool is_build = false, uint32_t bucket_size = 0) 
override {
@@ -277,6 +283,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
 
     std::vector<StringRef> stored_keys;
 
+    size_t serialized_keys_size(bool is_build) const override {
+        return stored_keys.size() * sizeof(StringRef);
+    }
+
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                               const uint8_t* null_map = nullptr, bool is_join 
= false,
                               bool is_build = false, uint32_t bucket_size = 0) 
override {
@@ -430,6 +440,10 @@ struct MethodKeysFixed : public MethodBase<TData> {
         }
     }
 
+    size_t serialized_keys_size(bool is_build) const override {
+        return (is_build ? build_stored_keys.size() : stored_keys.size()) *
+               sizeof(typename Base::Key);
+    }
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                               const uint8_t* null_map = nullptr, bool is_join 
= false,
                               bool is_build = false, uint32_t bucket_size = 0) 
override {
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 9f5167bb555..1939b702c69 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -139,6 +139,8 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p
                                             false, 
hash_table_ctx.hash_table->get_bucket_size());
         hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums,
                                                   need_judge_null ? null_map : 
nullptr);
+        COUNTER_SET(_parent->_probe_arena_memory_usage,
+                    (int64_t)hash_table_ctx.serialized_keys_size(false));
     }
     return typename HashTableType::State(_parent->_probe_columns);
 }
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index fb5bf19015d..95c59094ba6 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -141,8 +141,10 @@ struct ProcessHashTableBuild {
         hash_table_ctx.bucket_nums.resize(_batch_size);
         hash_table_ctx.bucket_nums.shrink_to_fit();
 
-        COUNTER_UPDATE(_parent->_hash_table_memory_usage,
-                       hash_table_ctx.hash_table->get_byte_size());
+        COUNTER_SET(_parent->_hash_table_memory_usage,
+                    (int64_t)hash_table_ctx.hash_table->get_byte_size());
+        COUNTER_SET(_parent->_build_arena_memory_usage,
+                    (int64_t)hash_table_ctx.serialized_keys_size(true));
         return Status::OK();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to