This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 616c205bfc279e4d3c9b861c2698ee0e318be784
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Tue Sep 10 11:06:04 2024 +0800

    [improvement](spill) add counter for memory usage (#40572)
    
    Issue Number: close #xxx
    
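    Summary of changes:

    - Operator profiles: "MemoryUsage" is now a real counter in bytes
      (previously a label counter), and "PeakMemoryUsage" is no longer
      registered as its child. Local states and sink local states expose
      memory_used_counter() and peak_memory_usage_counter().
    - ExchangeSinkBuffer, the exchange sink channels and the
      VDataStreamRecvr sender queues update the owning operator's
      memory-used counter when blocks are queued, sent or dropped.
    - The partitioned hash join sink tracks block memory while
      partitioning and spilling; spill streams receive the operator's
      memory-used counter through set_write_counters().
    - PipelineFragmentContext::print_profile() is added and is called when
      a fragment is cancelled with MEM_LIMIT_EXCEEDED or MEM_ALLOC_FAILED.
    - WorkloadGroup::memory_debug_string() additionally reports
      mem_used_ratio.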
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 27 ++++++++++++++++++++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 27 ++++++++++++++++++++++
 be/src/pipeline/exec/exchange_source_operator.cpp  |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  3 ++-
 be/src/pipeline/exec/operator.cpp                  | 10 ++++----
 be/src/pipeline/exec/operator.h                    |  5 ++++
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  2 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 24 ++++++++++++++++++-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  2 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 18 +++++++++++++++
 be/src/pipeline/pipeline_fragment_context.h        |  6 +++--
 be/src/runtime/workload_group/workload_group.cpp   | 15 +++++++-----
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  7 +++---
 be/src/vec/runtime/vdata_stream_mgr.h              |  4 ++++
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 17 +++++++-------
 be/src/vec/runtime/vdata_stream_recvr.h            |  7 +++---
 be/src/vec/sink/vdata_stream_sender.cpp            |  6 +++++
 be/src/vec/sink/vdata_stream_sender.h              |  2 ++
 be/src/vec/spill/spill_stream.h                    |  5 ++--
 be/src/vec/spill/spill_writer.cpp                  |  8 +++++++
 be/src/vec/spill/spill_writer.h                    | 13 +++++++----
 23 files changed, 172 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 5027d7c10de..58ca10af644 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -162,6 +162,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
             RETURN_IF_ERROR(
                     
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
         }
+        _parent->memory_used_counter()->update(request.block->ByteSizeLong());
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
         if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -196,6 +197,7 @@ Status 
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                     request.block_holder->get_block()->be_exec_version()));
         }
+        
_parent->memory_used_counter()->update(request.block_holder->get_block()->ByteSizeLong());
         _instance_to_broadcast_package_queue[ins_id].emplace(request);
     }
     if (send_now) {
@@ -291,6 +293,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         }
         if (request.block) {
+            
_parent->memory_used_counter()->update(-request.block->ByteSizeLong());
             static_cast<void>(brpc_request->release_block());
         }
         q.pop();
@@ -370,6 +373,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         }
         if (request.block_holder->get_block()) {
+            _parent->memory_used_counter()->update(
+                    -request.block_holder->get_block()->ByteSizeLong());
             static_cast<void>(brpc_request->release_block());
         }
         broadcast_q.pop();
@@ -421,8 +426,26 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId 
id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
     _turn_off_channel(id, true);
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
-    swap(empty, _instance_to_broadcast_package_queue[id]);
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
+            _instance_to_broadcast_package_queue[id];
+    for (; !broadcast_q.empty(); broadcast_q.pop()) {
+        _parent->memory_used_counter()->update(
+                
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+    }
+    {
+        std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> 
empty;
+        swap(empty, broadcast_q);
+    }
+
+    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
+    for (; !q.empty(); q.pop()) {
+        
_parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
+    }
+
+    {
+        std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
+        swap(empty, q);
+    }
 }
 
 bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 9e17a76d272..ea16ab2d4fb 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -458,6 +458,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             RETURN_IF_ERROR(
                     local_state._partitioner->do_partitioning(state, block, 
_mem_tracker.get()));
         }
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             RETURN_IF_ERROR(channel_add_rows(
                     state, local_state.channels, local_state._partition_count,
@@ -467,7 +471,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                     state, local_state.channel_shared_ptrs, 
local_state._partition_count,
                     
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         }
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - 
old_channel_mem_usage);
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         // check out of limit
         RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = 
std::make_shared<vectorized::Block>();
@@ -501,7 +514,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, 
num_channels,
                                                   channel2rows, 
convert_block.get(), eos));
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - 
old_channel_mem_usage);
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         {
             SCOPED_TIMER(local_state._split_block_hash_compute_timer);
             RETURN_IF_ERROR(
@@ -512,6 +534,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         RETURN_IF_ERROR(channel_add_rows_with_idx(
                 state, local_state.channels, local_state.channels.size(), 
assignments, block, eos));
 
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - 
old_channel_mem_usage);
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
         // 1. select channel
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 9db0bca0c43..88915ca2c8a 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -62,8 +62,8 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
-            state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
-            profile(), p.is_merging());
+            state, this, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(),
+            p.num_senders(), profile(), p.is_merging());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index ae724549f71..0ddc533bcea 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -549,7 +549,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit 
strategy, we do not need to pull
         // data from probe side.
-        local_state._build_side_mem_used += in_block->allocated_bytes();
 
         if (local_state._build_side_mutable_block.empty()) {
             auto tmp_build_block = 
vectorized::VectorizedUtils::create_empty_columnswithtypename(
@@ -582,6 +581,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
             
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
                     std::move(*in_block)));
+            local_state._build_side_mem_used =
+                    local_state._build_side_mutable_block.allocated_bytes();
         }
     }
 
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index cf1e82f57dd..6d25f75da5d 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -489,9 +489,9 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
     _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", 1);
-    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", TUnit::BYTES, 1);
+    _peak_memory_usage_counter =
+            _runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", 
TUnit::BYTES, "", 1);
     return Status::OK();
 }
 
@@ -569,9 +569,9 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
     _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, 
"MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 
TUnit::BYTES, 1);
     _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, 
"", 1);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d111a2a7e24..06715343a0e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -162,6 +162,8 @@ public:
     RuntimeProfile::Counter* rows_returned_counter() { return 
_rows_returned_counter; }
     RuntimeProfile::Counter* blocks_returned_counter() { return 
_blocks_returned_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return 
_memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return 
_peak_memory_usage_counter; }
     OperatorXBase* parent() { return _parent; }
     RuntimeState* state() { return _state; }
     vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
@@ -372,6 +374,9 @@ public:
 
     RuntimeProfile::Counter* rows_input_counter() { return 
_rows_input_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return 
_memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return 
_peak_memory_usage_counter; }
+
     virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in exchange sink , AsyncWriterSink
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 65c1cefa63b..a7929337b63 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -150,7 +150,7 @@ public:
         spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
                                          Base::_spill_block_count, 
Base::_spill_data_size,
                                          Base::_spill_write_disk_timer,
-                                         Base::_spill_write_wait_io_timer);
+                                         Base::_spill_write_wait_io_timer, 
memory_used_counter());
 
         status = to_block(context, keys, values, null_key_data);
         RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 51b6e143b3c..f1ca4b8f7b5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -194,7 +194,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 RETURN_IF_ERROR(spilling_stream->prepare_spill());
                 spilling_stream->set_write_counters(
                         _spill_serialize_block_timer, _spill_block_count, 
_spill_data_size,
-                        _spill_write_disk_timer, _spill_write_wait_io_timer);
+                        _spill_write_disk_timer, _spill_write_wait_io_timer, 
memory_used_counter());
             }
 
             COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a073d922769..7e106db5358 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -70,7 +70,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* 
state) {
         RETURN_IF_ERROR(spilling_stream->prepare_spill());
         spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
                                             _spill_data_size, 
_spill_write_disk_timer,
-                                            _spill_write_wait_io_timer);
+                                            _spill_write_wait_io_timer, 
memory_used_counter());
     }
     return p._partitioner->clone(state, _partitioner);
 }
@@ -132,10 +132,12 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     auto row_desc = p._child->row_desc();
     const auto num_slots = row_desc.num_slots();
     vectorized::Block build_block;
+    size_t block_old_mem = 0;
     auto inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
     if (inner_sink_state_) {
         auto inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
+        block_old_mem = build_block.allocated_bytes();
     }
 
     if (build_block.rows() <= 1) {
@@ -146,9 +148,11 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
     if (build_block.columns() > num_slots) {
         build_block.erase(num_slots);
+        memory_used_counter()->update(build_block.allocated_bytes() - 
block_old_mem);
     }
 
     auto spill_func = [build_block = std::move(build_block), state, this]() 
mutable {
+        Defer defer {[&]() { 
memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
         std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
@@ -160,6 +164,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         auto flush_rows = [&state, 
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
                                          vectorized::SpillStreamSPtr& 
spilling_stream) {
             auto block = partition_block->to_block();
+            Defer defer {[&]() { 
memory_used_counter()->update(-block.allocated_bytes()); }};
             auto status = spilling_stream->spill_block(state, block, false);
 
             if (!status.ok()) {
@@ -182,6 +187,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                 sub_block.get_by_position(i).column =
                         build_block.get_by_position(i).column->cut(offset, 
this_run);
             }
+            auto sub_blocks_memory_usage = sub_block.allocated_bytes();
+            memory_used_counter()->update(sub_blocks_memory_usage);
+            Defer defer {[&]() { 
memory_used_counter()->update(-sub_blocks_memory_usage); }};
 
             offset += this_run;
             const auto is_last_block = offset == total_rows;
@@ -206,10 +214,12 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                     partition_block =
                             
vectorized::MutableBlock::create_unique(build_block.clone_empty());
                 }
+                auto old_mem = partition_block->allocated_bytes();
 
                 {
                     SCOPED_TIMER(_partition_shuffle_timer);
                     Status st = partition_block->add_rows(&sub_block, begin, 
end);
+                    
memory_used_counter()->update(partition_block->allocated_bytes() - old_mem);
                     if (!st.ok()) {
                         std::unique_lock<std::mutex> lock(_spill_lock);
                         _spill_status = st;
@@ -226,6 +236,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                     }
                     partition_block =
                             
vectorized::MutableBlock::create_unique(build_block.clone_empty());
+                    
memory_used_counter()->update(partition_block->allocated_bytes());
                 }
             }
         }
@@ -368,6 +379,7 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
     if (!rows) {
         return Status::OK();
     }
+    Defer defer {[&]() { 
memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
     {
         /// TODO: DO NOT execute build exprs twice(when partition and building 
hash table)
         SCOPED_TIMER(_partition_timer);
@@ -407,7 +419,10 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (_spill_status_ok) {
         auto block = partitioned_block->to_block();
+        auto block_mem_usage = block.allocated_bytes();
+        Defer defer {[&]() { memory_used_counter()->update(-block_mem_usage); 
}};
         partitioned_block = 
vectorized::MutableBlock::create_unique(block.clone_empty());
+        memory_used_counter()->update(partitioned_block->allocated_bytes());
         auto st = spilling_stream->spill_block(state(), block, false);
         if (!st.ok()) {
             _spill_status_ok = false;
@@ -547,6 +562,10 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                             "fault_inject partitioned_hash_join_sink "
                             "sink_eos failed");
                 });
+                Defer defer {[&]() {
+                    local_state.memory_used_counter()->set(
+                            (int64_t)local_state.revocable_mem_size(state));
+                }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
                 VLOG_DEBUG << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
@@ -584,6 +603,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
                     "fault_inject partitioned_hash_join_sink "
                     "sink failed");
         });
+        Defer defer {[&]() {
+            
local_state.memory_used_counter()->set((int64_t)local_state.revocable_mem_size(state));
+        }};
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 9dcb66240df..5a37ac2f5ab 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -200,7 +200,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
 
     _spilling_stream->set_write_counters(
             Base::_spill_serialize_block_timer, Base::_spill_block_count, 
Base::_spill_data_size,
-            Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer);
+            Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer, 
memory_used_counter());
 
     status = _spilling_stream->prepare_spill();
     RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 601188ae02e..8e601a5c7a5 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -148,7 +148,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 bool eos = false;
                 tmp_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
                                                _spill_data_size, 
_spill_write_disk_timer,
-                                               _spill_write_wait_io_timer);
+                                               _spill_write_wait_io_timer, 
memory_used_counter());
                 while (!eos && !state->is_cancelled()) {
                     merge_sorted_block.clear_column_data();
                     {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 5d1b6aaaa1b..db7310aa80d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -187,6 +187,9 @@ void PipelineFragmentContext::cancel(const Status reason) {
                     this->debug_string());
     }
 
+    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || 
reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
+        print_profile("cancel pipeline, reason: " + reason.to_string());
+    }
     _query_ctx->cancel(reason, _fragment_id);
     if (reason.is<ErrorCode::LIMIT_REACH>()) {
         _is_report_on_cancel = false;
@@ -1721,6 +1724,21 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
+void PipelineFragmentContext::print_profile(const std::string& extra_info) {
+    if (_runtime_state->enable_profile()) {
+        std::stringstream ss;
+        for (auto runtime_profile_ptr : 
_runtime_state->pipeline_id_to_profile()) {
+            runtime_profile_ptr->pretty_print(&ss);
+        }
+
+        if (_runtime_state->load_channel_profile()) {
+            _runtime_state->load_channel_profile()->pretty_print(&ss);
+        }
+
+        LOG_INFO("Query {} fragment {} {}, profile, {}", 
print_id(this->_query_id),
+                 this->_fragment_id, extra_info, ss.str());
+    }
+}
 // If all pipeline tasks binded to the fragment instance are finished, then we 
could
 // close the fragment instance.
 void PipelineFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index a2c55214ba3..c0924be38b6 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -69,8 +69,10 @@ public:
 
     ~PipelineFragmentContext();
 
-    std::vector<std::shared_ptr<TRuntimeProfileTree>> 
collect_realtime_profile() const;
-    std::shared_ptr<TRuntimeProfileTree> 
collect_realtime_load_channel_profile() const;
+    void print_profile(const std::string& extra_info);
+
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> 
collect_realtime_profile_x() const;
+    std::shared_ptr<TRuntimeProfileTree> 
collect_realtime_load_channel_profile_x() const;
 
     bool is_timeout(timespec now) const;
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 6f3b51f09fd..4d53ffc9686 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -98,17 +98,20 @@ std::string WorkloadGroup::debug_string() const {
 }
 
 std::string WorkloadGroup::memory_debug_string() const {
+    auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
+    auto mem_used_ratio = realtime_total_mem_used / 
(double)_weighted_memory_limit;
     return fmt::format(
             "TG[id = {}, name = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, weighted_memory_limit = {}, total_mem_used = {}, "
-            "wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, 
"
+            "{}, weighted_memory_limit = {}, total_mem_used = {},"
+            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
spill_low_watermark = "
+            "{}, "
             "spill_high_watermark = {}, version = {}, is_shutdown = {}, 
query_num = {}]",
             _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
             _enable_memory_overcommit ? "true" : "false",
-            PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
-            PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
-            PrettyPrinter::print(_wg_refresh_interval_memory_growth, 
TUnit::BYTES),
-            _spill_low_watermark, _spill_high_watermark, _version, 
_is_shutdown,
+            PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
+            PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+            PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
+            mem_used_ratio, _spill_low_watermark, _spill_high_watermark, 
_version, _is_shutdown,
             _query_ctxs.size());
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index a5db9a6150d..e248b074f67 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const 
TUniqueId& fragment_instanc
 }
 
 std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
-        RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& 
fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging) {
+        RuntimeState* state, pipeline::ExchangeLocalState* parent, const 
RowDescriptor& row_desc,
+        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+        RuntimeProfile* profile, bool is_merging) {
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << 
print_id(fragment_instance_id)
               << ", node=" << dest_node_id;
-    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state, 
row_desc,
+    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, parent, 
state, row_desc,
                                                                  
fragment_instance_id, dest_node_id,
                                                                  num_senders, 
is_merging, profile));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index 09e347fcfb2..bd5e6f9b91e 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -40,6 +40,9 @@ class RuntimeState;
 class RowDescriptor;
 class RuntimeProfile;
 class PTransmitDataParams;
+namespace pipeline {
+class ExchangeLocalState;
+}
 
 namespace vectorized {
 class VDataStreamRecvr;
@@ -50,6 +53,7 @@ public:
     ~VDataStreamMgr();
 
     std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state,
+                                                   
pipeline::ExchangeLocalState* parent,
                                                    const RowDescriptor& 
row_desc,
                                                    const TUniqueId& 
fragment_instance_id,
                                                    PlanNodeId dest_node_id, 
int num_senders,
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 1ca6bb7f2c5..d2f79b8529e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -96,6 +96,7 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
     _block_queue.pop_front();
+    _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
     sub_blocks_memory_usage(block_byte_size);
     _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
@@ -207,6 +208,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
+    _recvr->_parent->memory_used_counter()->update(block_byte_size);
     add_blocks_memory_usage(block_byte_size);
     return Status::OK();
 }
@@ -245,6 +247,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
         _record_debug_info();
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
+        _recvr->_parent->memory_used_counter()->update(block_mem_size);
         add_blocks_memory_usage(block_mem_size);
     }
 }
@@ -315,12 +318,13 @@ void VDataStreamRecvr::SenderQueue::close() {
     _block_queue.clear();
 }
 
-VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state,
-                                   const RowDescriptor& row_desc,
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent,
+                                   RuntimeState* state, const RowDescriptor& row_desc,
                                    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                                    int num_senders, bool is_merging, RuntimeProfile* profile)
         : HasTaskExecutionCtx(state),
           _mgr(stream_mgr),
+          _parent(parent),
           _query_thread_context(state->query_id(), state->query_mem_tracker(),
                                 state->get_query_ctx()->workload_group()),
           _fragment_instance_id(fragment_instance_id),
@@ -352,9 +356,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
     }
 
     // Initialize the counters
-    _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
-    _peak_memory_usage_counter =
-            _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
     _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES);
     _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
 
@@ -417,7 +418,7 @@ std::shared_ptr<pipeline::Dependency> VDataStreamRecvr::get_local_channel_depend
 }
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
-    _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    _parent->peak_memory_usage_counter()->set(_mem_tracker->peak_consumption());
     if (!_is_merging) {
         block->clear();
         return _sender_queues[0]->get_batch(block, eos);
@@ -492,8 +493,8 @@ void VDataStreamRecvr::close() {
     _mgr = nullptr;
 
     _merger.reset();
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    if (_parent->peak_memory_usage_counter()) {
+        _parent->peak_memory_usage_counter()->set(_mem_tracker->peak_consumption());
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e8dcfdedba5..b2d76590ba2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -69,7 +69,8 @@ class VDataStreamRecvr;
 class VDataStreamRecvr : public HasTaskExecutionCtx {
 public:
     class SenderQueue;
-    VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc,
+    VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent,
+                     RuntimeState* state, const RowDescriptor& row_desc,
                      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                      int num_senders, bool is_merging, RuntimeProfile* profile);
 
@@ -120,6 +121,8 @@ private:
     // DataStreamMgr instance used to create this recvr. (Not owned)
     VDataStreamMgr* _mgr = nullptr;
 
+    pipeline::ExchangeLocalState* _parent = nullptr;
+
     QueryThreadContext _query_thread_context;
 
     // Fragment and node id of the destination exchange node this receiver is used by.
@@ -152,8 +155,6 @@ private:
     RuntimeProfile::Counter* _data_arrival_timer = nullptr;
     RuntimeProfile::Counter* _decompress_timer = nullptr;
     RuntimeProfile::Counter* _decompress_bytes = nullptr;
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
 
     // Number of rows received
     RuntimeProfile::Counter* _rows_produced_counter = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b6bfc7862dc..f894df25153 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -140,6 +140,12 @@ std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency()
             Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
 }
 
+int64_t PipChannel::mem_usage() const {
+    auto* mutable_block = Channel<pipeline::ExchangeSinkLocalState>::_serializer.get_block();
+    int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
+    return mem_usage;
+}
+
 Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
     COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
     std::unique_ptr<PBlock> pblock_ptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 0ceec97f1fc..6cd1ad9ac49 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -275,6 +275,8 @@ public:
 
     ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
 
+    int64_t mem_usage() const;
+
     void ch_roll_pb_block() override {
         // We have two choices here.
         // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 7a4bb4980b1..26b7dcbaf06 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -68,9 +68,10 @@ public:
                             RuntimeProfile::Counter* write_block_counter,
                             RuntimeProfile::Counter* write_bytes_counter,
                             RuntimeProfile::Counter* write_timer,
-                            RuntimeProfile::Counter* wait_io_timer) {
+                            RuntimeProfile::Counter* wait_io_timer,
+                            RuntimeProfile::Counter* memory_used_counter) {
         writer_->set_counters(serialize_timer, write_block_counter, write_bytes_counter,
-                              write_timer);
+                              write_timer, memory_used_counter);
         write_wait_io_timer_ = wait_io_timer;
     }
 
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index 46a97285802..9fbd81601b6 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -84,6 +84,9 @@ Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& writt
                 }
             });
 
+            auto tmp_blcok_mem = tmp_block.allocated_bytes();
+            memory_used_counter_->update(tmp_blcok_mem);
+            Defer defer {[&]() { memory_used_counter_->update(-tmp_blcok_mem); }};
             RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes));
 
             row_idx += block_rows;
@@ -107,10 +110,14 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
                     &compressed_bytes,
                     segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio
             RETURN_IF_ERROR(status);
+            auto pblock_mem = pblock.ByteSizeLong();
+            memory_used_counter_->update(pblock_mem);
+            Defer defer {[&]() { memory_used_counter_->update(-pblock_mem); }};
             if (!pblock.SerializeToString(&buff)) {
                 return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
                         "serialize spill data error. [path={}]", file_path_);
             }
+            memory_used_counter_->update(buff.size());
         }
         if (data_dir_->reach_capacity_limit(buff.size())) {
             return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
@@ -124,6 +131,7 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
         {
             auto buff_size = buff.size();
             Defer defer {[&]() {
+                memory_used_counter_->update(-buff_size);
                 if (status.ok()) {
                     data_dir_->update_spill_data_usage(buff_size);
 
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index d77bbd6908c..c3502b5d6a4 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -51,11 +51,13 @@ public:
     void set_counters(RuntimeProfile::Counter* serialize_timer,
                       RuntimeProfile::Counter* write_block_counter,
                       RuntimeProfile::Counter* write_bytes_counter,
-                      RuntimeProfile::Counter* write_timer) {
+                      RuntimeProfile::Counter* write_timer,
+                      RuntimeProfile::Counter* memory_used_counter) {
         serialize_timer_ = serialize_timer;
         write_block_counter_ = write_block_counter;
         write_bytes_counter_ = write_bytes_counter;
         write_timer_ = write_timer;
+        memory_used_counter_ = memory_used_counter;
     }
 
 private:
@@ -78,10 +80,11 @@ private:
     int64_t total_written_bytes_ = 0;
     std::string meta_;
 
-    RuntimeProfile::Counter* write_bytes_counter_;
-    RuntimeProfile::Counter* serialize_timer_;
-    RuntimeProfile::Counter* write_timer_;
-    RuntimeProfile::Counter* write_block_counter_;
+    RuntimeProfile::Counter* write_bytes_counter_ = nullptr;
+    RuntimeProfile::Counter* serialize_timer_ = nullptr;
+    RuntimeProfile::Counter* write_timer_ = nullptr;
+    RuntimeProfile::Counter* write_block_counter_ = nullptr;
+    RuntimeProfile::Counter* memory_used_counter_ = nullptr;
 };
 using SpillWriterUPtr = std::unique_ptr<SpillWriter>;
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to