This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 88e02c836d1 [Fix]Fix insert select missing audit log when connect follower FE (#36481)

88e02c836d1 is described below

commit 88e02c836d1c7b96c63524776da57f8aacbaa600
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jun 20 15:16:16 2024 +0800

    [Fix]Fix insert select missing audit log when connect follower FE (#36481)

    ## Proposed changes

    pick #36472
---
 be/src/runtime/fragment_mgr.cpp                    |   5 +-
 be/src/runtime/query_context.cpp                   |  18 +--
 be/src/runtime/query_context.h                     |   3 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  34 ++++--
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +
 .../java/org/apache/doris/qe/ConnectContext.java   |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  14 +++
 .../WorkloadRuntimeStatusMgr.java                  | 125 ++++++++++-----------
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 9 files changed, 128 insertions(+), 85 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 63079933ca1..a7808cb6d56 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -643,13 +643,14 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
                   << ", total fragment num on current host: " << params.fragment_num_on_host
                   << ", fe process uuid: " << params.query_options.fe_process_uuid
-                  << ", query type: " << params.query_options.query_type;
+                  << ", query type: " << params.query_options.query_type
+                  << ", report audit fe:" << params.current_connect_fe;

         // This may be a first fragment request of the query.
         // Create the query fragments context.
         query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
                                                 params.query_options, params.coord, pipeline,
-                                                params.is_nereids);
+                                                params.is_nereids, params.current_connect_fe);
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index f9cc9757fe3..bbcdc3b4771 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -43,7 +43,7 @@ public:

 QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                            const TQueryOptions& query_options, TNetworkAddress coord_addr,
-                           bool is_pipeline, bool is_nereids)
+                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe)
         : fragment_num(total_fragment_num),
           timeout_second(-1),
           _query_id(query_id),
@@ -70,10 +70,13 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
     DCHECK_EQ(is_query_type_valid, true);

     this->coord_addr = coord_addr;
-    // external query has no coord_addr
+    // current_connect_fe is used for report query statistics
+    this->current_connect_fe = current_connect_fe;
+    // external query has no current_connect_fe
     if (query_options.query_type != TQueryType::EXTERNAL) {
-        bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0;
-        DCHECK_EQ(is_coord_addr_valid, true);
+        bool is_report_fe_addr_valid =
+                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
+        DCHECK_EQ(is_report_fe_addr_valid, true);
     }

     register_memory_statistics();
@@ -265,7 +268,7 @@ void QueryContext::set_pipeline_context(
 void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
     _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-            print_id(_query_id), qs, coord_addr, _query_options.query_type);
+            print_id(_query_id), qs, current_connect_fe, _query_options.query_type);
 }

 std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
@@ -279,7 +282,7 @@ void QueryContext::register_memory_statistics() {
     std::string query_id = print_id(_query_id);
     if (qs) {
         _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                query_id, qs, coord_addr, _query_options.query_type);
+                query_id, qs, current_connect_fe, _query_options.query_type);
     } else {
         LOG(INFO) << " query " << query_id << " get memory query statistics failed ";
     }
@@ -290,7 +293,8 @@ void QueryContext::register_cpu_statistics() {
     if (!_cpu_statistics) {
         _cpu_statistics = std::make_shared<QueryStatistics>();
         _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type);
+                print_id(_query_id), _cpu_statistics, current_connect_fe,
+                _query_options.query_type);
     }
 }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c78886997d0..2653eeddc8b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -71,7 +71,7 @@ class QueryContext {
 public:
     QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                  const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline,
-                 bool is_nereids);
+                 bool is_nereids, TNetworkAddress current_connect_fe);

     ~QueryContext();

@@ -275,6 +275,7 @@ public:
     std::string user;
     std::string group;
     TNetworkAddress coord_addr;
+    TNetworkAddress current_connect_fe;
     TQueryGlobals query_globals;

     /// In the current implementation, for multiple fragments executed by a query on the same BE node,
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 5257f53bb00..32ca00d2c65 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -89,8 +89,8 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
         std::string add_str = PrintThriftNetworkAddress(addr);
         if (!coord_status.ok()) {
             std::stringstream ss;
-            LOG(WARNING) << "could not get client " << add_str
-                         << " when report workload runtime stats, reason is "
+            LOG(WARNING) << "[report_query_statistics]could not get client " << add_str
+                         << " when report workload runtime stats, reason:"
                          << coord_status.to_string();
             continue;
         }
@@ -109,26 +109,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
             coord->reportExecStatus(res, params);
             rpc_result[addr] = true;
         } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "fe " << add_str
-                         << " throw exception when report statistics, reason=" << e.what()
+            LOG(WARNING) << "[report_query_statistics]fe " << add_str
+                         << " throw exception when report statistics, reason:" << e.what()
                          << " , you can see fe log for details.";
         } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "report workload runtime statistics to " << add_str
-                         << " failed, err: " << e.what();
+            LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to "
+                         << add_str << " failed, reason: " << e.what();
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
-                LOG(WARNING)
-                        << "reopen thrift client failed when report workload runtime statistics to"
-                        << add_str;
+                LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report "
+                                "workload runtime statistics to"
+                             << add_str;
             } else {
                 try {
                     coord->reportExecStatus(res, params);
                     rpc_result[addr] = true;
                 } catch (apache::thrift::transport::TTransportException& e2) {
-                    LOG(WARNING) << "retry report workload runtime stats to " << add_str
-                                 << " failed, err: " << e2.what();
+                    LOG(WARNING)
+                            << "[report_query_statistics]retry report workload runtime stats to "
+                            << add_str << " failed, reason: " << e2.what();
+                } catch (std::exception& e) {
+                    LOG_WARNING(
+                            "[report_query_statistics]unknow exception when report workload "
+                            "runtime statistics to {}, "
+                            "reason:{}. ",
+                            add_str, e.what());
                 }
             }
+        } catch (std::exception& e) {
+            LOG_WARNING(
+                    "[report_query_statistics]unknown exception when report workload runtime "
+                    "statistics to {}, reason:{}. ",
+                    add_str, e.what());
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 4d63a92c750..33979e4c354 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -289,6 +289,7 @@ public class StreamLoadPlanner {

         params.setDescTbl(analyzer.getDescTbl().toThrift());
         params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));

         TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
         // user load id (streamLoadTask.id) as query id
@@ -521,6 +522,7 @@ public class StreamLoadPlanner {

         pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
         pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        pipParams.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
         pipParams.setQueryId(loadId);
         pipParams.per_exch_num_senders = Maps.newHashMap();
         pipParams.destinations = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 48a79bd9b07..16b1b3c2c83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -233,6 +233,10 @@ public class ConnectContext {
     private Exec exec;
     private boolean runProcedure = false;

+    // isProxy used for forward request from other FE and used in one thread
+    // it's default thread-safe
+    private boolean isProxy = false;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -355,6 +359,7 @@ public class ConnectContext {
             mysqlChannel = new MysqlChannel(connection, this);
         } else if (isProxy) {
             mysqlChannel = new ProxyMysqlChannel();
+            this.isProxy = isProxy;
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
@@ -1146,4 +1151,8 @@ public class ConnectContext {
     public void setUserVars(Map<String, LiteralExpr> userVars) {
         this.userVars = userVars;
     }
+
+    public boolean isProxy() {
+        return isProxy;
+    }
 }
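For orientation, the ConnectContext change above amounts to remembering, per connection, whether the request was forwarded from another FE. Below is a rough standalone restatement of that bookkeeping; only isProxy() and getCurrentConnectedFEIp() correspond to real ConnectContext methods touched or referenced by the patch, everything else (class and method names) is illustrative. The Coordinator change in the next file is the consumer of this flag.

    // Illustrative sketch only, not the actual Doris class. A forwarded (proxy) connection is
    // marked once when its channel is created; a connection is driven by a single thread, so
    // no synchronization is needed, matching the comment the patch adds to ConnectContext.
    public class ForwardedConnectionState {
        private boolean isProxy = false;          // true when the request came via another FE
        private String currentConnectedFeIp = ""; // FE the client is actually connected to

        public void markForwarded(String connectedFeIp) {
            this.isProxy = true;
            this.currentConnectedFeIp = connectedFeIp;
        }

        public boolean isProxy() {
            return isProxy;
        }

        public String getCurrentConnectedFeIp() {
            return currentConnectedFeIp;
        }
    }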
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 901effde1fd..1a4691e466a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -131,6 +131,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -197,6 +198,10 @@ public class Coordinator implements CoordInterface {
     private final TQueryGlobals queryGlobals = new TQueryGlobals();
     private TQueryOptions queryOptions;
     private TNetworkAddress coordAddress;
+    // fe audit log in connected FE,if a query is forward
+    // we should send the connected FE to be,
+    // then be report query statistics to the connected FE
+    private TNetworkAddress currentConnectFE;

     // protects all fields below
     private final Lock lock = new ReentrantLock();
@@ -551,6 +556,13 @@ public class Coordinator implements CoordInterface {
         }

         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+        if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty(
+                ConnectContext.get().getCurrentConnectedFEIp())) {
+            currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+                    Config.rpc_port);
+        } else {
+            currentConnectFE = coordAddress;
+        }

         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
@@ -3803,6 +3815,7 @@ public class Coordinator implements CoordInterface {
             params.params.setSenderId(i);
             params.params.setNumSenders(instanceExecParams.size());
             params.setCoord(coordAddress);
+            params.setCurrentConnectFe(currentConnectFE);
             params.setBackendNum(backendNum++);
             params.setQueryGlobals(queryGlobals);
             params.setQueryOptions(queryOptions);
@@ -3908,6 +3921,7 @@ public class Coordinator implements CoordInterface {
             params.setDestinations(destinations);
             params.setNumSenders(instanceExecParams.size());
             params.setCoord(coordAddress);
+            params.setCurrentConnectFe(currentConnectFE);
             params.setQueryGlobals(queryGlobals);
             params.setQueryOptions(queryOptions);
             params.query_options.setEnablePipelineEngine(true);
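The selection logic the patch inlines in the Coordinator's prepare path can be condensed as the following hedged sketch. The wrapping class and method are illustrative; TNetworkAddress, Config.rpc_port, StringUtils, and the ConnectContext accessors are the ones the patch itself uses.

    import org.apache.commons.lang3.StringUtils;
    import org.apache.doris.common.Config;
    import org.apache.doris.qe.ConnectContext;
    import org.apache.doris.thrift.TNetworkAddress;

    // Illustrative helper mirroring the decision added above: prefer the FE the client is
    // actually connected to (forwarded/proxy queries), otherwise fall back to the
    // coordinator's own address, so BEs report query statistics to the FE that writes the
    // audit log for this query.
    public class ReportTargetSelector {
        public static TNetworkAddress choose(TNetworkAddress coordAddress) {
            ConnectContext ctx = ConnectContext.get();
            if (ctx != null && ctx.isProxy() && !StringUtils.isEmpty(ctx.getCurrentConnectedFEIp())) {
                // Forwarded query: report to the follower FE the client connected to.
                return new TNetworkAddress(ctx.getCurrentConnectedFEIp(), Config.rpc_port);
            }
            // Non-forwarded query: the coordinator FE is also the connected FE.
            return coordAddress;
        }
    }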
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index de4810b65ac..cb4b8a5f2e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;

 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.plugin.audit.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

+// NOTE: not using a lock for beToQueryStatsMap's update because it should void global lock for all be
+// this may cause in some corner case missing statistics update,for example:
+// time1: clear logic judge query 1 is timeout
+// time2: query 1 is update by report
+// time3: clear logic remove query 1
+// in this case, lost query stats is allowed. because query report time out is 60s by default,
+// when this case happens, we should find why be not report for so long first.
 public class WorkloadRuntimeStatusMgr extends MasterDaemon {

     private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
-    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
-    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
-    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap();
     private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();

+    private class BeReportInfo {
+        volatile long beLastReportTime;
+
+        BeReportInfo(long beLastReportTime) {
+            this.beLastReportTime = beLastReportTime;
+        }
+
+        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
+    }
+
     public WorkloadRuntimeStatusMgr() {
         super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms);
     }
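As a compact, self-contained restatement of the bookkeeping this file now uses (the BeReportInfo holder introduced above plus the update and timeout-eviction logic in the hunk that follows): one entry per reporting BE, each carrying the BE's last report time and a per-query map of timestamped statistics, with no global lock. The class, field, and stand-in type names below are assumptions for illustration; the real code uses TQueryStatistics, Doris' Pair, and Config.be_report_query_statistics_timeout_ms.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class BeReportBook {
        // Stand-in for Config.be_report_query_statistics_timeout_ms (60s by default per the patch comment).
        static final long TIMEOUT_MS = 60_000;

        // Stand-in for the thrift TQueryStatistics struct.
        public static class QueryStats {
            public long scanRows;
            public long scanBytes;
            public long cpuMs;
        }

        // Timestamped statistics for one query as last reported by one BE.
        static final class TimedStats {
            final long lastReportTime;
            final QueryStats stats;

            TimedStats(long lastReportTime, QueryStats stats) {
                this.lastReportTime = lastReportTime;
                this.stats = stats;
            }
        }

        // One entry per BE: the BE's last report time plus a per-query map of timestamped stats.
        static final class BeReportInfo {
            volatile long beLastReportTime;
            final Map<String, TimedStats> queryStatsMap = new ConcurrentHashMap<>();

            BeReportInfo(long t) {
                this.beLastReportTime = t;
            }
        }

        private final Map<Long, BeReportInfo> beToQueryStatsMap = new ConcurrentHashMap<>();

        // Called when a BE pushes its report; one BE reports sequentially, so per-BE updates
        // need no extra locking beyond the concurrent maps.
        public void onBeReport(long beId, Map<String, QueryStats> reported) {
            long now = System.currentTimeMillis();
            BeReportInfo info = beToQueryStatsMap.computeIfAbsent(beId, id -> new BeReportInfo(now));
            info.beLastReportTime = now;
            reported.forEach((queryId, s) -> info.queryStatsMap.put(queryId, new TimedStats(now, s)));
        }

        // Drop BEs that stopped reporting, then drop individual queries a live BE stopped reporting.
        public void evictTimedOut() {
            long now = System.currentTimeMillis();
            beToQueryStatsMap.entrySet().removeIf(e -> now - e.getValue().beLastReportTime > TIMEOUT_MS);
            for (BeReportInfo info : beToQueryStatsMap.values()) {
                info.queryStatsMap.entrySet().removeIf(e -> now - e.getValue().lastReportTime > TIMEOUT_MS);
            }
        }
    }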
@@ -116,45 +131,65 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             return;
         }
         long beId = params.backend_id;
-        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
-        beLastReportTime.put(beId, System.currentTimeMillis());
-        if (queryIdMap == null) {
-            queryIdMap = Maps.newConcurrentMap();
-            queryIdMap.putAll(params.query_statistics_map);
-            beToQueryStatsMap.put(beId, queryIdMap);
+        // NOTE(wb) one be sends update request one by one,
+        // so there is no need a global lock for beToQueryStatsMap here,
+        // just keep one be's put/remove/get is atomic operation is enough
+        long currentTime = System.currentTimeMillis();
+        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+        if (beReportInfo == null) {
+            beReportInfo = new BeReportInfo(currentTime);
+            beToQueryStatsMap.put(beId, beReportInfo);
         } else {
-            long currentTime = System.currentTimeMillis();
-            for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
-                queryIdMap.put(entry.getKey(), entry.getValue());
-                queryLastReportTime.put(entry.getKey(), currentTime);
+            beReportInfo.beLastReportTime = currentTime;
+        }
+        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                beToQueryStatsMap.remove(beId);
+                continue;
+            }
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+            for (String queryId : queryIdSet) {
+                Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
+                long queryLastReportTime = pair.first;
+                if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                    beReportInfo.queryStatsMap.remove(queryId);
+                }
             }
         }
     }

+    // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap
+    // so there is no need lock or null check when visit beToQueryStatsMap
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
-            Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId);
-            Set<String> queryIdSet = currentQueryMap.keySet();
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
+
+                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
                     retQuery = new TQueryStatistics();
-                    retQueryMap.put(queryId, retQuery);
+                    resultQueryMap.put(queryId, retQuery);
                 }
-
-                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
                 mergeQueryStatistics(retQuery, curQueryStats);
             }
         }
-        return retQueryMap;
-    }
-
-    public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() {
-        return beToQueryStatsMap;
+        return resultQueryMap;
     }

     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
@@ -168,44 +203,6 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         }
     }

-    void clearReportTimeoutBeStatistics() {
-        // 1 clear report timeout be
-        Set<Long> beNeedToRemove = new HashSet<>();
-        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
-        Long currentTime = System.currentTimeMillis();
-        for (Long beId : currentBeIdSet) {
-            Long lastReportTime = beLastReportTime.get(beId);
-            if (lastReportTime != null
-                    && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                beNeedToRemove.add(beId);
-            }
-        }
-        for (Long beId : beNeedToRemove) {
-            beToQueryStatsMap.remove(beId);
-            beLastReportTime.remove(beId);
-        }
-
-        // 2 clear report timeout query
-        Set<String> queryNeedToClear = new HashSet<>();
-        Long newCurrentTime = System.currentTimeMillis();
-        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
-        for (String queryId : queryLastReportTimeKeySet) {
-            Long lastReportTime = queryLastReportTime.get(queryId);
-            if (lastReportTime != null
-                    && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                queryNeedToClear.add(queryId);
-            }
-        }
-
-        Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        for (String queryId : queryNeedToClear) {
-            for (Long beId : beIdSet) {
-                beToQueryStatsMap.get(beId).remove(queryId);
-            }
-            queryLastReportTime.remove(queryId);
-        }
-    }
-
     private void queryAuditEventLogWriteLock() {
         queryAuditEventLock.writeLock().lock();
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index ad8836fefbe..35331f13fc7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -522,6 +522,8 @@ struct TExecPlanFragmentParams {

   31: optional bool is_nereids = true;

+  32: optional Types.TNetworkAddress current_connect_fe
+
   // For cloud
   1000: optional bool is_mow_table;
 }
@@ -745,6 +747,7 @@ struct TPipelineFragmentParams {
   39: optional map<i32, i32> shuffle_idx_to_instance_idx
   40: optional bool is_nereids = true;
   41: optional i64 wal_id
+  43: optional Types.TNetworkAddress current_connect_fe

   // For cloud
   1000: optional bool is_mow_table;
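To round out the picture, getQueryStatisticsMap above merges the per-BE snapshots into one row per query before the audit/workload-policy code consumes them. A hedged, self-contained sketch of that aggregation follows; the field names and the additive merge are illustrative assumptions (the real TQueryStatistics thrift struct has more fields, and the exact rules live in mergeQueryStatistics, whose body is not shown in this diff).

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class QueryStatsAggregator {

        // Stand-in for the thrift TQueryStatistics struct.
        public static class QueryStats {
            public long scanRows;
            public long scanBytes;
            public long cpuMs;
        }

        // perBeSnapshots: one map per BE, keyed by query id.
        public static Map<String, QueryStats> aggregate(List<Map<String, QueryStats>> perBeSnapshots) {
            Map<String, QueryStats> result = new HashMap<>();
            for (Map<String, QueryStats> beSnapshot : perBeSnapshots) {
                for (Map.Entry<String, QueryStats> e : beSnapshot.entrySet()) {
                    QueryStats total = result.computeIfAbsent(e.getKey(), k -> new QueryStats());
                    // Assumed additive merge, in the spirit of mergeQueryStatistics in the patch.
                    total.scanRows += e.getValue().scanRows;
                    total.scanBytes += e.getValue().scanBytes;
                    total.cpuMs += e.getValue().cpuMs;
                }
            }
            return result;
        }
    }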
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org