This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5da50b7a4 [metrics](shuffle) Add necessary metrics (#40476)
9d5da50b7a4 is described below

commit 9d5da50b7a43f8e4c26a3e0cb3e039d86c6f093d
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Sep 9 15:34:43 2024 +0800

    [metrics](shuffle) Add necessary metrics (#40476)
---
 be/src/vec/runtime/vdata_stream_mgr.cpp   |  8 +++++---
 be/src/vec/runtime/vdata_stream_recvr.cpp | 13 ++++++++++---
 be/src/vec/runtime/vdata_stream_recvr.h   |  7 +++++--
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 80cc2d93f8e..a5db9a6150d 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -109,6 +109,8 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
     t_finst_id.hi = finst_id.hi();
     t_finst_id.lo = finst_id.lo();
     std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
+    ThreadCpuStopWatch cpu_time_stop_watch;
+    cpu_time_stop_watch.start();
     static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
     if (recvr == nullptr) {
         // The receiver may remove itself from the receiver map via deregister_recvr()
@@ -137,9 +139,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
 
     bool eos = request->eos();
     if (request->has_block()) {
-        RETURN_IF_ERROR(recvr->add_block(request->block(), request->sender_id(),
-                                         request->be_number(), request->packet_seq(),
-                                         eos ? nullptr : done, wait_for_worker));
+        RETURN_IF_ERROR(recvr->add_block(
+                request->block(), request->sender_id(), request->be_number(), request->packet_seq(),
+                eos ? nullptr : done, wait_for_worker, cpu_time_stop_watch.elapsed_time()));
     }
 
     if (eos) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 5326f2b7d0a..1ca6bb7f2c5 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -134,7 +134,8 @@ void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
 Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
                                                 int64_t packet_seq,
                                                 ::google::protobuf::Closure** done,
-                                                const int64_t wait_for_worker) {
+                                                const int64_t wait_for_worker,
+                                                const uint64_t time_to_find_recvr) {
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_is_cancelled) {
@@ -189,6 +190,10 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
         _recvr->_max_wait_worker_time->set(wait_for_worker);
     }
 
+    if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
+        _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
+    }
+
     _block_queue.emplace_back(std::move(block), block_byte_size);
     COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
     _record_debug_info();
@@ -363,6 +368,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
     _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
     _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
     _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
+    _max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrTime(NS)", TUnit::UNIT);
 }
 
 VDataStreamRecvr::~VDataStreamRecvr() {
@@ -391,11 +397,12 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
 
 Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
                                    int64_t packet_seq, ::google::protobuf::Closure** done,
-                                   const int64_t wait_for_worker) {
+                                   const int64_t wait_for_worker,
+                                   const uint64_t time_to_find_recvr) {
     SCOPED_ATTACH_TASK(_query_thread_context);
     int use_sender_id = _is_merging ? sender_id : 0;
     return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done,
-                                                    wait_for_worker);
+                                                    wait_for_worker, time_to_find_recvr);
 }
 
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 7eebdf0249b..e8dcfdedba5 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -83,7 +83,8 @@ public:
     std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }
 
     Status add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
-                     ::google::protobuf::Closure** done, const int64_t wait_for_worker);
+                     ::google::protobuf::Closure** done, const int64_t wait_for_worker,
+                     const uint64_t time_to_find_recvr);
 
     void add_block(Block* block, int sender_id, bool use_move);
 
@@ -160,6 +161,7 @@ private:
     RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
     RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
     RuntimeProfile::Counter* _max_wait_to_process_time = nullptr;
+    RuntimeProfile::Counter* _max_find_recvr_time = nullptr;
 
     std::vector<std::shared_ptr<pipeline::Dependency>> _sender_to_local_channel_dependency;
 };
@@ -178,7 +180,8 @@ public:
     Status get_batch(Block* next_block, bool* eos);
 
     Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
-                     ::google::protobuf::Closure** done, const int64_t wait_for_worker);
+                     ::google::protobuf::Closure** done, const int64_t wait_for_worker,
+                     const uint64_t time_to_find_recvr);
 
     void add_block(Block* block, bool use_move);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to