yiguolei commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1713607735
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -133,6 +136,95 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using apache::thrift::transport::TTransportException; +static bool _get_all_running_queries(std::unordered_set<TUniqueId>& result_ref) { + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + std::vector<TNetworkAddress> qualified_fes; + + for (const auto& fe : running_fes) { + if (fe.first.port != 0 && fe.second.info.process_uuid != 0) { + qualified_fes.push_back(fe.first); + } + } + + auto fetch_running_queries_rpc = [](const TNetworkAddress& fe_addr) { + std::unordered_set<TUniqueId> running_queries; + TFetchRunningQueriesResult rpc_result; + TFetchRunningQueriesRequest rpc_request; + + Status client_status; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), + fe_addr, &client_status); + if (!client_status.ok()) { + LOG_WARNING("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_addr), client_status.to_string()); + return std::make_pair(running_queries, false); + } + + // do rpc + try { + try { + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception reason: {}, reopening", e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack()); + return std::make_pair(running_queries, false); + } + + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } + } catch (apache::thrift::TException& e) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_addr), e.what()); + return std::make_pair(running_queries, false); + } + + if (rpc_result.status.status_code != TStatusCode::OK) { + 
LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_addr), + doris::to_string(rpc_result.status.status_code)); + return std::make_pair(running_queries, false); + } + + for (const auto& query_id : rpc_result.running_queries) { + running_queries.insert(query_id); + } + + return std::make_pair(running_queries, true); + }; + + std::vector<std::future<std::pair<std::unordered_set<TUniqueId>, bool>>> futures; + + for (const auto& fe_addr : qualified_fes) { + // Create another thread to fetch running queries + futures.push_back(std::async(std::launch::async, fetch_running_queries_rpc, fe_addr)); + } + + for (auto& future : futures) { + // wait_for at most 3 seconds + auto future_status = future.wait_for(std::chrono::seconds(3)); + if (future_status != std::future_status::ready) { + LOG_WARNING("Fetch running queries from frontend timeout"); + continue; + } + + const auto& query_ids_and_rpc_succeed = future.get(); + if (!query_ids_and_rpc_succeed.second) { + return false; + } + + for (const auto& query_id : query_ids_and_rpc_succeed.first) { + LOG_INFO("Running query id: {}", print_id(query_id)); + result_ref.insert(query_id); Review Comment: 可能不能这么写,如果一个fe fetch 失败,我们不能认为这个fe 上运行的query 是空的,此时应该认为都是合理的。 (English translation: This probably should not be written this way. If fetching from one FE fails, we cannot assume that no queries are running on that FE; in that case, all of its queries should be treated as legitimate.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org