yiguolei commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1717086419
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -133,6 +137,114 @@ std::string to_load_error_http_path(const std::string& file_name) {
 using apache::thrift::TException;
 using apache::thrift::transport::TTransportException;
+static std::tuple<std::tuple<int64_t, std::unordered_set<TUniqueId>>, bool>
+_do_fetch_running_queries_rpc(const FrontendInfo& fe_info) {
+    auto invalid_res = std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false);
+    TFetchRunningQueriesResult rpc_result;
+    TFetchRunningQueriesRequest rpc_request;
+
+    Status client_status;
+    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
+                                         fe_info.info.coordinator_address, &client_status);
+    // Abort this fe.
+    if (!client_status.ok()) {
+        LOG_WARNING("Failed to get client for {}, reason is {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                    client_status.to_string());
+        return invalid_res;
+    }
+
+    // do rpc
+    try {
+        try {
+            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
+                return invalid_res;
+            }
+
+            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+        }
+    } catch (apache::thrift::TException& e) {
+        // During upgrading cluster or meet any other network error.
+        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
+        return invalid_res;
+    }
+
+    // Avoid logic error in frontend.
+    if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
+        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                    doris::to_string(rpc_result.status.status_code));
+        return invalid_res;
+    }
+
+    if (rpc_result.__isset.running_queries == false) {
+        return invalid_res;
+    }
+
+    return std::make_tuple(
+            std::make_tuple(fe_info.info.process_uuid,
+                            std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
+                                                          rpc_result.running_queries.end())),
+            true);
+};
+
+static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
+    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
+            ExecEnv::GetInstance()->get_running_frontends();
+
+    std::map<int64_t, std::unordered_set<TUniqueId>> result;
+    std::vector<FrontendInfo> qualified_fes;
+
+    for (const auto& fe : running_fes) {
+        // Only consider normal frontend.
+        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
+            qualified_fes.push_back(fe.second);
+        } else {
+            return {};
+        }
+    }
+
+    std::vector<std::future<std::tuple<std::tuple<int64_t, std::unordered_set<TUniqueId>>, bool>>>
+            futures;
+
+    for (const auto& fe_addr : qualified_fes) {

Review Comment:
这里代码可读性不好,实在不行,就别future了,FE 也没多少个,我们这个操作也不要求时效性多高,直接顺序的一个个请求FE吧,代码可读性能好很多

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org