zhiqiang-hhhh commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1716804092
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -133,6 +138,114 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using apache::thrift::transport::TTransportException; +static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() { + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + std::map<int64_t, std::unordered_set<TUniqueId>> result; + std::vector<FrontendInfo> qualified_fes; + + for (const auto& fe : running_fes) { + // Only consider normal frontend. + if (fe.first.port != 0 && fe.second.info.process_uuid != 0) { + qualified_fes.push_back(fe.second); + } else { + return std::map<int64_t, std::unordered_set<TUniqueId>>(); + } + } + + auto fetch_running_queries_rpc = [](const FrontendInfo& fe_info) { + TFetchRunningQueriesResult rpc_result; + TFetchRunningQueriesRequest rpc_request; + + Status client_status; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), + fe_info.info.coordinator_address, &client_status); + // Abort this fe. + if (!client_status.ok()) { + LOG_WARNING("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + client_status.to_string()); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); + } + + // do rpc + try { + try { + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception reason: {}, reopening", e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack()); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), + false); + } + + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } + } catch (apache::thrift::TException& e) { + // During upgrading cluster or meet any other network error. + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what()); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); + } + + // Avoid logic error in frontend. + if (rpc_result.__isset.status == false || + rpc_result.status.status_code != TStatusCode::OK) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + doris::to_string(rpc_result.status.status_code)); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); Review Comment: 这个是执行 rpc 的 future 的返回值 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org