This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a2fb08c5944 [Fix]Fix insert select missing audit log when connect follower FE (#36472)
a2fb08c5944 is described below

commit a2fb08c59448866a6518196ea0ce4924948e91c1
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jun 20 15:16:10 2024 +0800

    [Fix]Fix insert select missing audit log when connect follower FE (#36472)
    
    ## Proposed changes
    pick #36454
    
    Fix missing query statistics in the audit log when an ```insert select```
    is executed on a Follower FE.
    This happens because the ```audit log``` is written by the FE the client
    connects to, while the request is forwarded to the Master FE. The Master FE
    therefore becomes the coordinator FE and BEs report query statistics to it,
    so the connected Follower never receives the reported statistics and the
    audit log is missing them.
    We add a new field marking the FE the client is connected to, so that BEs
    report query statistics back to that FE.
    Besides, this change refactors the FE's WorkloadRuntimeStatusMgr to make the
    logic clearer and adds some logging on the BE side.
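    
    As a rough sketch of how the reporting target is chosen on the coordinator
    (condensed from the Coordinator.java hunk below; identifiers come from the
    patch, error handling omitted):
    
    ```java
    // If the statement was forwarded (proxied) from the FE the client is
    // connected to, report query statistics back to that FE; otherwise the
    // coordinator itself is the connected FE.
    ConnectContext ctx = ConnectContext.get();
    if (ctx != null && ctx.isProxy() && !StringUtils.isEmpty(ctx.getCurrentConnectedFEIp())) {
        currentConnectFE = new TNetworkAddress(ctx.getCurrentConnectedFEIp(), Config.rpc_port);
    } else {
        currentConnectFE = coordAddress;
    }
    ```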
---
 be/src/runtime/fragment_mgr.cpp                    |   8 +-
 be/src/runtime/query_context.cpp                   |  18 +--
 be/src/runtime/query_context.h                     |   4 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  34 ++++--
 .../apache/doris/planner/StreamLoadPlanner.java    |   1 +
 .../java/org/apache/doris/qe/ConnectContext.java   |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  14 +++
 .../WorkloadRuntimeStatusMgr.java                  | 125 ++++++++++-----------
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 9 files changed, 130 insertions(+), 86 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1172b5b889b..9271f78fe56 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -607,12 +607,14 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
                   << ", total fragment num on current host: " << params.fragment_num_on_host
                   << ", fe process uuid: " << params.query_options.fe_process_uuid
-                  << ", query type: " << params.query_options.query_type;
+                  << ", query type: " << params.query_options.query_type
+                  << ", report audit fe:" << params.current_connect_fe;
 
         // This may be a first fragment request of the query.
         // Create the query fragments context.
-        query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
-                                                params.coord, pipeline, params.is_nereids);
+        query_ctx =
+                QueryContext::create_shared(query_id, _exec_env, params.query_options, params.coord,
+                                            pipeline, params.is_nereids, params.current_connect_fe);
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2dafb8dd3ec..429c4f80563 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -57,7 +57,7 @@ public:
 
 QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
-                           bool is_pipeline, bool is_nereids)
+                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe)
         : _timeout_second(-1),
           _query_id(query_id),
           _exec_env(exec_env),
@@ -81,10 +81,13 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
     DCHECK_EQ(is_query_type_valid, true);
 
     this->coord_addr = coord_addr;
-    // external query has no coord_addr
+    // current_connect_fe is used for report query statistics
+    this->current_connect_fe = current_connect_fe;
+    // external query has no current_connect_fe
     if (query_options.query_type != TQueryType::EXTERNAL) {
-        bool is_coord_addr_valid = !this->coord_addr.hostname.empty() && this->coord_addr.port != 0;
-        DCHECK_EQ(is_coord_addr_valid, true);
+        bool is_report_fe_addr_valid =
+                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
+        DCHECK_EQ(is_report_fe_addr_valid, true);
     }
 
     register_memory_statistics();
@@ -284,7 +287,7 @@ void QueryContext::set_pipeline_context(
 
 void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
     _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-            print_id(_query_id), qs, coord_addr, _query_options.query_type);
+            print_id(_query_id), qs, current_connect_fe, _query_options.query_type);
 }
 
 std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
@@ -298,7 +301,7 @@ void QueryContext::register_memory_statistics() {
         std::string query_id = print_id(_query_id);
         if (qs) {
             _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                    query_id, qs, coord_addr, _query_options.query_type);
+                    query_id, qs, current_connect_fe, _query_options.query_type);
         } else {
             LOG(INFO) << " query " << query_id << " get memory query 
statistics failed ";
         }
@@ -309,7 +312,8 @@ void QueryContext::register_cpu_statistics() {
     if (!_cpu_statistics) {
         _cpu_statistics = std::make_shared<QueryStatistics>();
         _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
-                print_id(_query_id), _cpu_statistics, coord_addr, _query_options.query_type);
+                print_id(_query_id), _cpu_statistics, current_connect_fe,
+                _query_options.query_type);
     }
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 4c1ee2cf574..3df3677e502 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -73,7 +73,8 @@ class QueryContext {
 
 public:
     QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
-                 TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids);
+                 TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids,
+                 TNetworkAddress current_connect_fe);
 
     ~QueryContext();
 
@@ -246,6 +247,7 @@ public:
     std::string user;
     std::string group;
     TNetworkAddress coord_addr;
+    TNetworkAddress current_connect_fe;
     TQueryGlobals query_globals;
 
     ObjectPool obj_pool;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index dda41936284..44dff0f54ac 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -382,8 +382,8 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
         std::string add_str = PrintThriftNetworkAddress(addr);
         if (!coord_status.ok()) {
             std::stringstream ss;
-            LOG(WARNING) << "could not get client " << add_str
-                         << " when report workload runtime stats, reason is "
+            LOG(WARNING) << "[report_query_statistics]could not get client " 
<< add_str
+                         << " when report workload runtime stats, reason:"
                          << coord_status.to_string();
             continue;
         }
@@ -402,26 +402,38 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
             coord->reportExecStatus(res, params);
             rpc_result[addr] = true;
         } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "fe " << add_str
-                         << " throw exception when report statistics, reason=" 
<< e.what()
+            LOG(WARNING) << "[report_query_statistics]fe " << add_str
+                         << " throw exception when report statistics, reason:" 
<< e.what()
                          << " , you can see fe log for details.";
         } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "report workload runtime statistics to " << add_str
-                         << " failed,  err: " << e.what();
+            LOG(WARNING) << "[report_query_statistics]report workload runtime 
statistics to "
+                         << add_str << " failed,  reason: " << e.what();
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
-                LOG(WARNING)
-                        << "reopen thrift client failed when report workload 
runtime statistics to"
-                        << add_str;
+                LOG(WARNING) << "[report_query_statistics]reopen thrift client 
failed when report "
+                                "workload runtime statistics to"
+                             << add_str;
             } else {
                 try {
                     coord->reportExecStatus(res, params);
                     rpc_result[addr] = true;
                 } catch (apache::thrift::transport::TTransportException& e2) {
-                    LOG(WARNING) << "retry report workload runtime stats to " 
<< add_str
-                                 << " failed,  err: " << e2.what();
+                    LOG(WARNING)
+                            << "[report_query_statistics]retry report workload 
runtime stats to "
+                            << add_str << " failed,  reason: " << e2.what();
+                } catch (std::exception& e) {
+                    LOG_WARNING(
+                            "[report_query_statistics]unknow exception when 
report workload "
+                            "runtime statistics to {}, "
+                            "reason:{}. ",
+                            add_str, e.what());
                 }
             }
+        } catch (std::exception& e) {
+            LOG_WARNING(
+                    "[report_query_statistics]unknown exception when report 
workload runtime "
+                    "statistics to {}, reason:{}. ",
+                    add_str, e.what());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 4b66f2f0b21..973134563b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -294,6 +294,7 @@ public class StreamLoadPlanner {
 
         params.setDescTbl(analyzer.getDescTbl().toThrift());
         params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
         TPipelineInstanceParams execParams = new TPipelineInstanceParams();
         // user load id (streamLoadTask.id) as query id
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index b02d895ff2d..df1a4ac2408 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -238,6 +238,10 @@ public class ConnectContext {
     private Exec exec;
     private boolean runProcedure = false;
 
+    // isProxy used for forward request from other FE and used in one thread
+    // it's default thread-safe
+    private boolean isProxy = false;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -360,6 +364,7 @@ public class ConnectContext {
             mysqlChannel = new MysqlChannel(connection, this);
         } else if (isProxy) {
             mysqlChannel = new ProxyMysqlChannel();
+            this.isProxy = isProxy;
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
@@ -1368,4 +1373,8 @@ public class ConnectContext {
     public void setUserVars(Map<String, LiteralExpr> userVars) {
         this.userVars = userVars;
     }
+
+    public boolean isProxy() {
+        return isProxy;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c6755746106..734907ac0c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -130,6 +130,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -196,6 +197,10 @@ public class Coordinator implements CoordInterface {
     private final TQueryGlobals queryGlobals = new TQueryGlobals();
     private TQueryOptions queryOptions;
     private TNetworkAddress coordAddress;
+    // fe audit log in connected FE,if a query is forward
+    // we should send the connected FE to be,
+    // then be report query statistics to the connected FE
+    private TNetworkAddress currentConnectFE;
 
     // protects all fields below
     private final Lock lock = new ReentrantLock();
@@ -525,6 +530,13 @@ public class Coordinator implements CoordInterface {
         }
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+        if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty(
+                ConnectContext.get().getCurrentConnectedFEIp())) {
+            currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+                    Config.rpc_port);
+        } else {
+            currentConnectFE = coordAddress;
+        }
 
         this.idToBackend = Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster();
 
@@ -3019,6 +3031,7 @@ public class Coordinator implements CoordInterface {
                 params.params.setSenderId(i);
                 params.params.setNumSenders(instanceExecParams.size());
                 params.setCoord(coordAddress);
+                params.setCurrentConnectFe(currentConnectFE);
                 params.setBackendNum(backendNum++);
                 params.setQueryGlobals(queryGlobals);
                 params.setQueryOptions(queryOptions);
@@ -3115,6 +3128,7 @@ public class Coordinator implements CoordInterface {
                     params.setDestinations(destinations);
                     params.setNumSenders(instanceExecParams.size());
                     params.setCoord(coordAddress);
+                    params.setCurrentConnectFe(currentConnectFE);
                     params.setQueryGlobals(queryGlobals);
                     params.setQueryOptions(queryOptions);
                     params.query_options.setMemLimit(memLimit);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index f1b87196516..07955ede778 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.plugin.audit.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+// NOTE: not using a lock for beToQueryStatsMap's update because it should void global lock for all be
+// this may cause in some corner case missing statistics update,for example:
+// time1: clear logic judge query 1 is timeout
+// time2: query 1 is update by report
+// time3: clear logic remove query 1
+// in this case, lost query stats is allowed. because query report time out is 60s by default,
+// when this case happens, we should find why be not report for so long first.
 public class WorkloadRuntimeStatusMgr extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
-    private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
-    private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
-    private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+    private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap();
     private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
 
+    private class BeReportInfo {
+        volatile long beLastReportTime;
+
+        BeReportInfo(long beLastReportTime) {
+            this.beLastReportTime = beLastReportTime;
+        }
+
+        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
+    }
+
     public WorkloadRuntimeStatusMgr() {
         super("workload-runtime-stats-thread", 
Config.workload_runtime_status_thread_interval_ms);
     }
@@ -118,45 +133,65 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             return;
         }
         long beId = params.backend_id;
-        Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
-        beLastReportTime.put(beId, System.currentTimeMillis());
-        if (queryIdMap == null) {
-            queryIdMap = Maps.newConcurrentMap();
-            queryIdMap.putAll(params.query_statistics_map);
-            beToQueryStatsMap.put(beId, queryIdMap);
+        // NOTE(wb) one be sends update request one by one,
+        // so there is no need a global lock for beToQueryStatsMap here,
+        // just keep one be's put/remove/get is atomic operation is enough
+        long currentTime = System.currentTimeMillis();
+        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+        if (beReportInfo == null) {
+            beReportInfo = new BeReportInfo(currentTime);
+            beToQueryStatsMap.put(beId, beReportInfo);
         } else {
-            long currentTime = System.currentTimeMillis();
-            for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
-                queryIdMap.put(entry.getKey(), entry.getValue());
-                queryLastReportTime.put(entry.getKey(), currentTime);
+            beReportInfo.beLastReportTime = currentTime;
+        }
+        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        }
+    }
+
+    void clearReportTimeoutBeStatistics() {
+        // 1 clear report timeout be
+        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+        Long currentTime = System.currentTimeMillis();
+        for (Long beId : currentBeIdSet) {
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                beToQueryStatsMap.remove(beId);
+                continue;
+            }
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+            for (String queryId : queryIdSet) {
+                Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
+                long queryLastReportTime = pair.first;
+                if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) {
+                    beReportInfo.queryStatsMap.remove(queryId);
+                }
             }
         }
     }
 
+    // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap
+    // so there is no need lock or null check when visit beToQueryStatsMap
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
-            Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId);
-            Set<String> queryIdSet = currentQueryMap.keySet();
+            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics retQuery = retQueryMap.get(queryId);
+                TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
+
+                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
                     retQuery = new TQueryStatistics();
-                    retQueryMap.put(queryId, retQuery);
+                    resultQueryMap.put(queryId, retQuery);
                 }
-
-                TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
                 mergeQueryStatistics(retQuery, curQueryStats);
             }
         }
 
-        return retQueryMap;
-    }
-
-    public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() {
-        return beToQueryStatsMap;
+        return resultQueryMap;
     }
 
     private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
@@ -172,44 +207,6 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         }
     }
 
-    void clearReportTimeoutBeStatistics() {
-        // 1 clear report timeout be
-        Set<Long> beNeedToRemove = new HashSet<>();
-        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
-        Long currentTime = System.currentTimeMillis();
-        for (Long beId : currentBeIdSet) {
-            Long lastReportTime = beLastReportTime.get(beId);
-            if (lastReportTime != null
-                    && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                beNeedToRemove.add(beId);
-            }
-        }
-        for (Long beId : beNeedToRemove) {
-            beToQueryStatsMap.remove(beId);
-            beLastReportTime.remove(beId);
-        }
-
-        // 2 clear report timeout query
-        Set<String> queryNeedToClear = new HashSet<>();
-        Long newCurrentTime = System.currentTimeMillis();
-        Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
-        for (String queryId : queryLastReportTimeKeySet) {
-            Long lastReportTime = queryLastReportTime.get(queryId);
-            if (lastReportTime != null
-                    && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
-                queryNeedToClear.add(queryId);
-            }
-        }
-
-        Set<Long> beIdSet = beToQueryStatsMap.keySet();
-        for (String queryId : queryNeedToClear) {
-            for (Long beId : beIdSet) {
-                beToQueryStatsMap.get(beId).remove(queryId);
-            }
-            queryLastReportTime.remove(queryId);
-        }
-    }
-
     private void queryAuditEventLogWriteLock() {
         queryAuditEventLock.writeLock().lock();
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index e9feb48f763..9df69dd995f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -533,6 +533,8 @@ struct TExecPlanFragmentParams {
 
   31: optional bool is_nereids = true;
 
+  32: optional Types.TNetworkAddress current_connect_fe
+
   // For cloud
   1000: optional bool is_mow_table;
 }
@@ -770,6 +772,7 @@ struct TPipelineFragmentParams {
   40: optional bool is_nereids = true;
   41: optional i64 wal_id
   42: optional i64 content_length
+  43: optional Types.TNetworkAddress current_connect_fe
 
   // For cloud
   1000: optional bool is_mow_table;

