yiguolei commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1715104691
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -133,6 +136,94 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using apache::thrift::transport::TTransportException; +static bool _get_all_running_queries(std::unordered_set<TUniqueId>& result_ref) { + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + std::vector<TNetworkAddress> qualified_fes; + + for (const auto& fe : running_fes) { + if (fe.first.port != 0 && fe.second.info.process_uuid != 0) { + qualified_fes.push_back(fe.first); + } + } + + auto fetch_running_queries_rpc = [](const TNetworkAddress& fe_addr) { + std::unordered_set<TUniqueId> running_queries; + TFetchRunningQueriesResult rpc_result; + TFetchRunningQueriesRequest rpc_request; + + Status client_status; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), + fe_addr, &client_status); + if (!client_status.ok()) { + LOG_WARNING("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_addr), client_status.to_string()); + return std::make_pair(running_queries, false); + } + + // do rpc + try { + try { + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception reason: {}, reopening", e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack()); + return std::make_pair(running_queries, false); + } + + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } + } catch (apache::thrift::TException& e) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_addr), e.what()); + return std::make_pair(running_queries, false); + } + + if (rpc_result.status.status_code != TStatusCode::OK) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_addr), + doris::to_string(rpc_result.status.status_code)); + return std::make_pair(running_queries, false); + } + + for (const auto& query_id : rpc_result.running_queries) { + running_queries.insert(query_id); + } + + return std::make_pair(running_queries, true); + }; + + std::vector<std::future<std::pair<std::unordered_set<TUniqueId>, bool>>> futures; + + for (const auto& fe_addr : qualified_fes) { + // Create another thread to fetch running queries + futures.push_back(std::async(std::launch::async, fetch_running_queries_rpc, fe_addr)); + } + + for (auto& future : futures) { + // wait_for at most 3 seconds + auto future_status = future.wait_for(std::chrono::seconds(3)); + if (future_status != std::future_status::ready) { + LOG_WARNING("Fetch running queries from frontend timeout"); + continue; Review Comment: 这里为啥是continue? 而不是报错?return false -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org