This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 2dea859bdb1 [debug](rpc) debug rpc time consumption problem (#39852) 2dea859bdb1 is described below commit 2dea859bdb1ff627fb62f023ccd87427dd092d3b Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Sat Aug 24 19:59:39 2024 +0800 [debug](rpc) debug rpc time consumption problem (#39852) ## Proposed changes Issue Number: close #xxx Add detailed RPC time info for each channel, sorted by max rpc time of channels: ``` DATA_STREAM_SINK_OPERATOR (id=1,dst_id=1): - Partitioner: Crc32HashPartitioner(64) - BlocksProduced: 74 - BrpcSendTime: 2.689us - BrpcSendTime.Wait: 0ns - BytesSent: 89.35 KB - CloseTime: 680.152us - CompressTime: 0ns - ExecTime: 160.663ms - InitTime: 263.608us - InputRows: 32.512K (32512) - LocalBytesSent: 0.00 - LocalSendTime: 0ns - LocalSentRows: 0 - MemoryUsage: - PeakMemoryUsage: 80.00 KB - MergeBlockTime: 0ns - OpenTime: 4.113ms - OverallThroughput: 0.0 /sec - PendingFinishDependency: 41.179ms - RowsProduced: 32.512K (32512) - RpcAvgTime: 11.850ms - RpcCount: 10 - RpcMaxTime: 86.891ms - RpcMinTime: 15.200ms - RpcSumTime: 118.503ms - SerializeBatchTime: 13.517ms - SplitBlockDistributeByChannelTime: 38.923ms - SplitBlockHashComputeTime: 2.659ms - UncompressedRowBatchSize: 135.19 KB - WaitForDependencyTime: 0ns - WaitForRpcBufferQueue: 0ns RpcInstanceDetails: - Instance 85d4f75b72a9ea61: Count: 4, MaxTime: 36.238ms, MinTime: 12.107ms, AvgTime: 21.722ms, SumTime: 86.891ms - Instance 85d4f75b72a9ea91: Count: 3, MaxTime: 11.107ms, MinTime: 2.431ms, AvgTime: 5.470ms, SumTime: 16.412ms - Instance 85d4f75b72a9eac1: Count: 3, MaxTime: 7.554ms, MinTime: 3.160ms, AvgTime: 5.066ms, SumTime: 15.200ms ``` --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 69 ++++++++++++++++++---- be/src/pipeline/exec/exchange_sink_buffer.h | 14 ++++- be/src/pipeline/exec/exchange_source_operator.cpp | 2 + be/src/runtime/runtime_state.h | 11 ++++ 
.../org/apache/doris/planner/OlapScanNode.java | 12 ++++ .../java/org/apache/doris/qe/SessionVariable.java | 34 +++++++++++ gensrc/thrift/PaloInternalService.thrift | 3 + 7 files changed, 130 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 39a6a59bd49..c4bb736c6f9 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -24,6 +24,7 @@ #include <gen_cpp/Types_types.h> #include <glog/logging.h> #include <google/protobuf/stubs/callback.h> +#include <pdqsort.h> #include <stddef.h> #include <atomic> @@ -176,7 +177,8 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) { _rpc_channel_is_idle[low_id] = true; _instance_to_rpc_ctx[low_id] = {}; _instance_to_receiver_eof[low_id] = false; - _instance_to_rpc_time[low_id] = 0; + _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id)); + _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get(); _construct_request(low_id, finst_id); } @@ -298,7 +300,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - set_rpc_time(id, start_rpc_time, result.receive_time()); + if (_state->enable_verbose_profile()) { + auto end_rpc_time = GetCurrentTimeNanos(); + update_rpc_time(id, start_rpc_time, end_rpc_time); + } Status s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { _set_receiver_eof(id); @@ -376,7 +381,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - set_rpc_time(id, start_rpc_time, result.receive_time()); + if (_state->enable_verbose_profile()) { + auto end_rpc_time = GetCurrentTimeNanos(); + update_rpc_time(id, start_rpc_time, end_rpc_time); + } Status 
s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { _set_receiver_eof(id); @@ -491,10 +499,10 @@ template <typename Parent> void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { int64_t local_max_time = 0; int64_t local_min_time = INT64_MAX; - for (auto& [id, time] : _instance_to_rpc_time) { - if (time != 0) { - local_max_time = std::max(local_max_time, time); - local_min_time = std::min(local_min_time, time); + for (auto& [id, stats] : _instance_to_rpc_stats) { + if (stats->sum_time != 0) { + local_max_time = std::max(local_max_time, stats->sum_time); + local_min_time = std::min(local_min_time, stats->sum_time); } } *max_time = local_max_time; @@ -504,20 +512,25 @@ void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t template <typename Parent> int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() { int64_t sum_time = 0; - for (auto& [id, time] : _instance_to_rpc_time) { - sum_time += time; + for (auto& [id, stats] : _instance_to_rpc_stats) { + sum_time += stats->sum_time; } return sum_time; } template <typename Parent> -void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, - int64_t receive_rpc_time) { +void ExchangeSinkBuffer<Parent>::update_rpc_time(InstanceLoId id, int64_t start_rpc_time, + int64_t receive_rpc_time) { _rpc_count++; int64_t rpc_spend_time = receive_rpc_time - start_rpc_time; - DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end()); + DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end()); if (rpc_spend_time > 0) { - _instance_to_rpc_time[id] += rpc_spend_time; + ++_instance_to_rpc_stats[id]->rpc_count; + _instance_to_rpc_stats[id]->sum_time += rpc_spend_time; + _instance_to_rpc_stats[id]->max_time = + std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time); + _instance_to_rpc_stats[id]->min_time = + std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time); } } @@ -538,6 +551,36 
@@ void ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) { int64_t sum_time = get_sum_rpc_time(); _sum_rpc_timer->set(sum_time); _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load())); + + if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) { + auto max_count = _state->rpc_verbose_profile_max_instance_count(); + if (_state->enable_verbose_profile() && max_count > 0) { + pdqsort(_instance_to_rpc_stats_vec.begin(), _instance_to_rpc_stats_vec.end(), + [](const auto& a, const auto& b) { return a->max_time > b->max_time; }); + auto count = std::min((size_t)max_count, _instance_to_rpc_stats_vec.size()); + int i = 0; + auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true); + for (const auto& stats : _instance_to_rpc_stats_vec) { + if (0 == stats->rpc_count) { + continue; + } + std::stringstream out; + out << "Instance " << std::hex << stats->inst_lo_id; + auto stats_str = fmt::format( + "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}", + stats->rpc_count, PrettyPrinter::print(stats->max_time, TUnit::TIME_NS), + PrettyPrinter::print(stats->min_time, TUnit::TIME_NS), + PrettyPrinter::print(stats->sum_time / std::max(static_cast<int64_t>(1), + stats->rpc_count), + TUnit::TIME_NS), + PrettyPrinter::print(stats->sum_time, TUnit::TIME_NS)); + detail_profile->add_info_string(out.str(), stats_str); + if (++i == count) { + break; + } + } + } + } } template class ExchangeSinkBuffer<vectorized::VDataStreamSender>; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index cd5502ee6d0..5ce6d75b149 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -25,6 +25,7 @@ #include <stdint.h> #include <atomic> +#include <cstdint> #include <list> #include <memory> #include <mutex> @@ -209,7 +210,7 @@ public: bool can_write() const; bool is_pending_finish(); void close(); - void 
set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); + void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); void set_dependency(std::shared_ptr<Dependency> queue_dependency, @@ -252,7 +253,16 @@ private: // Number of busy channels; std::atomic<int> _busy_channels = 0; phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof; - phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time; + struct RpcInstanceStatistics { + RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {} + InstanceLoId inst_lo_id; + int64_t rpc_count = 0; + int64_t max_time = 0; + int64_t min_time = INT64_MAX; + int64_t sum_time = 0; + }; + std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec; + phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats; phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx; std::atomic<bool> _is_finishing; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 3f3ab736814..94cbe439a3e 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -88,6 +88,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { TUnit ::TIME_NS, timer_name, 1); } + _runtime_profile->add_info_string("InstanceID", print_id(state->fragment_instance_id())); + return Status::OK(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index b88b29ee8d0..8b8cbd85f0f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -499,6 +499,17 @@ public: return _query_options.__isset.enable_profile && _query_options.enable_profile; } + bool enable_verbose_profile() const { + return enable_profile() && _query_options.__isset.enable_verbose_profile && + _query_options.enable_verbose_profile; + } + 
+ int rpc_verbose_profile_max_instance_count() const { + return _query_options.__isset.rpc_verbose_profile_max_instance_count + ? _query_options.rpc_verbose_profile_max_instance_count + : 0; + } + bool enable_scan_node_run_serial() const { return _query_options.__isset.enable_scan_node_run_serial && _query_options.enable_scan_node_run_serial; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index d133d359b27..4ffa12e8f05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -741,6 +741,7 @@ public class OlapScanNode extends ScanNode { int useFixReplica = -1; boolean needCheckTags = false; boolean skipMissingVersion = false; + Set<Long> userSetBackendBlacklist = null; if (ConnectContext.get() != null) { allowedTags = ConnectContext.get().getResourceTags(); needCheckTags = ConnectContext.get().isResourceTagsSet(); @@ -751,6 +752,7 @@ public class OlapScanNode extends ScanNode { LOG.debug("query id: {}, partition id:{} visibleVersion: {}", DebugUtil.printId(ConnectContext.get().queryId()), partition.getId(), visibleVersion); } + userSetBackendBlacklist = ConnectContext.get().getSessionVariable().getQueryBackendBlacklist(); } for (Tablet tablet : tablets) { long tabletId = tablet.getId(); @@ -866,6 +868,16 @@ public class OlapScanNode extends ScanNode { + " does not exist or not alive"); continue; } + if (userSetBackendBlacklist != null && userSetBackendBlacklist.contains(backend.getId())) { + if (LOG.isDebugEnabled()) { + LOG.debug("backend {} is in the blacklist that user set in session variable {}", + replica.getBackendId(), replica.getId()); + } + String err = "replica " + replica.getId() + "'s backend " + replica.getBackendId() + + " in the blacklist that user set in session variable"; + errs.add(err); + continue; + } if (!backend.isMixNode()) { continue; } diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1b0c4bc6946..1a70e43cf77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -87,6 +87,8 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_EXECUTION_TIME = "max_execution_time"; public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String ENABLE_PROFILE = "enable_profile"; + public static final String ENABLE_VERBOSE_PROFILE = "enable_verbose_profile"; + public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = "rpc_verbose_profile_max_instance_count"; public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms"; public static final String SQL_MODE = "sql_mode"; public static final String WORKLOAD_VARIABLE = "workload_group"; @@ -415,6 +417,8 @@ public class SessionVariable implements Serializable, Writable { // fix replica to query. If num = 1, query the smallest replica, if 2 is the second smallest replica. public static final String USE_FIX_REPLICA = "use_fix_replica"; + public static final String QUERY_BACKEND_BLACKLIST = "query_backend_blacklist"; + public static final String DRY_RUN_QUERY = "dry_run_query"; // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3. 
@@ -705,6 +709,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; + @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true) + public boolean enableVerboseProfile = true; + + @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, needForward = true) + public int rpcVerboseProfileMaxInstanceCount = 5; + // if true, need report to coordinator when plan fragment execute successfully. @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true) public int autoProfileThresholdMs = -1; @@ -1475,6 +1485,13 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = USE_FIX_REPLICA) public int useFixReplica = -1; + // This is a debug feature, when we find a backend is not stable(for example network reasons) + // we could use this variable to exclude it from query plan. It is only used for query. Not for + // load jobs. + // Use could set multiple backendids using , to split like "10111,10112" + @VariableMgr.VarAttr(name = QUERY_BACKEND_BLACKLIST, needForward = true) + public String queryBackendBlacklist = ""; + @VariableMgr.VarAttr(name = DUMP_NEREIDS_MEMO) public boolean dumpNereidsMemo = false; @@ -3386,6 +3403,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setQueryTimeout(queryTimeoutS); tResult.setEnableProfile(enableProfile); + tResult.setEnableVerboseProfile(enableVerboseProfile); + tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount); if (enableProfile) { // If enable profile == true, then also set report success to true // be need report success to start report thread. 
But it is very tricky @@ -4050,6 +4069,21 @@ public class SessionVariable implements Serializable, Writable { this.ignoreStorageDataDistribution = ignoreStorageDataDistribution; } + // If anything wrong during parsing, just throw exception to forbidden the query + // so there is not many exception handling logic here. + public Set<Long> getQueryBackendBlacklist() { + Set<Long> blacklist = Sets.newHashSet(); + if (Strings.isNullOrEmpty(queryBackendBlacklist)) { + return blacklist; + } + String[] backendIds = this.queryBackendBlacklist.trim().split(","); + for (int i = 0; i < backendIds.length; ++i) { + long backendId = Long.parseLong(backendIds[i].trim()); + blacklist.add(backendId); + } + return blacklist; + } + public boolean isForceJniScanner() { return forceJniScanner; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 41b8fb8cf02..422b4baaff9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -322,6 +322,9 @@ struct TQueryOptions { 127: optional i32 runtime_bloom_filter_max_size = 16777216; + 128: optional bool enable_verbose_profile = false; + 129: optional i32 rpc_verbose_profile_max_instance_count = 0; + // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org