yiguolei commented on code in PR #39852: URL: https://github.com/apache/doris/pull/39852#discussion_r1729634715
########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ##########

@@ -240,6 +243,9 @@ Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
 template <typename Parent>
 Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
+    if (_parent) {
+        SCOPED_TIMER(_parent->brpc_send_timer());

Review Comment:
   Writing it like this is pointless: a scoped timer only takes effect inside this {} block.

########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ##########

@@ -154,7 +155,8 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
 }

 template <typename Parent>
-void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) {
+void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id,
+                                               vectorized::PipChannel<Parent>* channel) {

Review Comment:
   What is the purpose of adding this channel parameter?

########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ##########

@@ -538,6 +555,36 @@ void ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
     int64_t sum_time = get_sum_rpc_time();
     _sum_rpc_timer->set(sum_time);
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
+
+    if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+        auto max_count = _state->rpc_verbose_profile_max_instance_count();
+        if (_state->enable_verbose_profile() && max_count > 0) {
+            pdqsort(_instance_to_rpc_stats_vec.begin(), _instance_to_rpc_stats_vec.end(),
+                    [](const auto& a, const auto& b) { return a->max_time > b->max_time; });
+            auto count = std::min((size_t)max_count, _instance_to_rpc_stats_vec.size());
+            int i = 0;
+            auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
+            for (const auto& stats : _instance_to_rpc_stats_vec) {
+                if (0 == stats->rpc_count) {
+                    continue;
+                }
+                std::stringstream out;
+                out << "Instance " << std::hex << stats->inst_lo_id;
+                auto stats_str = fmt::format(
+                        "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}",
+                        stats->rpc_count, PrettyPrinter::print(stats->max_time, TUnit::TIME_NS),

Review Comment:
   CloseTime: avg 11.310us, max 11.310us, min 11.310us
   - ExecTime: avg 150.158us, max 150.158us, min 150.158us
   - InitTime: avg 72.193us, max 72.193us, min 72.193us

########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ##########

@@ -298,7 +304,10 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
     }
     // attach task for memory tracker and query id when core
     SCOPED_ATTACH_TASK(_state);
-    set_rpc_time(id, start_rpc_time, result.receive_time());

Review Comment:
   Keep this. We are not touching the existing logic here; this change only adds channel-level detail information.

########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ##########

@@ -538,6 +555,36 @@ void ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
     int64_t sum_time = get_sum_rpc_time();
     _sum_rpc_timer->set(sum_time);
     _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
+
+    if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+        auto max_count = _state->rpc_verbose_profile_max_instance_count();
+        if (_state->enable_verbose_profile() && max_count > 0) {
+            pdqsort(_instance_to_rpc_stats_vec.begin(), _instance_to_rpc_stats_vec.end(),
+                    [](const auto& a, const auto& b) { return a->max_time > b->max_time; });
+            auto count = std::min((size_t)max_count, _instance_to_rpc_stats_vec.size());
+            int i = 0;
+            auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);

Review Comment:
   This profile object needs to be checked for nullptr.
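Regarding the first comment (the SCOPED_TIMER placed inside `if (_parent) { ... }`): the objection is that an RAII scoped timer only measures the block it is declared in, so if that block closes before the RPC is actually sent, the counter records almost nothing. A minimal, self-contained sketch of the pitfall, using a hypothetical ScopedTimer and std::chrono in place of Doris's SCOPED_TIMER macro:

    #include <chrono>
    #include <cstdio>
    #include <thread>

    // Illustration only: an RAII timer similar in spirit to Doris's SCOPED_TIMER.
    struct ScopedTimer {
        explicit ScopedTimer(long long* sink_ns)
                : _sink_ns(sink_ns), _start(std::chrono::steady_clock::now()) {}
        ~ScopedTimer() {
            auto end = std::chrono::steady_clock::now();
            *_sink_ns +=
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end - _start).count();
        }
        long long* _sink_ns;
        std::chrono::steady_clock::time_point _start;
    };

    long long g_brpc_send_timer_ns = 0;

    void send_rpc_timer_in_inner_block() {
        {
            // Destroyed at the closing brace: only this (nearly empty) block is measured.
            ScopedTimer timer(&g_brpc_send_timer_ns);
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(5));  // the actual send work
    }

    void send_rpc_timer_at_function_scope() {
        // Lives until the end of the function, so the send work is covered.
        ScopedTimer timer(&g_brpc_send_timer_ns);
        std::this_thread::sleep_for(std::chrono::milliseconds(5));  // the actual send work
    }

    int main() {
        send_rpc_timer_in_inner_block();
        std::printf("inner-block timer:    %lld ns\n", g_brpc_send_timer_ns);  // close to 0
        g_brpc_send_timer_ns = 0;
        send_rpc_timer_at_function_scope();
        std::printf("function-scope timer: %lld ns\n", g_brpc_send_timer_ns);  // roughly 5 ms
        return 0;
    }

If the null check on _parent is still needed, the guard and the timer have to be arranged so the timer's scope actually spans the send path rather than an inner block that closes immediately.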
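Regarding the pasted CloseTime / ExecTime / InitTime counters: they look like the existing "avg ..., max ..., min ..." profile style, presumably offered as the format the new per-instance RPC line should follow. A sketch under that assumption; RpcStats and pretty_ns are hypothetical stand-ins for the PR's per-instance stats struct and for PrettyPrinter::print(..., TUnit::TIME_NS):

    #include <algorithm>
    #include <cstdint>
    #include <string>

    #include <fmt/format.h>

    // Hypothetical stand-in for the per-instance RPC stats the PR keeps.
    struct RpcStats {
        int64_t rpc_count = 0;
        int64_t sum_time_ns = 0;
        int64_t max_time_ns = 0;
        int64_t min_time_ns = 0;
    };

    // Hypothetical stand-in for PrettyPrinter::print(ns, TUnit::TIME_NS).
    std::string pretty_ns(int64_t ns) {
        return fmt::format("{:.3f}us", static_cast<double>(ns) / 1000.0);
    }

    // One line per instance, following the existing "avg ..., max ..., min ..." counter style.
    std::string format_instance_rpc_line(const RpcStats& s) {
        const int64_t avg = s.sum_time_ns / std::max<int64_t>(1, s.rpc_count);
        return fmt::format("RpcTime: avg {}, max {}, min {}, count {}",
                           pretty_ns(avg), pretty_ns(s.max_time_ns), pretty_ns(s.min_time_ns),
                           s.rpc_count);
    }

    int main() {
        RpcStats s{/*rpc_count=*/3, /*sum_time_ns=*/45'000, /*max_time_ns=*/20'000,
                   /*min_time_ns=*/10'000};
        fmt::print("{}\n", format_instance_rpc_line(s));
        // Prints: RpcTime: avg 15.000us, max 20.000us, min 10.000us, count 3
        return 0;
    }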
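Regarding the last comment (checking detail_profile for nullptr): a minimal sketch of the requested guard shape, using a RuntimeProfileStub stand-in since whether the real RuntimeProfile::create_child can return null is not visible in the hunk:

    #include <cstdio>

    // Stand-in only, to show the guard shape; the real type is Doris's RuntimeProfile.
    struct RuntimeProfileStub {
        RuntimeProfileStub* create_child(const char* /*name*/, bool /*indent*/, bool /*prepend*/) {
            return this;  // the real call may or may not be able to fail
        }
    };

    void add_rpc_instance_details(RuntimeProfileStub* profile) {
        auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
        // Reviewer's request: don't assume create_child succeeded; skip the verbose
        // section instead of dereferencing a null pointer further down.
        if (detail_profile == nullptr) {
            return;
        }
        std::printf("adding per-instance RPC stats under RpcInstanceDetails\n");
    }

    int main() {
        RuntimeProfileStub root;
        add_rpc_instance_details(&root);
        return 0;
    }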