This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 88e02c836d1 [Fix]Fix insert select missing audit log when connect follower FE (#36481)
88e02c836d1 is described below

commit 88e02c836d1c7b96c63524776da57f8aacbaa600
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jun 20 15:16:16 2024 +0800

    [Fix]Fix insert select missing audit log when connect follower FE (#36481)
    
    ## Proposed changes
    
    pick #36472
---
 be/src/runtime/fragment_mgr.cpp                    |   5 +-
 be/src/runtime/query_context.cpp                   |  18 +--
 be/src/runtime/query_context.h                     |   3 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  34 ++++--
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +
 .../java/org/apache/doris/qe/ConnectContext.java   |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  14 +++
 .../WorkloadRuntimeStatusMgr.java                  | 125 ++++++++++-----------
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 9 files changed, 128 insertions(+), 85 deletions(-)
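
In short, the patch makes each BE report query statistics to the FE the client is actually
connected to, instead of always to the coordinating FE, so an insert-select issued through a
follower FE (whose request is forwarded) no longer loses its audit log. The following minimal,
self-contained Java sketch only restates that address-selection rule; ReportTargetExample and
Address are illustrative stand-ins, not real Doris classes.

    import java.util.Optional;

    // Hypothetical model of the routing rule added in Coordinator.java below.
    public class ReportTargetExample {
        record Address(String host, int port) {}

        static Address chooseReportTarget(boolean isProxy, Optional<String> connectedFeIp,
                                          Address coordinatorAddress, int rpcPort) {
            // Forwarded ("proxy") request with a known origin FE: report back to that FE
            // so its audit log captures the statement.
            if (isProxy && connectedFeIp.isPresent() && !connectedFeIp.get().isEmpty()) {
                return new Address(connectedFeIp.get(), rpcPort);
            }
            // Otherwise the coordinator FE is also the connected FE.
            return coordinatorAddress;
        }

        public static void main(String[] args) {
            Address coord = new Address("master-fe", 9020);
            System.out.println(chooseReportTarget(true, Optional.of("follower-fe"), coord, 9020));
            System.out.println(chooseReportTarget(false, Optional.empty(), coord, 9020));
        }
    }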

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 63079933ca1..a7808cb6d56 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -643,13 +643,14 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
                   << ", total fragment num on current host: " << params.fragment_num_on_host
                   << ", fe process uuid: " << params.query_options.fe_process_uuid
-                  << ", query type: " << params.query_options.query_type;
+                  << ", query type: " << params.query_options.query_type
+                  << ", report audit fe:" << params.current_connect_fe;
 
         // This may be a first fragment request of the query.
         // Create the query fragments context.
         query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
                                                 params.query_options, params.coord, pipeline,
-                                                params.is_nereids);
+                                                params.is_nereids, params.current_connect_fe);
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index f9cc9757fe3..bbcdc3b4771 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -43,7 +43,7 @@ public:
 
 QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                            const TQueryOptions& query_options, TNetworkAddress coord_addr,
-                           bool is_pipeline, bool is_nereids)
+                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe)
         : fragment_num(total_fragment_num),
           timeout_second(-1),
           _query_id(query_id),
@@ -70,10 +70,13 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
     DCHECK_EQ(is_query_type_valid, true);
 
     this->coord_addr = coord_addr;
-    // external query has no coord_addr
+    // current_connect_fe is used for report query statistics
+    this->current_connect_fe = current_connect_fe;
+    // external query has no current_connect_fe
     if (query_options.query_type != TQueryType::EXTERNAL) {
-        bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0;
-        DCHECK_EQ(is_coord_addr_valid, true);
+        bool is_report_fe_addr_valid =
+                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
+        DCHECK_EQ(is_report_fe_addr_valid, true);
     }
 
     register_memory_statistics();
@@ -265,7 +268,7 @@ void QueryContext::set_pipeline_context(
 
 void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
     _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-            print_id(_query_id), qs, coord_addr, _query_options.query_type);
+            print_id(_query_id), qs, current_connect_fe, _query_options.query_type);
 }
 
 std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
@@ -279,7 +282,7 @@ void QueryContext::register_memory_statistics() {
         std::string query_id = print_id(_query_id);
         if (qs) {
             _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                    query_id, qs, coord_addr, _query_options.query_type);
+                    query_id, qs, current_connect_fe, _query_options.query_type);
         } else {
             LOG(INFO) << " query " << query_id << " get memory query statistics failed ";
         }
@@ -290,7 +293,8 @@ void QueryContext::register_cpu_statistics() {
     if (!_cpu_statistics) {
         _cpu_statistics = std::make_shared<QueryStatistics>();
         _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type);
+                print_id(_query_id), _cpu_statistics, current_connect_fe,
+                _query_options.query_type);
     }
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c78886997d0..2653eeddc8b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -71,7 +71,7 @@ class QueryContext {
 public:
     QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                  const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline,
-                 bool is_nereids);
+                 bool is_nereids, TNetworkAddress current_connect_fe);
 
     ~QueryContext();
 
@@ -275,6 +275,7 @@ public:
     std::string user;
     std::string group;
     TNetworkAddress coord_addr;
+    TNetworkAddress current_connect_fe;
     TQueryGlobals query_globals;
 
     /// In the current implementation, for multiple fragments executed by a query on the same BE node,
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 5257f53bb00..32ca00d2c65 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -89,8 +89,8 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
         std::string add_str = PrintThriftNetworkAddress(addr);
         if (!coord_status.ok()) {
             std::stringstream ss;
-            LOG(WARNING) << "could not get client " << add_str
-                         << " when report workload runtime stats, reason is "
+            LOG(WARNING) << "[report_query_statistics]could not get client " << add_str
+                         << " when report workload runtime stats, reason:"
                          << coord_status.to_string();
             continue;
         }
@@ -109,26 +109,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
             coord->reportExecStatus(res, params);
             rpc_result[addr] = true;
         } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "fe " << add_str
-                         << " throw exception when report statistics, reason=" << e.what()
+            LOG(WARNING) << "[report_query_statistics]fe " << add_str
+                         << " throw exception when report statistics, reason:" << e.what()
                          << " , you can see fe log for details.";
         } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "report workload runtime statistics to " << add_str
-                         << " failed,  err: " << e.what();
+            LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to "
+                         << add_str << " failed,  reason: " << e.what();
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
-                LOG(WARNING)
-                        << "reopen thrift client failed when report workload runtime statistics to"
-                        << add_str;
+                LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report "
+                                "workload runtime statistics to"
+                             << add_str;
             } else {
                 try {
                     coord->reportExecStatus(res, params);
                     rpc_result[addr] = true;
                 } catch (apache::thrift::transport::TTransportException& e2) {
-                    LOG(WARNING) << "retry report workload runtime stats to " << add_str
-                                 << " failed,  err: " << e2.what();
+                    LOG(WARNING)
+                            << "[report_query_statistics]retry report workload runtime stats to "
+                            << add_str << " failed,  reason: " << e2.what();
+                } catch (std::exception& e) {
+                    LOG_WARNING(
+                            "[report_query_statistics]unknown exception when report workload "
+                            "runtime statistics to {}, "
+                            "reason:{}. ",
+                            add_str, e.what());
                 }
             }
+        } catch (std::exception& e) {
+            LOG_WARNING(
+                    "[report_query_statistics]unknown exception when report workload runtime "
+                    "statistics to {}, reason:{}. ",
+                    add_str, e.what());
         }
     }
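
Besides tagging the log lines, the hunk above adds catch-all handlers so a single bad report
cannot kill the reporting loop. A simplified, self-contained Java sketch of the same
report / reopen / retry-once shape (the real code is C++ over thrift and distinguishes
TApplicationException from TTransportException; Report, reopen, and the address below are
hypothetical stand-ins):

    import java.util.function.BooleanSupplier;

    public class ReportRetryExample {
        interface Report { void send() throws Exception; }

        static void reportOnce(Report report, BooleanSupplier reopen, String addr) {
            try {
                report.send();
            } catch (Exception first) {
                System.err.println("[report_query_statistics] report to " + addr
                        + " failed, reason: " + first.getMessage());
                if (!reopen.getAsBoolean()) {   // reopen the client before the single retry
                    System.err.println("[report_query_statistics] reopen client failed for " + addr);
                    return;
                }
                try {
                    report.send();              // retry exactly once
                } catch (Exception second) {    // catch-all keeps the reporting loop alive
                    System.err.println("[report_query_statistics] retry to " + addr
                            + " failed, reason: " + second.getMessage());
                }
            }
        }

        public static void main(String[] args) {
            reportOnce(() -> { throw new Exception("connection reset"); }, () -> true, "10.0.0.1:9020");
        }
    }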
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 4d63a92c750..33979e4c354 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -289,6 +289,7 @@ public class StreamLoadPlanner {
 
         params.setDescTbl(analyzer.getDescTbl().toThrift());
         params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
         TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
         // user load id (streamLoadTask.id) as query id
@@ -521,6 +522,7 @@ public class StreamLoadPlanner {
 
         pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
         pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        pipParams.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
         pipParams.setQueryId(loadId);
         pipParams.per_exch_num_senders = Maps.newHashMap();
         pipParams.destinations = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 48a79bd9b07..16b1b3c2c83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -233,6 +233,10 @@ public class ConnectContext {
     private Exec exec;
     private boolean runProcedure = false;
 
+    // isProxy marks a request forwarded from another FE; it is only accessed in one thread,
+    // so it is thread-safe by default
+    private boolean isProxy = false;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -355,6 +359,7 @@ public class ConnectContext {
             mysqlChannel = new MysqlChannel(connection, this);
         } else if (isProxy) {
             mysqlChannel = new ProxyMysqlChannel();
+            this.isProxy = isProxy;
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
@@ -1146,4 +1151,8 @@ public class ConnectContext {
     public void setUserVars(Map<String, LiteralExpr> userVars) {
         this.userVars = userVars;
     }
+
+    public boolean isProxy() {
+        return isProxy;
+    }
 }
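
A small sketch of the intended isProxy lifecycle: the flag is set once while a forwarded
("proxy") connection is being initialized and is only read later from the same handler thread,
which is why no extra synchronization is used. MiniConnectContext and initForForwardedRequest
are illustrative names, not Doris APIs.

    public class ProxyFlagExample {
        static class MiniConnectContext {
            private boolean isProxy = false;
            private String currentConnectedFeIp;

            void initForForwardedRequest(String sourceFeIp) {
                this.isProxy = true;                // set exactly once, before any reads
                this.currentConnectedFeIp = sourceFeIp;
            }

            boolean isProxy() { return isProxy; }
            String getCurrentConnectedFeIp() { return currentConnectedFeIp; }
        }

        public static void main(String[] args) {
            MiniConnectContext ctx = new MiniConnectContext();
            ctx.initForForwardedRequest("follower-fe-1");
            if (ctx.isProxy() && ctx.getCurrentConnectedFeIp() != null) {
                System.out.println("report statistics to " + ctx.getCurrentConnectedFeIp());
            }
        }
    }
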
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 901effde1fd..1a4691e466a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -131,6 +131,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -197,6 +198,10 @@ public class Coordinator implements CoordInterface {
     private final TQueryGlobals queryGlobals = new TQueryGlobals();
     private TQueryOptions queryOptions;
     private TNetworkAddress coordAddress;
+    // The audit log is written on the FE the client connected to. If a query is forwarded,
+    // we should send the connected FE's address to the BE,
+    // so that the BE reports query statistics to the connected FE.
+    private TNetworkAddress currentConnectFE;
 
     // protects all fields below
     private final Lock lock = new ReentrantLock();
@@ -551,6 +556,13 @@ public class Coordinator implements CoordInterface {
         }
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+        if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty(
+                ConnectContext.get().getCurrentConnectedFEIp())) {
+            currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+                    Config.rpc_port);
+        } else {
+            currentConnectFE = coordAddress;
+        }
 
         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
@@ -3803,6 +3815,7 @@ public class Coordinator implements CoordInterface {
                 params.params.setSenderId(i);
                 params.params.setNumSenders(instanceExecParams.size());
                 params.setCoord(coordAddress);
+                params.setCurrentConnectFe(currentConnectFE);
                 params.setBackendNum(backendNum++);
                 params.setQueryGlobals(queryGlobals);
                 params.setQueryOptions(queryOptions);
@@ -3908,6 +3921,7 @@ public class Coordinator implements CoordInterface {
                     params.setDestinations(destinations);
                     params.setNumSenders(instanceExecParams.size());
                     params.setCoord(coordAddress);
+                    params.setCurrentConnectFe(currentConnectFE);
                     params.setQueryGlobals(queryGlobals);
                     params.setQueryOptions(queryOptions);
                     params.query_options.setEnablePipelineEngine(true);
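
A sketch of how the coordinator-side choice flows into the per-fragment parameters sent to each
BE: every instance now carries both the coordinator address and the connected-FE address.
FragmentParams is a stand-in for TExecPlanFragmentParams / TPipelineFragmentParams, not a real
Doris type.

    import java.util.ArrayList;
    import java.util.List;

    public class FragmentParamsExample {
        record Address(String host, int port) {}

        static class FragmentParams {
            Address coord;
            Address currentConnectFe;
        }

        static List<FragmentParams> buildParams(int instanceNum, Address coordAddress,
                                                Address currentConnectFe) {
            List<FragmentParams> all = new ArrayList<>();
            for (int i = 0; i < instanceNum; i++) {
                FragmentParams p = new FragmentParams();
                p.coord = coordAddress;                 // unchanged: where the coordinator runs
                p.currentConnectFe = currentConnectFe;  // new: where the BE should report statistics
                all.add(p);
            }
            return all;
        }

        public static void main(String[] args) {
            Address coord = new Address("master-fe", 9020);
            Address connected = new Address("follower-fe", 9020);
            System.out.println(buildParams(2, coord, connected).size() + " fragment params prepared");
        }
    }
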
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index de4810b65ac..cb4b8a5f2e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.plugin.audit.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+// NOTE: beToQueryStatsMap updates are not guarded by a lock, to avoid taking a global lock across all BEs.
+// In some corner cases this may miss a statistics update, for example:
+// time1: the clear logic judges query 1 as timed out
+// time2: query 1 is updated by a report
+// time3: the clear logic removes query 1
+// Losing query stats in this case is acceptable, because the query report timeout is 60s by default;
+// when this happens, we should first find out why the BE has not reported for so long.
 public class WorkloadRuntimeStatusMgr extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
-    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
-    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
-    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap();
     private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
 
+    private class BeReportInfo {
+        volatile long beLastReportTime;
+
+        BeReportInfo(long beLastReportTime) {
+            this.beLastReportTime = beLastReportTime;
+        }
+
+        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
+    }
+
     public WorkloadRuntimeStatusMgr() {
         super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms);
     }
@@ -116,45 +131,65 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             return;
         }
         long beId = params.backend_id;
-        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
-        beLastReportTime.put(beId, System.currentTimeMillis());
-        if (queryIdMap == null) {
-            queryIdMap = Maps.newConcurrentMap();
-            queryIdMap.putAll(params.query_statistics_map);
-            beToQueryStatsMap.put(beId, queryIdMap);
+        // NOTE(wb) a BE sends its update requests one by one,
+        // so no global lock is needed for beToQueryStatsMap here;
+        // it is enough that one BE's put/remove/get operations are atomic
+        long currentTime = System.currentTimeMillis();
+        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+        if (beReportInfo == null) {
+            beReportInfo = new BeReportInfo(currentTime);
+            beToQueryStatsMap.put(beId, beReportInfo);
         } else {
-            long currentTime = System.currentTimeMillis();
-            for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
-                queryIdMap.put(entry.getKey(), entry.getValue());
-                queryLastReportTime.put(entry.getKey(), currentTime);
+            beReportInfo.beLastReportTime = currentTime;
+        }
+        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                beToQueryStatsMap.remove(beId);
+                continue;
+            }
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+            for (String queryId : queryIdSet) {
+                Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
+                long queryLastReportTime = pair.first;
+                if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                    beReportInfo.queryStatsMap.remove(queryId);
+                }
             }
         }
     }
 
+    // NOTE: getQueryStatisticsMap must currently be called before beToQueryStatsMap is cleared,
+    // so no lock or null check is needed when visiting beToQueryStatsMap
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
-            Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId);
-            Set<String> queryIdSet = currentQueryMap.keySet();
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
+
+                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
                     retQuery = new TQueryStatistics();
-                    retQueryMap.put(queryId, retQuery);
+                    resultQueryMap.put(queryId, retQuery);
                 }
-
-                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
                 mergeQueryStatistics(retQuery, curQueryStats);
             }
         }
 
-        return retQueryMap;
-    }
-
-    public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() {
-        return beToQueryStatsMap;
+        return resultQueryMap;
     }
 
     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
@@ -168,44 +203,6 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         }
     }
 
-    void clearReportTimeoutBeStatistics() {
-        // 1 clear report timeout be
-        Set<Long> beNeedToRemove = new HashSet<>();
-        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
-        Long currentTime = System.currentTimeMillis();
-        for (Long beId : currentBeIdSet) {
-            Long lastReportTime = beLastReportTime.get(beId);
-            if (lastReportTime != null
-                    && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                beNeedToRemove.add(beId);
-            }
-        }
-        for (Long beId : beNeedToRemove) {
-            beToQueryStatsMap.remove(beId);
-            beLastReportTime.remove(beId);
-        }
-
-        // 2 clear report timeout query
-        Set<String> queryNeedToClear = new HashSet<>();
-        Long newCurrentTime = System.currentTimeMillis();
-        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
-        for (String queryId : queryLastReportTimeKeySet) {
-            Long lastReportTime = queryLastReportTime.get(queryId);
-            if (lastReportTime != null
-                    && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                queryNeedToClear.add(queryId);
-            }
-        }
-
-        Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        for (String queryId : queryNeedToClear) {
-            for (Long beId : beIdSet) {
-                beToQueryStatsMap.get(beId).remove(queryId);
-            }
-            queryLastReportTime.remove(queryId);
-        }
-    }
-
     private void queryAuditEventLogWriteLock() {
         queryAuditEventLock.writeLock().lock();
     }
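
The refactor above replaces the three separate maps with a single entry per BE that bundles the
BE's last report time and a per-query (lastReportTime, stats) pair, and the comment above puts
the default report timeout at 60s. A condensed, self-contained Java sketch of that bookkeeping
and the timeout sweep; Stats stands in for TQueryStatistics, TIMEOUT_MS for
Config.be_report_query_statistics_timeout_ms, and everything apart from the BeReportInfo idea is
hypothetical.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class BeReportSketch {
        record Stats(long cpuMs, long scanRows) {}
        record QueryEntry(long lastReportTime, Stats stats) {}

        static class BeReportInfo {
            volatile long beLastReportTime;
            final Map<String, QueryEntry> queryStats = new ConcurrentHashMap<>();
            BeReportInfo(long t) { beLastReportTime = t; }
        }

        static final long TIMEOUT_MS = 60_000;
        static final Map<Long, BeReportInfo> beToQueryStats = new ConcurrentHashMap<>();

        // Called for each BE report: refresh the BE timestamp and upsert every query's stats.
        static void onReport(long beId, Map<String, Stats> reported) {
            long now = System.currentTimeMillis();
            BeReportInfo info = beToQueryStats.computeIfAbsent(beId, id -> new BeReportInfo(now));
            info.beLastReportTime = now;
            reported.forEach((queryId, stats) -> info.queryStats.put(queryId, new QueryEntry(now, stats)));
        }

        // Periodic sweep: drop BEs that stopped reporting, then drop stale queries per BE.
        static void clearTimedOut() {
            long now = System.currentTimeMillis();
            for (Long beId : beToQueryStats.keySet()) {
                BeReportInfo info = beToQueryStats.get(beId);
                if (info == null) {
                    continue;
                }
                if (now - info.beLastReportTime > TIMEOUT_MS) {
                    beToQueryStats.remove(beId);
                    continue;
                }
                info.queryStats.entrySet().removeIf(e -> now - e.getValue().lastReportTime() > TIMEOUT_MS);
            }
        }

        public static void main(String[] args) {
            onReport(10001L, Map.of("query-1", new Stats(12, 3456)));
            clearTimedOut();
            System.out.println(beToQueryStats.keySet());
        }
    }
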
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index ad8836fefbe..35331f13fc7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -522,6 +522,8 @@ struct TExecPlanFragmentParams {
 
   31: optional bool is_nereids = true;
 
+  32: optional Types.TNetworkAddress current_connect_fe
+
   // For cloud
   1000: optional bool is_mow_table;
 }
@@ -745,6 +747,7 @@ struct TPipelineFragmentParams {
   39: optional map<i32, i32> shuffle_idx_to_instance_idx
   40: optional bool is_nereids = true;
   41: optional i64 wal_id
+  43: optional Types.TNetworkAddress current_connect_fe
 
   // For cloud
   1000: optional bool is_mow_table;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
