zhiqiang-hhhh commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1716805605
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -133,6 +138,114 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using apache::thrift::transport::TTransportException; +static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() { + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + std::map<int64_t, std::unordered_set<TUniqueId>> result; + std::vector<FrontendInfo> qualified_fes; + + for (const auto& fe : running_fes) { + // Only consider normal frontend. + if (fe.first.port != 0 && fe.second.info.process_uuid != 0) { + qualified_fes.push_back(fe.second); + } else { + return std::map<int64_t, std::unordered_set<TUniqueId>>(); + } + } + + auto fetch_running_queries_rpc = [](const FrontendInfo& fe_info) { + TFetchRunningQueriesResult rpc_result; + TFetchRunningQueriesRequest rpc_request; + + Status client_status; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), + fe_info.info.coordinator_address, &client_status); + // Abort this fe. + if (!client_status.ok()) { + LOG_WARNING("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + client_status.to_string()); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); + } + + // do rpc + try { + try { + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception reason: {}, reopening", e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack()); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), + false); + } + + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } + } catch (apache::thrift::TException& e) { + // During upgrading cluster or meet any other network error. + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what()); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); + } + + // Avoid logic error in frontend. + if (rpc_result.__isset.status == false || + rpc_result.status.status_code != TStatusCode::OK) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + doris::to_string(rpc_result.status.status_code)); + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); + } + + if (rpc_result.__isset.running_queries == false) { + return std::make_tuple(std::make_tuple(0L, std::unordered_set<TUniqueId>()), false); + } + + return std::make_tuple( + std::make_tuple(fe_info.info.process_uuid, + std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(), + rpc_result.running_queries.end())), + true); + }; + + std::vector<std::future<std::tuple<std::tuple<int64_t, std::unordered_set<TUniqueId>>, bool>>> + futures; + + for (const auto& fe_addr : qualified_fes) { + // Create another thread to fetch running queries + futures.push_back(std::async(std::launch::async, fetch_running_queries_rpc, fe_addr)); + } + + for (auto& future : futures) { + // wait_for at most 3 seconds + auto future_status = future.wait_for(std::chrono::seconds(3)); + if (future_status != std::future_status::ready) { + LOG_WARNING("Fetch running queries from frontend timeout"); + // Empty result, cancel worker will not do anything + return {}; Review Comment: 这个是 _get_all_running_queries_from_fe 函数的返回值,返回空,153 行的应该改成 `retrun {}` 就一致了 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org