This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 736f1ad53d2  [profile](rpc) add RpcInstanceDetails profile to debug rpc consumption problem (#43284)
736f1ad53d2 is described below

commit 736f1ad53d28bb75ee902c1885f255ddce7d0acf
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Tue Nov 12 14:17:00 2024 +0800

    [profile](rpc) add RpcInstanceDetails profile to debug rpc consumption problem (#43284)

    Picked from the 2.1 branch; only the RPC profile-related code was selected.
    https://github.com/apache/doris/pull/39852
    https://github.com/apache/doris/pull/40117

    ```
    DATA_STREAM_SINK_OPERATOR (id=2,dst_id=2):
        - RpcCount: sum 16, avg 4, max 4, min 4
        - RpcMaxTime: avg 1.15ms, max 1.163ms, min 818.493us
        - RpcAvgTime: 11.850ms
        - RpcCount: 10
        - RpcMaxTime: 86.891ms
        - RpcMinTime: 15.200ms
        - RpcSumTime: 118.503ms
        - SerializeBatchTime: 13.517ms
        - SplitBlockDistributeByChannelTime: 38.923ms
        - SplitBlockHashComputeTime: 2.659ms
        - UncompressedRowBatchSize: 135.19 KB
        - WaitForDependencyTime: 0ns
        - WaitForRpcBufferQueue: 0ns
        RpcInstanceDetails:
           - Instance 85d4f75b72a9ea61: Count: 4, MaxTime: 36.238ms, MinTime: 12.107ms, AvgTime: 21.722ms, SumTime: 86.891ms
           - Instance 85d4f75b72a9ea91: Count: 3, MaxTime: 11.107ms, MinTime: 2.431ms, AvgTime: 5.470ms, SumTime: 16.412ms
           - Instance 85d4f75b72a9eac1: Count: 3, MaxTime: 7.554ms, MinTime: 3.160ms, AvgTime: 5.066ms, SumTime: 15.200ms
    ```
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 75 +++++++++++++++++-----
 be/src/pipeline/exec/exchange_sink_buffer.h        | 13 +++-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  1 +
 be/src/runtime/runtime_state.h                     | 11 ++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 +++
 5 files changed, 93 insertions(+), 17 deletions(-)
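[Editor's note] Before the diff itself, a minimal, self-contained sketch of the pattern the patch implements: keep one count/min/max/sum record per downstream fragment instance, update it as each RPC completes, then snapshot, sort by max_time, and print only the slowest few. Everything here is illustrative: InstanceStats, record_rpc, and print_top_n are hypothetical names, std::sort stands in for the pdqsort used in the real code, and printf stands in for the RuntimeProfile/PrettyPrinter plumbing.

```
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// One record per downstream fragment instance, mirroring RpcInstanceStatistics.
struct InstanceStats {
    uint64_t inst_lo_id = 0;
    int64_t rpc_count = 0;
    int64_t max_time = 0;
    int64_t min_time = INT64_MAX;
    int64_t sum_time = 0;
};

// Fold one RPC's latency into the per-instance record, as update_rpc_time does.
void record_rpc(InstanceStats& s, int64_t spend_ns) {
    if (spend_ns <= 0) return; // guard against non-positive durations, same as the patch
    ++s.rpc_count;
    s.sum_time += spend_ns;
    s.max_time = std::max(s.max_time, spend_ns);
    s.min_time = std::min(s.min_time, spend_ns);
}

// Report the n slowest instances by max_time, mirroring the RpcInstanceDetails output.
void print_top_n(std::vector<InstanceStats> snapshot, size_t n) {
    std::sort(snapshot.begin(), snapshot.end(),
              [](const auto& a, const auto& b) { return a.max_time > b.max_time; });
    size_t printed = 0;
    for (const auto& s : snapshot) {
        if (s.rpc_count == 0) continue; // never-used channels are skipped
        int64_t avg = s.sum_time / std::max<int64_t>(1, s.rpc_count);
        std::printf("Instance %lx: Count: %ld, MaxTime: %ldns, MinTime: %ldns, "
                    "AvgTime: %ldns, SumTime: %ldns\n",
                    (unsigned long)s.inst_lo_id, (long)s.rpc_count, (long)s.max_time,
                    (long)s.min_time, (long)avg, (long)s.sum_time);
        if (++printed == n) break;
    }
}
```

In the actual patch this output only appears when the new enable_verbose_profile session variable is on and rpc_verbose_profile_max_instance_count (default 5) is positive; see the RuntimeState and SessionVariable changes further down.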
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7163299d766..0f02ffc2b9a 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -25,6 +25,7 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
+#include <pdqsort.h>
 #include <stddef.h>
 
 #include <atomic>
@@ -129,7 +130,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
     _instance_to_receiver_eof[low_id] = false;
-    _instance_to_rpc_time[low_id] = 0;
+    _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
+    _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
     _construct_request(low_id, finst_id);
 }
 
@@ -261,7 +263,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         // attach task for memory tracker and query id when core
         SCOPED_ATTACH_TASK(_state);
-        set_rpc_time(id, start_rpc_time, result.receive_time());
+
+        auto end_rpc_time = GetCurrentTimeNanos();
+        update_rpc_time(id, start_rpc_time, end_rpc_time);
+
         Status s(Status::create(result.status()));
         if (s.is<ErrorCode::END_OF_FILE>()) {
             _set_receiver_eof(id);
@@ -339,7 +344,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         // attach task for memory tracker and query id when core
         SCOPED_ATTACH_TASK(_state);
-        set_rpc_time(id, start_rpc_time, result.receive_time());
+
+        auto end_rpc_time = GetCurrentTimeNanos();
+        update_rpc_time(id, start_rpc_time, end_rpc_time);
+
         Status s(Status::create(result.status()));
         if (s.is<ErrorCode::END_OF_FILE>()) {
             _set_receiver_eof(id);
@@ -466,10 +474,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
 void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        if (time != 0) {
-            local_max_time = std::max(local_max_time, time);
-            local_min_time = std::min(local_min_time, time);
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        if (stats->sum_time != 0) {
+            local_max_time = std::max(local_max_time, stats->sum_time);
+            local_min_time = std::min(local_min_time, stats->sum_time);
         }
     }
     *max_time = local_max_time;
@@ -478,27 +486,32 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti
 
 int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
     int64_t sum_time = 0;
-    for (auto& [id, time] : _instance_to_rpc_time) {
-        sum_time += time;
+    for (auto& [id, stats] : _instance_to_rpc_stats) {
+        sum_time += stats->sum_time;
     }
     return sum_time;
 }
 
-void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
-                                      int64_t receive_rpc_time) {
+void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t start_rpc_time,
+                                         int64_t receive_rpc_time) {
     _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
-    DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
+    DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
     if (rpc_spend_time > 0) {
-        _instance_to_rpc_time[id] += rpc_spend_time;
+        ++_instance_to_rpc_stats[id]->rpc_count;
+        _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
+        _instance_to_rpc_stats[id]->max_time =
+                std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
+        _instance_to_rpc_stats[id]->min_time =
+                std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
     }
 }
 
 void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
-    auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
+    auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
     auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
     auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
-    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
+    auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
     auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
 
     int64_t max_rpc_time = 0, min_rpc_time = 0;
@@ -510,6 +523,38 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
     int64_t sum_time = get_sum_rpc_time();
     _sum_rpc_timer->set(sum_time);
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
+
+    auto max_count = _state->rpc_verbose_profile_max_instance_count();
+    if (_state->enable_verbose_profile() && max_count > 0) {
+        std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
+        for (const auto& stats : _instance_to_rpc_stats_vec) {
+            tmp_rpc_stats_vec.emplace_back(*stats);
+        }
+        pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
+                [](const auto& a, const auto& b) { return a.max_time > b.max_time; });
+        auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
+        int i = 0;
+        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
+        for (const auto& stats : tmp_rpc_stats_vec) {
+            if (0 == stats.rpc_count) {
+                continue;
+            }
+            std::stringstream out;
+            out << "Instance " << std::hex << stats.inst_lo_id;
+            auto stats_str = fmt::format(
+                    "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
+                    stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS),
+                    PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
+                    PrettyPrinter::print(
+                            stats.sum_time / std::max(static_cast<int64_t>(1), stats.rpc_count),
+                            TUnit::TIME_NS),
+                    PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
+            detail_profile->add_info_string(out.str(), stats_str);
+            if (++i == count) {
+                break;
+            }
+        }
+    }
 }
 
 } // namespace pipeline
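[Editor's note] On the two containers that register_sink fills above: the vector owns the statistics objects and preserves registration order for the snapshot taken in update_profile, while the hash map holds raw pointers for O(1) lookup by instance lo-id on the RPC completion path. A minimal sketch of that owner-plus-index pattern, assuming std::unordered_map in place of phmap::flat_hash_map; StatsRegistry and its members are hypothetical names:

```
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <vector>

struct Stats {
    uint64_t id = 0;
    int64_t sum_time = 0;
};

class StatsRegistry {
public:
    // Registration: the vector owns the object, the map indexes it by id.
    void register_instance(uint64_t id) {
        _owned.push_back(std::make_shared<Stats>(Stats{id}));
        _by_id[id] = _owned.back().get();
    }

    // Hot path: O(1) lookup by id, no allocation.
    void add_time(uint64_t id, int64_t ns) { _by_id.at(id)->sum_time += ns; }

    // Reporting path: iterate the owning vector to copy out a stable snapshot.
    std::vector<Stats> snapshot() const {
        std::vector<Stats> out;
        out.reserve(_owned.size());
        for (const auto& s : _owned) out.push_back(*s);
        return out;
    }

private:
    std::vector<std::shared_ptr<Stats>> _owned; // ownership
    std::unordered_map<uint64_t, Stats*> _by_id; // index
};
```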
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 13692532a33..22a1452f8d5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -180,7 +180,7 @@ public:
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
     void close();
-    void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
+    void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
     void set_dependency(std::shared_ptr<Dependency> queue_dependency,
@@ -215,7 +215,16 @@ private:
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
 
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
-    phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+    struct RpcInstanceStatistics {
+        RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
+        InstanceLoId inst_lo_id;
+        int64_t rpc_count = 0;
+        int64_t max_time = 0;
+        int64_t min_time = INT64_MAX;
+        int64_t sum_time = 0;
+    };
+    std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
+    phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index eafefa2e4c0..dbde9abd05d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -81,6 +81,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     get_data_from_recvr_timer = ADD_TIMER(_runtime_profile, "GetDataFromRecvrTime");
     filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
     create_merger_timer = ADD_TIMER(_runtime_profile, "CreateMergerTime");
+    _runtime_profile->add_info_string("InstanceID", print_id(state->fragment_instance_id()));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index abc823bc25b..88deee491d1 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -459,6 +459,17 @@ public:
         return _query_options.__isset.enable_profile && _query_options.enable_profile;
     }
 
+    bool enable_verbose_profile() const {
+        return enable_profile() && _query_options.__isset.enable_verbose_profile &&
+               _query_options.enable_verbose_profile;
+    }
+
+    int rpc_verbose_profile_max_instance_count() const {
+        return _query_options.__isset.rpc_verbose_profile_max_instance_count
+                       ? _query_options.rpc_verbose_profile_max_instance_count
+                       : 0;
+    }
+
     bool enable_share_hash_table_for_broadcast_join() const {
         return _query_options.__isset.enable_share_hash_table_for_broadcast_join &&
                _query_options.enable_share_hash_table_for_broadcast_join;
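[Editor's note] The two RuntimeState accessors above follow the usual guard pattern for optional Thrift query options: check the __isset flag before reading the field, and fall back to a safe default (false / 0) so frontends that never set the field behave as before. A minimal sketch of the pattern, with a hand-written stand-in for the Thrift-generated TQueryOptions struct (QueryOptionsLike and its fields are illustrative only):

```
#include <cstdint>

// Stand-in for a Thrift-generated struct: each optional field has a matching
// flag in the nested __isset struct recording whether the sender populated it.
struct QueryOptionsLike {
    bool enable_profile = false;
    bool enable_verbose_profile = false;
    int32_t rpc_verbose_profile_max_instance_count = 0;
    struct {
        bool enable_profile = false;
        bool enable_verbose_profile = false;
        bool rpc_verbose_profile_max_instance_count = false;
    } __isset;
};

// Verbose profiling is only meaningful when profiling itself is on, so the
// accessor chains the checks the same way runtime_state.h does.
bool enable_verbose_profile(const QueryOptionsLike& o) {
    bool profiling = o.__isset.enable_profile && o.enable_profile;
    return profiling && o.__isset.enable_verbose_profile && o.enable_verbose_profile;
}

// An unset count reads as 0, which update_profile treats as "feature off".
int rpc_verbose_profile_max_instance_count(const QueryOptionsLike& o) {
    return o.__isset.rpc_verbose_profile_max_instance_count
                   ? o.rpc_verbose_profile_max_instance_count
                   : 0;
}
```

Together with the session variables added below, this means RpcInstanceDetails is emitted only when enable_profile and enable_verbose_profile are both true and rpc_verbose_profile_max_instance_count is positive.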
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index dfbe4c445a4..4b1049649b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -97,6 +97,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String MAX_EXECUTION_TIME = "max_execution_time";
     public static final String INSERT_TIMEOUT = "insert_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
+    public static final String ENABLE_VERBOSE_PROFILE = "enable_verbose_profile";
+    public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT = "rpc_verbose_profile_max_instance_count";
     public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms";
     public static final String SQL_MODE = "sql_mode";
     public static final String WORKLOAD_VARIABLE = "workload_group";
@@ -790,6 +792,12 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
+    public boolean enableVerboseProfile = false;
+
+    @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT, needForward = true)
+    public int rpcVerboseProfileMaxInstanceCount = 5;
+
     // When enable_profile is true, profile of queries that costs more than autoProfileThresholdMs
     // will be stored to disk.
     @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
@@ -3811,6 +3819,8 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setQueryTimeout(queryTimeoutS);
         tResult.setEnableProfile(enableProfile);
+        tResult.setEnableVerboseProfile(enableVerboseProfile);
+        tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
         if (enableProfile) {
             // If enable profile == true, then also set report success to true
             // be need report success to start report thread. But it is very tricky

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org