This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 2dea859bdb1 [debug](rpc) debug rpc time consumption problem (#39852)
2dea859bdb1 is described below

commit 2dea859bdb1ff627fb62f023ccd87427dd092d3b
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Sat Aug 24 19:59:39 2024 +0800

    [debug](rpc) debug rpc time consumption problem (#39852)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
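    Add detailed per-channel (per destination instance) RPC timing to the
    DATA_STREAM_SINK_OPERATOR profile, sorted by each channel's maximum RPC
    time. RPC timing is collected only when `enable_verbose_profile` is on,
    and at most `rpc_verbose_profile_max_instance_count` instances are listed.
    This change also adds an `InstanceID` info string to the exchange source
    profile, and a `query_backend_blacklist` session variable whose backend IDs
    are excluded when choosing replicas for OLAP scans. Example output: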
    ```
                         DATA_STREAM_SINK_OPERATOR  (id=1,dst_id=1):
                              -  Partitioner:  Crc32HashPartitioner(64)
                              -  BlocksProduced:  74
                              -  BrpcSendTime:  2.689us
                              -  BrpcSendTime.Wait:  0ns
                              -  BytesSent:  89.35  KB
                              -  CloseTime:  680.152us
                              -  CompressTime:  0ns
                              -  ExecTime:  160.663ms
                              -  InitTime:  263.608us
                              -  InputRows:  32.512K  (32512)
                              -  LocalBytesSent:  0.00
                              -  LocalSendTime:  0ns
                              -  LocalSentRows:  0
                              -  MemoryUsage:
                                  -  PeakMemoryUsage:  80.00  KB
                              -  MergeBlockTime:  0ns
                              -  OpenTime:  4.113ms
                              -  OverallThroughput:  0.0  /sec
                              -  PendingFinishDependency:  41.179ms
                              -  RowsProduced:  32.512K  (32512)
                              -  RpcAvgTime:  11.850ms
                              -  RpcCount:  10
                              -  RpcMaxTime:  86.891ms
                              -  RpcMinTime:  15.200ms
                              -  RpcSumTime:  118.503ms
                              -  SerializeBatchTime:  13.517ms
                              -  SplitBlockDistributeByChannelTime:  38.923ms
                              -  SplitBlockHashComputeTime:  2.659ms
                              -  UncompressedRowBatchSize:  135.19  KB
                              -  WaitForDependencyTime:  0ns
                                  -  WaitForRpcBufferQueue:  0ns
                            RpcInstanceDetails:
                                  -  Instance  85d4f75b72a9ea61:  Count:  4,  MaxTime:  36.238ms,  MinTime:  12.107ms,  AvgTime:  21.722ms,  SumTime:  86.891ms
                                  -  Instance  85d4f75b72a9ea91:  Count:  3,  MaxTime:  11.107ms,  MinTime:  2.431ms,  AvgTime:  5.470ms,  SumTime:  16.412ms
                                  -  Instance  85d4f75b72a9eac1:  Count:  3,  MaxTime:  7.554ms,  MinTime:  3.160ms,  AvgTime:  5.066ms,  SumTime:  15.200ms
    ```
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 69 ++++++++++++++++++----
 be/src/pipeline/exec/exchange_sink_buffer.h        | 14 ++++-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  2 +
 be/src/runtime/runtime_state.h                     | 11 ++++
 .../org/apache/doris/planner/OlapScanNode.java     | 12 ++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 34 +++++++++++
 gensrc/thrift/PaloInternalService.thrift           |  3 +
 7 files changed, 130 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 39a6a59bd49..c4bb736c6f9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -24,6 +24,7 @@
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
+#include <pdqsort.h>
 #include <stddef.h>
 
 #include <atomic>
@@ -176,7 +177,8 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId 
fragment_instance_id) {
     _rpc_channel_is_idle[low_id] = true;
     _instance_to_rpc_ctx[low_id] = {};
     _instance_to_receiver_eof[low_id] = false;
-    _instance_to_rpc_time[low_id] = 0;
+    
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
+    _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
     _construct_request(low_id, finst_id);
 }
 
@@ -298,7 +300,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            set_rpc_time(id, start_rpc_time, result.receive_time());
+            if (_state->enable_verbose_profile()) {
+                auto end_rpc_time = GetCurrentTimeNanos();
+                update_rpc_time(id, start_rpc_time, end_rpc_time);
+            }
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -376,7 +381,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            set_rpc_time(id, start_rpc_time, result.receive_time());
+            if (_state->enable_verbose_profile()) {
+                auto end_rpc_time = GetCurrentTimeNanos();
+                update_rpc_time(id, start_rpc_time, end_rpc_time);
+            }
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -491,10 +499,10 @@ template <typename Parent>
 void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, 
int64_t* min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        if (time != 0) {
-            local_max_time = std::max(local_max_time, time);
-            local_min_time = std::min(local_min_time, time);
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        if (stats->sum_time != 0) {
+            local_max_time = std::max(local_max_time, stats->sum_time);
+            local_min_time = std::min(local_min_time, stats->sum_time);
         }
     }
     *max_time = local_max_time;
@@ -504,20 +512,25 @@ void 
ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t
 template <typename Parent>
 int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() {
     int64_t sum_time = 0;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        sum_time += time;
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        sum_time += stats->sum_time;
     }
     return sum_time;
 }
 
 template <typename Parent>
-void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId id, int64_t 
start_rpc_time,
-                                              int64_t receive_rpc_time) {
+void ExchangeSinkBuffer<Parent>::update_rpc_time(InstanceLoId id, int64_t 
start_rpc_time,
+                                                 int64_t receive_rpc_time) {
     _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
-    DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
+    DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
     if (rpc_spend_time > 0) {
-        _instance_to_rpc_time[id] += rpc_spend_time;
+        ++_instance_to_rpc_stats[id]->rpc_count;
+        _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
+        _instance_to_rpc_stats[id]->max_time =
+                std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
+        _instance_to_rpc_stats[id]->min_time =
+                std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
     }
 }
 
@@ -538,6 +551,36 @@ void 
ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
     int64_t sum_time = get_sum_rpc_time();
     _sum_rpc_timer->set(sum_time);
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), 
_rpc_count.load()));
+
+    if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+        auto max_count = _state->rpc_verbose_profile_max_instance_count();
+        if (_state->enable_verbose_profile() && max_count > 0) {
+            pdqsort(_instance_to_rpc_stats_vec.begin(), 
_instance_to_rpc_stats_vec.end(),
+                    [](const auto& a, const auto& b) { return a->max_time > 
b->max_time; });
+            auto count = std::min((size_t)max_count, 
_instance_to_rpc_stats_vec.size());
+            int i = 0;
+            auto* detail_profile = profile->create_child("RpcInstanceDetails", 
true, true);
+            for (const auto& stats : _instance_to_rpc_stats_vec) {
+                if (0 == stats->rpc_count) {
+                    continue;
+                }
+                std::stringstream out;
+                out << "Instance " << std::hex << stats->inst_lo_id;
+                auto stats_str = fmt::format(
+                        "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, 
SumTime: {}",
+                        stats->rpc_count, 
PrettyPrinter::print(stats->max_time, TUnit::TIME_NS),
+                        PrettyPrinter::print(stats->min_time, TUnit::TIME_NS),
+                        PrettyPrinter::print(stats->sum_time / 
std::max(static_cast<int64_t>(1),
+                                                                        
stats->rpc_count),
+                                             TUnit::TIME_NS),
+                        PrettyPrinter::print(stats->sum_time, TUnit::TIME_NS));
+                detail_profile->add_info_string(out.str(), stats_str);
+                if (++i == count) {
+                    break;
+                }
+            }
+        }
+    }
 }
 
 template class ExchangeSinkBuffer<vectorized::VDataStreamSender>;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index cd5502ee6d0..5ce6d75b149 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -25,6 +25,7 @@
 #include <stdint.h>
 
 #include <atomic>
+#include <cstdint>
 #include <list>
 #include <memory>
 #include <mutex>
@@ -209,7 +210,7 @@ public:
     bool can_write() const;
     bool is_pending_finish();
     void close();
-    void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
+    void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
     void set_dependency(std::shared_ptr<Dependency> queue_dependency,
@@ -252,7 +253,16 @@ private:
     // Number of busy channels;
     std::atomic<int> _busy_channels = 0;
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
-    phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+    struct RpcInstanceStatistics {
+        RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
+        InstanceLoId inst_lo_id;
+        int64_t rpc_count = 0;
+        int64_t max_time = 0;
+        int64_t min_time = INT64_MAX;
+        int64_t sum_time = 0;
+    };
+    std::vector<std::shared_ptr<RpcInstanceStatistics>> 
_instance_to_rpc_stats_vec;
+    phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> 
_instance_to_rpc_stats;
     phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> 
_instance_to_rpc_ctx;
 
     std::atomic<bool> _is_finishing;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3f3ab736814..94cbe439a3e 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -88,6 +88,8 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
                                                            TUnit ::TIME_NS, 
timer_name, 1);
     }
 
+    _runtime_profile->add_info_string("InstanceID", 
print_id(state->fragment_instance_id()));
+
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index b88b29ee8d0..8b8cbd85f0f 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -499,6 +499,17 @@ public:
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
     }
 
+    bool enable_verbose_profile() const {
+        return enable_profile() && 
_query_options.__isset.enable_verbose_profile &&
+               _query_options.enable_verbose_profile;
+    }
+
+    int rpc_verbose_profile_max_instance_count() const {
+        return _query_options.__isset.rpc_verbose_profile_max_instance_count
+                       ? _query_options.rpc_verbose_profile_max_instance_count
+                       : 0;
+    }
+
     bool enable_scan_node_run_serial() const {
         return _query_options.__isset.enable_scan_node_run_serial &&
                _query_options.enable_scan_node_run_serial;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d133d359b27..4ffa12e8f05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -741,6 +741,7 @@ public class OlapScanNode extends ScanNode {
         int useFixReplica = -1;
         boolean needCheckTags = false;
         boolean skipMissingVersion = false;
+        Set<Long> userSetBackendBlacklist = null;
         if (ConnectContext.get() != null) {
             allowedTags = ConnectContext.get().getResourceTags();
             needCheckTags = ConnectContext.get().isResourceTagsSet();
@@ -751,6 +752,7 @@ public class OlapScanNode extends ScanNode {
                 LOG.debug("query id: {}, partition id:{} visibleVersion: {}",
                         DebugUtil.printId(ConnectContext.get().queryId()), 
partition.getId(), visibleVersion);
             }
+            userSetBackendBlacklist = 
ConnectContext.get().getSessionVariable().getQueryBackendBlacklist();
         }
         for (Tablet tablet : tablets) {
             long tabletId = tablet.getId();
@@ -866,6 +868,16 @@ public class OlapScanNode extends ScanNode {
                             + " does not exist or not alive");
                     continue;
                 }
+                if (userSetBackendBlacklist != null && 
userSetBackendBlacklist.contains(backend.getId())) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("backend {} is in the blacklist that user 
set in session variable {}",
+                                replica.getBackendId(), replica.getId());
+                    }
+                    String err = "replica " + replica.getId() + "'s backend " 
+ replica.getBackendId()
+                            + " in the blacklist that user set in session 
variable";
+                    errs.add(err);
+                    continue;
+                }
                 if (!backend.isMixNode()) {
                     continue;
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 1b0c4bc6946..1a70e43cf77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -87,6 +87,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String MAX_EXECUTION_TIME = "max_execution_time";
     public static final String INSERT_TIMEOUT = "insert_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
+    public static final String ENABLE_VERBOSE_PROFILE = 
"enable_verbose_profile";
+    public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = 
"rpc_verbose_profile_max_instance_count";
     public static final String AUTO_PROFILE_THRESHOLD_MS = 
"auto_profile_threshold_ms";
     public static final String SQL_MODE = "sql_mode";
     public static final String WORKLOAD_VARIABLE = "workload_group";
@@ -415,6 +417,8 @@ public class SessionVariable implements Serializable, 
Writable {
     // fix replica to query. If num = 1, query the smallest replica, if 2 is 
the second smallest replica.
     public static final String USE_FIX_REPLICA = "use_fix_replica";
 
+    public static final String QUERY_BACKEND_BLACKLIST = 
"query_backend_blacklist";
+
     public static final String DRY_RUN_QUERY = "dry_run_query";
 
     // Split size for ExternalFileScanNode. Default value 0 means use the 
block size of HDFS/S3.
@@ -705,6 +709,12 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
+    public boolean enableVerboseProfile = true;
+
+    @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, 
needForward = true)
+    public int rpcVerboseProfileMaxInstanceCount = 5;
+
     // if true, need report to coordinator when plan fragment execute 
successfully.
     @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
     public int autoProfileThresholdMs = -1;
@@ -1475,6 +1485,13 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = USE_FIX_REPLICA)
     public int useFixReplica = -1;
 
+    // This is a debug feature: when a backend is unstable (for example, due to network issues),
+    // this variable can be used to exclude it from the query plan. It applies only to queries,
+    // not to load jobs.
+    // Multiple backend IDs can be given, separated by commas, e.g. "10111,10112".
+    @VariableMgr.VarAttr(name = QUERY_BACKEND_BLACKLIST, needForward = true)
+    public String queryBackendBlacklist = "";
+
     @VariableMgr.VarAttr(name = DUMP_NEREIDS_MEMO)
     public boolean dumpNereidsMemo = false;
 
@@ -3386,6 +3403,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setQueryTimeout(queryTimeoutS);
         tResult.setEnableProfile(enableProfile);
+        tResult.setEnableVerboseProfile(enableVerboseProfile);
+        
tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
         if (enableProfile) {
             // If enable profile == true, then also set report success to true
             // be need report success to start report thread. But it is very 
tricky
@@ -4050,6 +4069,21 @@ public class SessionVariable implements Serializable, 
Writable {
         this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
     }
 
+    // If parsing fails, the exception is deliberately propagated so that the query is rejected;
+    // this is why there is little exception handling here.
+    // so there is not many exception handling logic here.
+    public Set<Long> getQueryBackendBlacklist() {
+        Set<Long> blacklist = Sets.newHashSet();
+        if (Strings.isNullOrEmpty(queryBackendBlacklist)) {
+            return blacklist;
+        }
+        String[] backendIds = this.queryBackendBlacklist.trim().split(",");
+        for (int i = 0; i < backendIds.length; ++i) {
+            long backendId = Long.parseLong(backendIds[i].trim());
+            blacklist.add(backendId);
+        }
+        return blacklist;
+    }
+
     public boolean isForceJniScanner() {
         return forceJniScanner;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 41b8fb8cf02..422b4baaff9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -322,6 +322,9 @@ struct TQueryOptions {
 
   127: optional i32 runtime_bloom_filter_max_size = 16777216;
 
+  128: optional bool enable_verbose_profile = false;
+  129: optional i32 rpc_verbose_profile_max_instance_count = 0;
+
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to