yiguolei commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1715120042
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -878,55 +990,94 @@ void FragmentMgr::cancel_worker() { "starting? " << "We will not cancel any outdated queries in this situation."; } else { - for (const auto& it : _query_ctx_map) { - if (auto q_ctx = it.second.lock()) { - if (q_ctx->get_fe_process_uuid() == 0) { - // zero means this query is from a older version fe or - // this fe is starting - continue; - } - - auto itr = running_fes.find(q_ctx->coord_addr); - if (itr != running_fes.end()) { - if (q_ctx->get_fe_process_uuid() == itr->second.info.process_uuid || - itr->second.info.process_uuid == 0) { - continue; - } else { - LOG_WARNING( - "Coordinator of query {} restarted, going to cancel it.", - print_id(q_ctx->query_id())); - } - } else { - // In some rear cases, the rpc port of follower is not updated in time, - // then the port of this follower will be zero, but acutally it is still running, - // and be has already received the query from follower. - // So we need to check if host is in running_fes. 
- bool fe_host_is_standing = std::any_of( - running_fes.begin(), running_fes.end(), - [&q_ctx](const auto& fe) { - return fe.first.hostname == q_ctx->coord_addr.hostname && - fe.first.port == 0; - }); - if (fe_host_is_standing) { - LOG_WARNING( - "Coordinator {}:{} is not found, but its host is still " - "running with an unstable brpc port, not going to cancel " - "it.", - q_ctx->coord_addr.hostname, q_ctx->coord_addr.port, - print_id(q_ctx->query_id())); - continue; - } else { - LOG_WARNING( - "Could not find target coordinator {}:{} of query {}, " - "going to " - "cancel it.", - q_ctx->coord_addr.hostname, q_ctx->coord_addr.port, - print_id(q_ctx->query_id())); - } - } + std::set<std::string> white_list_fe_hosts; + std::unordered_set<TNetworkAddress> white_list_fe_addrs; + + for (const auto& fe : running_fes) { + if (fe.second.info.process_uuid == 0 || + fe.second.info.coordinator_address.port == 0) { + white_list_fe_hosts.insert(fe.first.hostname); } - // Coordinator of this query has already dead or query context has been released. - queries_lost_coordinator.push_back(it.first); + } + + std::unordered_set<TUniqueId> white_list_queries; + + for (auto itr : _query_ctx_map) { + auto q_ctx = itr.second.lock(); + if (q_ctx == nullptr) { + continue; + } + + // 0 means this query: + // 1. is from a older version fe + // 2. the fe is starting, hb has not come yet + // 3. this query does not have coordinator at all (eg. streamload, spark connector) + if (q_ctx->get_fe_process_uuid() == 0) { + white_list_queries.insert(q_ctx->query_id()); Review Comment: 不用这个, 按道理说,如果get running queries 返回的是map,那么只要这个query的fe uid 不在这个map,那么就应该忽略 (English translation: Don't use this. Logically, if get running queries returns a map, then any query whose FE uid is not in that map should simply be ignored.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org