This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 736f1ad53d2 [profile](rpc) add RpcInstanceDetails profile to debug rpc 
consumption problem (#43284)
736f1ad53d2 is described below

commit 736f1ad53d28bb75ee902c1885f255ddce7d0acf
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Tue Nov 12 14:17:00 2024 +0800

    [profile](rpc) add RpcInstanceDetails profile to debug rpc consumption 
problem (#43284)
    
    Picked from the 2.1 branch, only the RPC profile-related code was
    selected.
    https://github.com/apache/doris/pull/39852
    https://github.com/apache/doris/pull/40117
    
    ```
                          DATA_STREAM_SINK_OPERATOR  (id=2,dst_id=2):
                                -  RpcCount:  sum  16,  avg  4,  max  4,  min  4
                                -  RpcMaxTime:  avg  1.15ms,  max  1.163ms,  
min  818.493us
    
     -  RpcAvgTime:  11.850ms
                              -  RpcCount:  10
                              -  RpcMaxTime:  86.891ms
                              -  RpcMinTime:  15.200ms
                              -  RpcSumTime:  118.503ms
                              -  SerializeBatchTime:  13.517ms
                              -  SplitBlockDistributeByChannelTime:  38.923ms
                              -  SplitBlockHashComputeTime:  2.659ms
                              -  UncompressedRowBatchSize:  135.19  KB
                              -  WaitForDependencyTime:  0ns
                                  -  WaitForRpcBufferQueue:  0ns
                            RpcInstanceDetails:
                                  -  Instance  85d4f75b72a9ea61:  Count:  4,  
MaxTime:  36.238ms,  MinTime:  12.107ms,  AvgTime:  21.722ms,  SumTime:  
86.891ms
                                  -  Instance  85d4f75b72a9ea91:  Count:  3,  
MaxTime:  11.107ms,  MinTime:  2.431ms,  AvgTime:  5.470ms,  SumTime:  16.412ms
                                  -  Instance  85d4f75b72a9eac1:  Count:  3,  
MaxTime:  7.554ms,  MinTime:  3.160ms,  AvgTime:  5.066ms,  SumTime:  15.200ms
    ```
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 75 +++++++++++++++++-----
 be/src/pipeline/exec/exchange_sink_buffer.h        | 13 +++-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  1 +
 be/src/runtime/runtime_state.h                     | 11 ++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 +++
 5 files changed, 93 insertions(+), 17 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7163299d766..0f02ffc2b9a 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -25,6 +25,7 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
+#include <pdqsort.h>
 #include <stddef.h>
 
 #include <atomic>
@@ -129,7 +130,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId 
fragment_instance_id) {
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
     _instance_to_receiver_eof[low_id] = false;
-    _instance_to_rpc_time[low_id] = 0;
+    
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
+    _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
     _construct_request(low_id, finst_id);
 }
 
@@ -261,7 +263,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            set_rpc_time(id, start_rpc_time, result.receive_time());
+
+            auto end_rpc_time = GetCurrentTimeNanos();
+            update_rpc_time(id, start_rpc_time, end_rpc_time);
+
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -339,7 +344,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            set_rpc_time(id, start_rpc_time, result.receive_time());
+
+            auto end_rpc_time = GetCurrentTimeNanos();
+            update_rpc_time(id, start_rpc_time, end_rpc_time);
+
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -466,10 +474,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId 
id,
 void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* 
min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        if (time != 0) {
-            local_max_time = std::max(local_max_time, time);
-            local_min_time = std::min(local_min_time, time);
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        if (stats->sum_time != 0) {
+            local_max_time = std::max(local_max_time, stats->sum_time);
+            local_min_time = std::min(local_min_time, stats->sum_time);
         }
     }
     *max_time = local_max_time;
@@ -478,27 +486,32 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* 
max_time, int64_t* min_ti
 
 int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
     int64_t sum_time = 0;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        sum_time += time;
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        sum_time += stats->sum_time;
     }
     return sum_time;
 }
 
-void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
-                                      int64_t receive_rpc_time) {
+void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t 
start_rpc_time,
+                                         int64_t receive_rpc_time) {
     _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
-    DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
+    DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
     if (rpc_spend_time > 0) {
-        _instance_to_rpc_time[id] += rpc_spend_time;
+        ++_instance_to_rpc_stats[id]->rpc_count;
+        _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
+        _instance_to_rpc_stats[id]->max_time =
+                std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
+        _instance_to_rpc_stats[id]->min_time =
+                std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
     }
 }
 
 void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
-    auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
+    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
     auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
     auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
-    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
+    auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", 
TUnit::UNIT, 1);
     auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
 
     int64_t max_rpc_time = 0, min_rpc_time = 0;
@@ -510,6 +523,38 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* 
profile) {
     int64_t sum_time = get_sum_rpc_time();
     _sum_rpc_timer->set(sum_time);
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), 
_rpc_count.load()));
+
+    auto max_count = _state->rpc_verbose_profile_max_instance_count();
+    if (_state->enable_verbose_profile() && max_count > 0) {
+        std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
+        for (const auto& stats : _instance_to_rpc_stats_vec) {
+            tmp_rpc_stats_vec.emplace_back(*stats);
+        }
+        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
+                [](const auto& a, const auto& b) { return a.max_time > 
b.max_time; });
+        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
+        int i = 0;
+        auto* detail_profile = profile->create_child("RpcInstanceDetails", 
true, true);
+        for (const auto& stats : tmp_rpc_stats_vec) {
+            if (0 == stats.rpc_count) {
+                continue;
+            }
+            std::stringstream out;
+            out << "Instance " << std::hex << stats.inst_lo_id;
+            auto stats_str = fmt::format(
+                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, 
SumTime: {}",
+                    stats.rpc_count, PrettyPrinter::print(stats.max_time, 
TUnit::TIME_NS),
+                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
+                    PrettyPrinter::print(
+                            stats.sum_time / std::max(static_cast<int64_t>(1), 
stats.rpc_count),
+                            TUnit::TIME_NS),
+                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
+            detail_profile->add_info_string(out.str(), stats_str);
+            if (++i == count) {
+                break;
+            }
+        }
+    }
 }
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 13692532a33..22a1452f8d5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -180,7 +180,7 @@ public:
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
     void close();
-    void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
+    void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
     void set_dependency(std::shared_ptr<Dependency> queue_dependency,
@@ -215,7 +215,16 @@ private:
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
 
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
-    phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+    struct RpcInstanceStatistics {
+        RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
+        InstanceLoId inst_lo_id;
+        int64_t rpc_count = 0;
+        int64_t max_time = 0;
+        int64_t min_time = INT64_MAX;
+        int64_t sum_time = 0;
+    };
+    std::vector<std::shared_ptr<RpcInstanceStatistics>> 
_instance_to_rpc_stats_vec;
+    phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> 
_instance_to_rpc_stats;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index eafefa2e4c0..dbde9abd05d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -81,6 +81,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     get_data_from_recvr_timer = ADD_TIMER(_runtime_profile, 
"GetDataFromRecvrTime");
     filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
     create_merger_timer = ADD_TIMER(_runtime_profile, "CreateMergerTime");
+    _runtime_profile->add_info_string("InstanceID", 
print_id(state->fragment_instance_id()));
 
     return Status::OK();
 }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index abc823bc25b..88deee491d1 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -459,6 +459,17 @@ public:
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
     }
 
+    bool enable_verbose_profile() const {
+        return enable_profile() && 
_query_options.__isset.enable_verbose_profile &&
+               _query_options.enable_verbose_profile;
+    }
+
+    int rpc_verbose_profile_max_instance_count() const {
+        return _query_options.__isset.rpc_verbose_profile_max_instance_count
+                       ? _query_options.rpc_verbose_profile_max_instance_count
+                       : 0;
+    }
+
     bool enable_share_hash_table_for_broadcast_join() const {
         return 
_query_options.__isset.enable_share_hash_table_for_broadcast_join &&
                _query_options.enable_share_hash_table_for_broadcast_join;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index dfbe4c445a4..4b1049649b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -97,6 +97,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String MAX_EXECUTION_TIME = "max_execution_time";
     public static final String INSERT_TIMEOUT = "insert_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
+    public static final String ENABLE_VERBOSE_PROFILE = 
"enable_verbose_profile";
+    public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = 
"rpc_verbose_profile_max_instance_count";
     public static final String AUTO_PROFILE_THRESHOLD_MS = 
"auto_profile_threshold_ms";
     public static final String SQL_MODE = "sql_mode";
     public static final String WORKLOAD_VARIABLE = "workload_group";
@@ -790,6 +792,12 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
+    public boolean enableVerboseProfile = false;
+
+    @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, 
needForward = true)
+    public int rpcVerboseProfileMaxInstanceCount = 5;
+
     // When enable_profile is true, profile of queries that costs more than 
autoProfileThresholdMs
     // will be stored to disk.
     @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
@@ -3811,6 +3819,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setQueryTimeout(queryTimeoutS);
         tResult.setEnableProfile(enableProfile);
+        tResult.setEnableVerboseProfile(enableVerboseProfile);
+        
tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
         if (enableProfile) {
             // If enable profile == true, then also set report success to true
             // be need report success to start report thread. But it is very 
tricky


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to