This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a2fb08c5944 [Fix]Fix insert select missing audit log when connect follower FE (#36472) a2fb08c5944 is described below commit a2fb08c59448866a6518196ea0ce4924948e91c1 Author: wangbo <wan...@apache.org> AuthorDate: Thu Jun 20 15:16:10 2024 +0800 [Fix]Fix insert select missing audit log when connect follower FE (#36472) ## Proposed changes pick #36454 Fix: when an ```insert select``` is executed on a Follower, the audit log could be missing query statistics. This is because the ```audit log``` is written on the connected FE, but the request is forwarded to the master FE; the coord FE then becomes the master FE and BE reports query statistics to the coord FE, so the connected Follower never receives the reported query statistics and its audit log lacks them. We can add a new field to mark the client-connected FE, so that BE reports query statistics to the connected FE. Besides, I refactored FE's WorkloadRuntimeStatusMgr to make the logic clearer and added some logging in BE.
--- be/src/runtime/fragment_mgr.cpp | 8 +- be/src/runtime/query_context.cpp | 18 +-- be/src/runtime/query_context.h | 4 +- be/src/runtime/runtime_query_statistics_mgr.cpp | 34 ++++-- .../apache/doris/planner/StreamLoadPlanner.java | 1 + .../java/org/apache/doris/qe/ConnectContext.java | 9 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++ .../WorkloadRuntimeStatusMgr.java | 125 ++++++++++----------- gensrc/thrift/PaloInternalService.thrift | 3 + 9 files changed, 130 insertions(+), 86 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1172b5b889b..9271f78fe56 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -607,12 +607,14 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord << ", total fragment num on current host: " << params.fragment_num_on_host << ", fe process uuid: " << params.query_options.fe_process_uuid - << ", query type: " << params.query_options.query_type; + << ", query type: " << params.query_options.query_type + << ", report audit fe:" << params.current_connect_fe; // This may be a first fragment request of the query. // Create the query fragments context. 
- query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, - params.coord, pipeline, params.is_nereids); + query_ctx = + QueryContext::create_shared(query_id, _exec_env, params.query_options, params.coord, + pipeline, params.is_nereids, params.current_connect_fe); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2dafb8dd3ec..429c4f80563 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -57,7 +57,7 @@ public: QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, - bool is_pipeline, bool is_nereids) + bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe) : _timeout_second(-1), _query_id(query_id), _exec_env(exec_env), @@ -81,10 +81,13 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, DCHECK_EQ(is_query_type_valid, true); this->coord_addr = coord_addr; - // external query has no coord_addr + // current_connect_fe is used for report query statistics + this->current_connect_fe = current_connect_fe; + // external query has no current_connect_fe if (query_options.query_type != TQueryType::EXTERNAL) { - bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0; - DCHECK_EQ(is_coord_addr_valid, true); + bool is_report_fe_addr_valid = + !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0; + DCHECK_EQ(is_report_fe_addr_valid, true); } register_memory_statistics(); @@ -284,7 +287,7 @@ void QueryContext::set_pipeline_context( void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) { _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - print_id(_query_id), qs, coord_addr, 
_query_options.query_type); + print_id(_query_id), qs, current_connect_fe, _query_options.query_type); } std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() { @@ -298,7 +301,7 @@ void QueryContext::register_memory_statistics() { std::string query_id = print_id(_query_id); if (qs) { _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - query_id, qs, coord_addr, _query_options.query_type); + query_id, qs, current_connect_fe, _query_options.query_type); } else { LOG(INFO) << " query " << query_id << " get memory query statistics failed "; } @@ -309,7 +312,8 @@ void QueryContext::register_cpu_statistics() { if (!_cpu_statistics) { _cpu_statistics = std::make_shared<QueryStatistics>(); _exec_env->runtime_query_statistics_mgr()->register_query_statistics( - print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type); + print_id(_query_id), _cpu_statistics, current_connect_fe, + _query_options.query_type); } } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 4c1ee2cf574..3df3677e502 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -73,7 +73,8 @@ class QueryContext { public: QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, - TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids); + TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids, + TNetworkAddress current_connect_fe); ~QueryContext(); @@ -246,6 +247,7 @@ public: std::string user; std::string group; TNetworkAddress coord_addr; + TNetworkAddress current_connect_fe; TQueryGlobals query_globals; ObjectPool obj_pool; diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index dda41936284..44dff0f54ac 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -382,8 +382,8 @@ void 
RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { std::string add_str = PrintThriftNetworkAddress(addr); if (!coord_status.ok()) { std::stringstream ss; - LOG(WARNING) << "could not get client " << add_str - << " when report workload runtime stats, reason is " + LOG(WARNING) << "[report_query_statistics]could not get client " << add_str + << " when report workload runtime stats, reason:" << coord_status.to_string(); continue; } @@ -402,26 +402,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { coord->reportExecStatus(res, params); rpc_result[addr] = true; } catch (apache::thrift::TApplicationException& e) { - LOG(WARNING) << "fe " << add_str - << " throw exception when report statistics, reason=" << e.what() + LOG(WARNING) << "[report_query_statistics]fe " << add_str + << " throw exception when report statistics, reason:" << e.what() << " , you can see fe log for details."; } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "report workload runtime statistics to " << add_str - << " failed, err: " << e.what(); + LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to " + << add_str << " failed, reason: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { - LOG(WARNING) - << "reopen thrift client failed when report workload runtime statistics to" - << add_str; + LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report " + "workload runtime statistics to" + << add_str; } else { try { coord->reportExecStatus(res, params); rpc_result[addr] = true; } catch (apache::thrift::transport::TTransportException& e2) { - LOG(WARNING) << "retry report workload runtime stats to " << add_str - << " failed, err: " << e2.what(); + LOG(WARNING) + << "[report_query_statistics]retry report workload runtime stats to " + << add_str << " failed, reason: " << e2.what(); + } catch (std::exception& e) { + LOG_WARNING( + "[report_query_statistics]unknow exception when 
report workload " + "runtime statistics to {}, " + "reason:{}. ", + add_str, e.what()); } } + } catch (std::exception& e) { + LOG_WARNING( + "[report_query_statistics]unknown exception when report workload runtime " + "statistics to {}, reason:{}. ", + add_str, e.what()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 4b66f2f0b21..973134563b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -294,6 +294,7 @@ public class StreamLoadPlanner { params.setDescTbl(analyzer.getDescTbl().toThrift()); params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); + params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); TPipelineInstanceParams execParams = new TPipelineInstanceParams(); // user load id (streamLoadTask.id) as query id diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index b02d895ff2d..df1a4ac2408 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -238,6 +238,10 @@ public class ConnectContext { private Exec exec; private boolean runProcedure = false; + // isProxy used for forward request from other FE and used in one thread + // it's default thread-safe + private boolean isProxy = false; + public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -360,6 +364,7 @@ public class ConnectContext { mysqlChannel = new MysqlChannel(connection, this); } else if (isProxy) { mysqlChannel = new ProxyMysqlChannel(); + this.isProxy = isProxy; } else { mysqlChannel = new DummyMysqlChannel(); } @@ 
-1368,4 +1373,8 @@ public class ConnectContext { public void setUserVars(Map<String, LiteralExpr> userVars) { this.userVars = userVars; } + + public boolean isProxy() { + return isProxy; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c6755746106..734907ac0c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -130,6 +130,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; @@ -196,6 +197,10 @@ public class Coordinator implements CoordInterface { private final TQueryGlobals queryGlobals = new TQueryGlobals(); private TQueryOptions queryOptions; private TNetworkAddress coordAddress; + // fe audit log in connected FE,if a query is forward + // we should send the connected FE to be, + // then be report query statistics to the connected FE + private TNetworkAddress currentConnectFE; // protects all fields below private final Lock lock = new ReentrantLock(); @@ -525,6 +530,13 @@ public class Coordinator implements CoordInterface { } coordAddress = new TNetworkAddress(localIP, Config.rpc_port); + if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty( + ConnectContext.get().getCurrentConnectedFEIp())) { + currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(), + Config.rpc_port); + } else { + currentConnectFE = coordAddress; + } this.idToBackend = Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster(); @@ -3019,6 +3031,7 @@ public class Coordinator implements 
CoordInterface { params.params.setSenderId(i); params.params.setNumSenders(instanceExecParams.size()); params.setCoord(coordAddress); + params.setCurrentConnectFe(currentConnectFE); params.setBackendNum(backendNum++); params.setQueryGlobals(queryGlobals); params.setQueryOptions(queryOptions); @@ -3115,6 +3128,7 @@ public class Coordinator implements CoordInterface { params.setDestinations(destinations); params.setNumSenders(instanceExecParams.size()); params.setCoord(coordAddress); + params.setCurrentConnectFe(currentConnectFE); params.setQueryGlobals(queryGlobals); params.setQueryOptions(queryOptions); params.query_options.setMemLimit(memLimit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index f1b87196516..07955ede778 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.plugin.audit.AuditEvent; import org.apache.doris.thrift.TQueryStatistics; @@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +// NOTE: not using a lock for beToQueryStatsMap's update because it should void global lock for all be +// this may cause in some corner case missing statistics update,for example: +// time1: clear logic judge query 1 is timeout 
+// time2: query 1 is update by report +// time3: clear logic remove query 1 +// in this case, lost query stats is allowed. because query report time out is 60s by default, +// when this case happens, we should find why be not report for so long first. public class WorkloadRuntimeStatusMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class); - private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap(); - private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap(); - private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap(); + private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap(); private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock(); private List<AuditEvent> queryAuditEventList = Lists.newLinkedList(); + private class BeReportInfo { + volatile long beLastReportTime; + + BeReportInfo(long beLastReportTime) { + this.beLastReportTime = beLastReportTime; + } + + Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap(); + } + public WorkloadRuntimeStatusMgr() { super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms); } @@ -118,45 +133,65 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon { return; } long beId = params.backend_id; - Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId); - beLastReportTime.put(beId, System.currentTimeMillis()); - if (queryIdMap == null) { - queryIdMap = Maps.newConcurrentMap(); - queryIdMap.putAll(params.query_statistics_map); - beToQueryStatsMap.put(beId, queryIdMap); + // NOTE(wb) one be sends update request one by one, + // so there is no need a global lock for beToQueryStatsMap here, + // just keep one be's put/remove/get is atomic operation is enough + long currentTime = System.currentTimeMillis(); + BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + if (beReportInfo == 
null) { + beReportInfo = new BeReportInfo(currentTime); + beToQueryStatsMap.put(beId, beReportInfo); } else { - long currentTime = System.currentTimeMillis(); - for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) { - queryIdMap.put(entry.getKey(), entry.getValue()); - queryLastReportTime.put(entry.getKey(), currentTime); + beReportInfo.beLastReportTime = currentTime; + } + for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) { + beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue())); + } + } + + void clearReportTimeoutBeStatistics() { + // 1 clear report timeout be + Set<Long> currentBeIdSet = beToQueryStatsMap.keySet(); + Long currentTime = System.currentTimeMillis(); + for (Long beId : currentBeIdSet) { + BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) { + beToQueryStatsMap.remove(beId); + continue; + } + Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet(); + for (String queryId : queryIdSet) { + Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId); + long queryLastReportTime = pair.first; + if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) { + beReportInfo.queryStatsMap.remove(queryId); + } } } } + // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap + // so there is no need lock or null check when visit beToQueryStatsMap public Map<String, TQueryStatistics> getQueryStatisticsMap() { // 1 merge query stats in all be Set<Long> beIdSet = beToQueryStatsMap.keySet(); - Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap(); + Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap(); for (Long beId : beIdSet) { - Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId); - Set<String> queryIdSet = 
currentQueryMap.keySet(); + BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet(); for (String queryId : queryIdSet) { - TQueryStatistics retQuery = retQueryMap.get(queryId); + TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second; + + TQueryStatistics retQuery = resultQueryMap.get(queryId); if (retQuery == null) { retQuery = new TQueryStatistics(); - retQueryMap.put(queryId, retQuery); + resultQueryMap.put(queryId, retQuery); } - - TQueryStatistics curQueryStats = currentQueryMap.get(queryId); mergeQueryStatistics(retQuery, curQueryStats); } } - return retQueryMap; - } - - public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() { - return beToQueryStatsMap; + return resultQueryMap; } private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) { @@ -172,44 +207,6 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon { } } - void clearReportTimeoutBeStatistics() { - // 1 clear report timeout be - Set<Long> beNeedToRemove = new HashSet<>(); - Set<Long> currentBeIdSet = beToQueryStatsMap.keySet(); - Long currentTime = System.currentTimeMillis(); - for (Long beId : currentBeIdSet) { - Long lastReportTime = beLastReportTime.get(beId); - if (lastReportTime != null - && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) { - beNeedToRemove.add(beId); - } - } - for (Long beId : beNeedToRemove) { - beToQueryStatsMap.remove(beId); - beLastReportTime.remove(beId); - } - - // 2 clear report timeout query - Set<String> queryNeedToClear = new HashSet<>(); - Long newCurrentTime = System.currentTimeMillis(); - Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet(); - for (String queryId : queryLastReportTimeKeySet) { - Long lastReportTime = queryLastReportTime.get(queryId); - if (lastReportTime != null - && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) { - 
queryNeedToClear.add(queryId); - } - } - - Set<Long> beIdSet = beToQueryStatsMap.keySet(); - for (String queryId : queryNeedToClear) { - for (Long beId : beIdSet) { - beToQueryStatsMap.get(beId).remove(queryId); - } - queryLastReportTime.remove(queryId); - } - } - private void queryAuditEventLogWriteLock() { queryAuditEventLock.writeLock().lock(); } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index e9feb48f763..9df69dd995f 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -533,6 +533,8 @@ struct TExecPlanFragmentParams { 31: optional bool is_nereids = true; + 32: optional Types.TNetworkAddress current_connect_fe + // For cloud 1000: optional bool is_mow_table; } @@ -770,6 +772,7 @@ struct TPipelineFragmentParams { 40: optional bool is_nereids = true; 41: optional i64 wal_id 42: optional i64 content_length + 43: optional Types.TNetworkAddress current_connect_fe // For cloud 1000: optional bool is_mow_table; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org