yiguolei commented on code in PR #39223: URL: https://github.com/apache/doris/pull/39223#discussion_r1715123675
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -878,55 +990,94 @@ void FragmentMgr::cancel_worker() { "starting? " << "We will not cancel any outdated queries in this situation."; } else { - for (const auto& it : _query_ctx_map) { - if (auto q_ctx = it.second.lock()) { - if (q_ctx->get_fe_process_uuid() == 0) { - // zero means this query is from a older version fe or - // this fe is starting - continue; - } - - auto itr = running_fes.find(q_ctx->coord_addr); - if (itr != running_fes.end()) { - if (q_ctx->get_fe_process_uuid() == itr->second.info.process_uuid || - itr->second.info.process_uuid == 0) { - continue; - } else { - LOG_WARNING( - "Coordinator of query {} restarted, going to cancel it.", - print_id(q_ctx->query_id())); - } - } else { - // In some rear cases, the rpc port of follower is not updated in time, - // then the port of this follower will be zero, but acutally it is still running, - // and be has already received the query from follower. - // So we need to check if host is in running_fes. 
- bool fe_host_is_standing = std::any_of( - running_fes.begin(), running_fes.end(), - [&q_ctx](const auto& fe) { - return fe.first.hostname == q_ctx->coord_addr.hostname && - fe.first.port == 0; - }); - if (fe_host_is_standing) { - LOG_WARNING( - "Coordinator {}:{} is not found, but its host is still " - "running with an unstable brpc port, not going to cancel " - "it.", - q_ctx->coord_addr.hostname, q_ctx->coord_addr.port, - print_id(q_ctx->query_id())); - continue; - } else { - LOG_WARNING( - "Could not find target coordinator {}:{} of query {}, " - "going to " - "cancel it.", - q_ctx->coord_addr.hostname, q_ctx->coord_addr.port, - print_id(q_ctx->query_id())); - } - } + std::set<std::string> white_list_fe_hosts; + std::unordered_set<TNetworkAddress> white_list_fe_addrs; + + for (const auto& fe : running_fes) { + if (fe.second.info.process_uuid == 0 || + fe.second.info.coordinator_address.port == 0) { + white_list_fe_hosts.insert(fe.first.hostname); } - // Coordinator of this query has already dead or query context has been released. - queries_lost_coordinator.push_back(it.first); + } + + std::unordered_set<TUniqueId> white_list_queries; + + for (auto itr : _query_ctx_map) { + auto q_ctx = itr.second.lock(); + if (q_ctx == nullptr) { + continue; + } + + // 0 means this query: + // 1. is from a older version fe + // 2. the fe is starting, hb has not come yet + // 3. this query does not have coordinator at all (eg. streamload, spark connector) + if (q_ctx->get_fe_process_uuid() == 0) { + white_list_queries.insert(q_ctx->query_id()); + } + } + + for (const auto& pair : _query_ctx_map) { + std::shared_ptr<QueryContext> q_ctx = pair.second.lock(); + + if (q_ctx == nullptr) { + continue; + } + + // If query is in white list, do not cancel it. + if (white_list_queries.contains(q_ctx->query_id())) { + continue; + } + + // If query is from an unstable fe, do not cancel it. 
+ if (white_list_fe_hosts.contains(q_ctx->coord_addr.hostname)) { + continue; + } + + // If this query is running on any fes, do not cancel it. + if (running_queries_on_all_fes.contains(q_ctx->query_id())) { + continue; + } + + // If coordinator of the query does not exist + auto tmp_itr = running_fes.find(q_ctx->coord_addr); + if (tmp_itr == running_fes.end()) { + LOG_WARNING("Query {} coordinator {} does not exist, cancel it.", + print_id(pair.first), q_ctx->coord_addr.hostname); + queries_lost_coordinator.push_back(pair.first); + continue; + } + + // Coordinator is still standing, we need to check if it is same process + if (tmp_itr->second.info.process_uuid != q_ctx->get_fe_process_uuid()) { + LOG_WARNING("Query {} coordinator {} process restarted, cancel it.", + print_id(pair.first), q_ctx->coord_addr.hostname); + queries_lost_coordinator.push_back(pair.first); + continue; + } + + if (!do_pipeline_task_leak_check) { + // No runnting queries on all fes means we do not check pipeline task leakage in this + // iteration. + continue; + } + + if (q_ctx->get_query_arrival_timestamp().tv_nsec > + check_invalid_query_last_timestamp.tv_nsec) { + // Query arrived after last check, skip it. + continue; + } + + // Coordinator is still standing, and it is same process. + // But query does not exists in any frontends. + // Typically, this means this query is invalid, eg. we have some bugs in pipeline scheduler which + // makes the query can not be closed normally. + // We need to cancel these query to release resources. + LOG_ERROR( Review Comment: 打一下时间间隔,比如第一次检查,第二次检查 [English: Please also log the time interval, e.g. the time of the first (previous) check and of the second (current) check.] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org