This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e4bec7fce [fix](arrow-flight-sql) Separate arrow-flight-sql 
connection and mysql connection (#50939)
34e4bec7fce is described below

commit 34e4bec7fcebcc7d8f9c847372825712e141a002
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Mon May 19 12:03:37 2025 +0800

    [fix](arrow-flight-sql) Separate arrow-flight-sql connection and mysql 
connection (#50939)
    
    ### What problem does this PR solve?
    
    Add FE conf `arrow_flight_max_connections` as the upper limit of
    arrow-flight-sql connection, arrow-flight-sql connections and mysql
    connections will be managed separately.
---
 .../main/java/org/apache/doris/common/Config.java  |  30 ++--
 .../org/apache/doris/mysql/AcceptListener.java     |   6 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |   2 +-
 .../{ConnectScheduler.java => ConnectPoolMgr.java} |  82 ++--------
 .../java/org/apache/doris/qe/ConnectScheduler.java | 174 +++++++--------------
 .../java/org/apache/doris/service/ExecuteEnv.java  |   2 +-
 .../arrowflight/DorisFlightSqlProducer.java        |  49 ++++--
 .../service/arrowflight/DorisFlightSqlService.java |  12 +-
 .../sessions/FlightSessionsManager.java            |   7 +
 .../sessions/FlightSessionsWithTokenManager.java   |  22 +--
 .../sessions/FlightSqlConnectContext.java          |  15 +-
 .../sessions/FlightSqlConnectPoolMgr.java          |  74 +++++++++
 .../arrowflight/tokens/FlightTokenManagerImpl.java |  18 ++-
 .../apache/doris/mysql/ConnectionExceedTest.java   |  31 ++--
 .../plans/commands/KillConnectionCommandTest.java  |   2 +-
 fe/pom.xml                                         |   2 +-
 samples/arrow-flight-sql/java/pom.xml              |   4 +-
 thirdparty/vars.sh                                 |   8 +-
 18 files changed, 284 insertions(+), 256 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 3b57d64b4d8..2cf44f0cf3f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2821,21 +2821,23 @@ public class Config extends ConfigBase {
     })
     public static int autobucket_max_buckets = 128;
 
-    @ConfField(description = {"Arrow Flight 
Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, "
-            + "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, "
-            + "因为arrow flight sql是无状态的协议,连接通常不会主动断开,"
+    @ConfField(description = {"单个 FE 的 Arrow Flight Server 的最大连接数。",
+            "Maximal number of connections of Arrow Flight Server per FE."})
+    public static int arrow_flight_max_connections = 4096;
+
+    @ConfField(description = {"(已弃用,被 arrow_flight_max_connection 替代) Arrow 
Flight Server中所有用户token的缓存上限,"
+            + "超过后LRU淘汰, arrow flight sql是无状态的协议,连接通常不会主动断开,"
             + "bearer token 从 cache 淘汰的同时会 unregister Connection.",
-            "The cache limit of all user tokens in Arrow Flight Server. which 
will be eliminated by"
-            + "LRU rules after exceeding the limit, the default value is 512, 
the mandatory limit is "
-            + "less than qe_max_connection/2 to avoid `Reach limit of 
connections`, "
-            + "because arrow flight sql is a stateless protocol, the 
connection is usually not actively "
-            + "disconnected, bearer token is evict from the cache will 
unregister ConnectContext."})
-    public static int arrow_flight_token_cache_size = 512;
-
-    @ConfField(description = {"Arrow Flight 
Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天",
-            "The alive time of the user token in Arrow Flight Server, expire 
after write, unit minutes,"
-            + "the default value is 4320, which is 3 days"})
-    public static int arrow_flight_token_alive_time = 4320;
+            "(Deprecated, replaced by arrow_flight_max_connection) The cache 
limit of all user tokens in "
+            + "Arrow Flight Server. which will be eliminated by LRU rules 
after exceeding the limit, "
+            + "arrow flight sql is a stateless protocol, the connection is 
usually not actively disconnected, "
+            + "bearer token is evict from the cache will unregister 
ConnectContext."})
+    public static int arrow_flight_token_cache_size = 4096;
+
+    @ConfField(description = {"Arrow Flight 
Server中用户token的存活时间,自上次写入后过期时间,单位秒,默认值为86400,即1天",
+            "The alive time of the user token in Arrow Flight Server, expire 
after write, unit second,"
+            + "the default value is 86400, which is 1 days"})
+    public static int arrow_flight_token_alive_time_second = 86400;
 
     @ConfField(mutable = true, description = {
             "Doris 为了兼用 mysql 周边工具生态,会内置一个名为 mysql 的数据库,如果该数据库与用户自建数据库冲突,"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 95e42bc4859..98ef3ba8937 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -108,16 +108,16 @@ public class AcceptListener implements 
ChannelListener<AcceptingChannel<StreamCo
             if (!MysqlProto.negotiate(context)) {
                 throw new AfterConnectedException("mysql negotiate failed");
             }
-            int res = connectScheduler.registerConnection(context);
+            int res = 
connectScheduler.getConnectPoolMgr().registerConnection(context);
             if (res == -1) {
                 MysqlProto.sendResponsePacket(context);
                 connection.setCloseListener(
-                        streamConnection -> 
connectScheduler.unregisterConnection(context));
+                        streamConnection -> 
connectScheduler.getConnectPoolMgr().unregisterConnection(context));
             } else {
                 long userConnLimit = 
context.getEnv().getAuth().getMaxConn(context.getQualifiedUser());
                 String errMsg = String.format(
                         "Reach limit of connections. Total: %d, User: %d, 
Current: %d",
-                        connectScheduler.getMaxConnections(), userConnLimit, 
res);
+                        
connectScheduler.getConnectPoolMgr().getMaxConnections(), userConnLimit, res);
                 
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg);
                 MysqlProto.sendResponsePacket(context);
                 throw new AfterConnectedException(errMsg);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 569fab2cb18..50b825d1977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -940,7 +940,7 @@ public class ConnectContext {
         }
         this.queryId = queryId;
         if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) {
-            connectScheduler.putTraceId2QueryId(traceId, queryId);
+            connectScheduler.getConnectPoolMgr().putTraceId2QueryId(traceId, 
queryId);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
similarity index 61%
copy from fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
copy to fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
index 357d3e0779d..8566fcde6cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
@@ -20,10 +20,9 @@ package org.apache.doris.qe;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Status;
-import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext.ConnectType;
+import org.apache.doris.qe.ConnectContext.ThreadInfo;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Lists;
@@ -35,60 +34,27 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.TimerTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-// The scheduler of query requests
-// Now the strategy is simple, we allocate a thread for it when a request 
comes.
-// TODO(zhaochun): We should consider if the number of local file connection 
can >= maximum connections later.
-public class ConnectScheduler {
-    private static final Logger LOG = 
LogManager.getLogger(ConnectScheduler.class);
-    private final int maxConnections;
-    private final AtomicInteger numberConnection;
-    private final AtomicInteger nextConnectionId;
-    private final Map<Integer, ConnectContext> connectionMap = 
Maps.newConcurrentMap();
-    private final Map<String, AtomicInteger> connByUser = 
Maps.newConcurrentMap();
-    private final Map<String, Integer> flightToken2ConnectionId = 
Maps.newConcurrentMap();
+public class ConnectPoolMgr {
+    private static final Logger LOG = 
LogManager.getLogger(ConnectPoolMgr.class);
+    protected final int maxConnections;
+    protected final AtomicInteger numberConnection;
+    protected final Map<Integer, ConnectContext> connectionMap = 
Maps.newConcurrentMap();
+    protected final Map<String, AtomicInteger> connByUser = 
Maps.newConcurrentMap();
 
     // valid trace id -> query id
-    private final Map<String, TUniqueId> traceId2QueryId = 
Maps.newConcurrentMap();
+    protected final Map<String, TUniqueId> traceId2QueryId = 
Maps.newConcurrentMap();
 
-    // Use a thread to check whether connection is timeout. Because
-    // 1. If use a scheduler, the task maybe a huge number when query is messy.
-    //    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks 
in scheduler.
-    // 2. Use a thread to poll maybe lose some accurate, but is enough to us.
-    private final ScheduledExecutorService checkTimer = 
ThreadPoolManager.newDaemonScheduledThreadPool(1,
-            "connect-scheduler-check-timer", true);
-
-    public ConnectScheduler(int maxConnections) {
+    public ConnectPoolMgr(int maxConnections) {
         this.maxConnections = maxConnections;
         numberConnection = new AtomicInteger(0);
-        nextConnectionId = new AtomicInteger(0);
-        checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, 
TimeUnit.MILLISECONDS);
-    }
-
-    private class TimeoutChecker extends TimerTask {
-        @Override
-        public void run() {
-            long now = System.currentTimeMillis();
-            for (ConnectContext connectContext : connectionMap.values()) {
-                connectContext.checkTimeout(now);
-            }
-        }
     }
 
-    // submit one MysqlContext or ArrowFlightSqlContext to this scheduler.
-    // return true, if this connection has been successfully submitted, 
otherwise return false.
-    // Caller should close ConnectContext if return false.
-    public boolean submit(ConnectContext context) {
-        if (context == null) {
-            return false;
+    public void timeoutChecker(long now) {
+        for (ConnectContext connectContext : connectionMap.values()) {
+            connectContext.checkTimeout(now);
         }
-        context.setConnectionId(nextConnectionId.getAndAdd(1));
-        context.resetLoginTime();
-        return true;
     }
 
     // Register one connection with its connection id.
@@ -108,9 +74,6 @@ public class ConnectScheduler {
             return numberConnection.get();
         }
         connectionMap.put(ctx.getConnectionId(), ctx);
-        if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-            flightToken2ConnectionId.put(ctx.getPeerIdentity(), 
ctx.getConnectionId());
-        }
         return -1;
     }
 
@@ -122,9 +85,6 @@ public class ConnectScheduler {
                 conns.decrementAndGet();
             }
             numberConnection.decrementAndGet();
-            if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-                flightToken2ConnectionId.remove(ctx.getPeerIdentity());
-            }
         }
     }
 
@@ -141,29 +101,22 @@ public class ConnectScheduler {
         return null;
     }
 
-    public ConnectContext getContext(String flightToken) {
-        if (flightToken2ConnectionId.containsKey(flightToken)) {
-            int connectionId = flightToken2ConnectionId.get(flightToken);
-            return getContext(connectionId);
-        }
-        return null;
-    }
-
-    public void cancelQuery(String queryId, Status cancelReason) {
+    public boolean cancelQuery(String queryId, Status cancelReason) {
         for (ConnectContext ctx : connectionMap.values()) {
             TUniqueId qid = ctx.queryId();
             if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
                 ctx.cancelQuery(cancelReason);
-                break;
+                return true;
             }
         }
+        return false;
     }
 
     public int getConnectionNum() {
         return numberConnection.get();
     }
 
-    public List<ConnectContext.ThreadInfo> listConnection(String user, boolean 
isFull) {
+    public List<ThreadInfo> listConnection(String user, boolean isFull) {
         List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
         for (ConnectContext ctx : connectionMap.values()) {
             // Check auth
@@ -184,8 +137,7 @@ public class ConnectScheduler {
         long nowMs = System.currentTimeMillis();
         for (ConnectContext ctx : connectionMap.values()) {
             // Check auth
-            if (!ctx.getCurrentUserIdentity().equals(userIdentity) && 
!Env.getCurrentEnv()
-                    .getAccessManager()
+            if (!ctx.getCurrentUserIdentity().equals(userIdentity) && 
!Env.getCurrentEnv().getAccessManager()
                     .checkGlobalPriv(userIdentity, PrivPredicate.GRANT)) {
                 continue;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 357d3e0779d..c8d27a23db6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -18,14 +18,13 @@
 package org.apache.doris.qe;
 
 import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext.ConnectType;
-import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.qe.ConnectContext.ThreadInfo;
+import org.apache.doris.service.arrowflight.sessions.FlightSqlConnectPoolMgr;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
@@ -45,15 +44,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 // TODO(zhaochun): We should consider if the number of local file connection 
can >= maximum connections later.
 public class ConnectScheduler {
     private static final Logger LOG = 
LogManager.getLogger(ConnectScheduler.class);
-    private final int maxConnections;
-    private final AtomicInteger numberConnection;
     private final AtomicInteger nextConnectionId;
-    private final Map<Integer, ConnectContext> connectionMap = 
Maps.newConcurrentMap();
-    private final Map<String, AtomicInteger> connByUser = 
Maps.newConcurrentMap();
-    private final Map<String, Integer> flightToken2ConnectionId = 
Maps.newConcurrentMap();
-
-    // valid trace id -> query id
-    private final Map<String, TUniqueId> traceId2QueryId = 
Maps.newConcurrentMap();
+    private final ConnectPoolMgr connectPoolMgr;
+    private final FlightSqlConnectPoolMgr flightSqlConnectPoolMgr;
 
     // Use a thread to check whether connection is timeout. Because
     // 1. If use a scheduler, the task maybe a huge number when query is messy.
@@ -62,24 +55,26 @@ public class ConnectScheduler {
     private final ScheduledExecutorService checkTimer = 
ThreadPoolManager.newDaemonScheduledThreadPool(1,
             "connect-scheduler-check-timer", true);
 
-    public ConnectScheduler(int maxConnections) {
-        this.maxConnections = maxConnections;
-        numberConnection = new AtomicInteger(0);
+    public ConnectScheduler(int commonMaxConnections, int 
flightSqlMaxConnections) {
         nextConnectionId = new AtomicInteger(0);
+        this.connectPoolMgr = new ConnectPoolMgr(commonMaxConnections);
+        this.flightSqlConnectPoolMgr = new 
FlightSqlConnectPoolMgr(flightSqlMaxConnections);
         checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, 
TimeUnit.MILLISECONDS);
     }
 
-    private class TimeoutChecker extends TimerTask {
-        @Override
-        public void run() {
-            long now = System.currentTimeMillis();
-            for (ConnectContext connectContext : connectionMap.values()) {
-                connectContext.checkTimeout(now);
-            }
-        }
+    public ConnectScheduler(int commonMaxConnections) {
+        this(commonMaxConnections, Config.arrow_flight_max_connections);
+    }
+
+    public ConnectPoolMgr getConnectPoolMgr() {
+        return connectPoolMgr;
     }
 
-    // submit one MysqlContext or ArrowFlightSqlContext to this scheduler.
+    public FlightSqlConnectPoolMgr getFlightSqlConnectPoolMgr() {
+        return flightSqlConnectPoolMgr;
+    }
+
+    // submit one MysqlContext to this scheduler.
     // return true, if this connection has been successfully submitted, 
otherwise return false.
     // Caller should close ConnectContext if return false.
     public boolean submit(ConnectContext context) {
@@ -91,89 +86,38 @@ public class ConnectScheduler {
         return true;
     }
 
-    // Register one connection with its connection id.
-    // Return -1 means register OK
-    // Return >=0 means register failed, and return value is current 
connection num.
-    public int registerConnection(ConnectContext ctx) {
-        if (numberConnection.incrementAndGet() > maxConnections) {
-            numberConnection.decrementAndGet();
-            return numberConnection.get();
-        }
-        // Check user
-        connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
-        AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
-        if (conns.incrementAndGet() > 
ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) {
-            conns.decrementAndGet();
-            numberConnection.decrementAndGet();
-            return numberConnection.get();
-        }
-        connectionMap.put(ctx.getConnectionId(), ctx);
-        if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-            flightToken2ConnectionId.put(ctx.getPeerIdentity(), 
ctx.getConnectionId());
-        }
-        return -1;
-    }
-
-    public void unregisterConnection(ConnectContext ctx) {
-        ctx.closeTxn();
-        if (connectionMap.remove(ctx.getConnectionId()) != null) {
-            AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
-            if (conns != null) {
-                conns.decrementAndGet();
-            }
-            numberConnection.decrementAndGet();
-            if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-                flightToken2ConnectionId.remove(ctx.getPeerIdentity());
-            }
-        }
-    }
-
     public ConnectContext getContext(int connectionId) {
-        return connectionMap.get(connectionId);
-    }
-
-    public ConnectContext getContextWithQueryId(String queryId) {
-        for (ConnectContext context : connectionMap.values()) {
-            if (queryId.equals(DebugUtil.printId(context.queryId))) {
-                return context;
-            }
+        ConnectContext ctx = connectPoolMgr.getContext(connectionId);
+        if (ctx == null) {
+            ctx = flightSqlConnectPoolMgr.getContext(connectionId);
         }
-        return null;
+        return ctx;
     }
 
-    public ConnectContext getContext(String flightToken) {
-        if (flightToken2ConnectionId.containsKey(flightToken)) {
-            int connectionId = flightToken2ConnectionId.get(flightToken);
-            return getContext(connectionId);
+    public ConnectContext getContextWithQueryId(String queryId) {
+        ConnectContext ctx = connectPoolMgr.getContextWithQueryId(queryId);
+        if (ctx == null) {
+            ctx = flightSqlConnectPoolMgr.getContextWithQueryId(queryId);
         }
-        return null;
+        return ctx;
     }
 
-    public void cancelQuery(String queryId, Status cancelReason) {
-        for (ConnectContext ctx : connectionMap.values()) {
-            TUniqueId qid = ctx.queryId();
-            if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
-                ctx.cancelQuery(cancelReason);
-                break;
-            }
+    public boolean cancelQuery(String queryId, Status cancelReason) {
+        boolean ret = connectPoolMgr.cancelQuery(queryId, cancelReason);
+        if (!ret) {
+            ret = flightSqlConnectPoolMgr.cancelQuery(queryId, cancelReason);
         }
+        return ret;
     }
 
     public int getConnectionNum() {
-        return numberConnection.get();
+        return connectPoolMgr.getConnectionNum() + 
flightSqlConnectPoolMgr.getConnectionNum();
     }
 
-    public List<ConnectContext.ThreadInfo> listConnection(String user, boolean 
isFull) {
+    public List<ThreadInfo> listConnection(String user, boolean isFull) {
         List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
-        for (ConnectContext ctx : connectionMap.values()) {
-            // Check auth
-            if (!ctx.getQualifiedUser().equals(user) && 
!Env.getCurrentEnv().getAccessManager()
-                    .checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
-                continue;
-            }
-
-            infos.add(ctx.toThreadInfo(isFull));
-        }
+        infos.addAll(connectPoolMgr.listConnection(user, isFull));
+        infos.addAll(flightSqlConnectPoolMgr.listConnection(user, isFull));
         return infos;
     }
 
@@ -181,37 +125,39 @@ public class ConnectScheduler {
     public List<List<String>> listConnectionForRpc(UserIdentity userIdentity, 
boolean isShowFullSql,
             Optional<String> timeZone) {
         List<List<String>> list = new ArrayList<>();
-        long nowMs = System.currentTimeMillis();
-        for (ConnectContext ctx : connectionMap.values()) {
-            // Check auth
-            if (!ctx.getCurrentUserIdentity().equals(userIdentity) && 
!Env.getCurrentEnv()
-                    .getAccessManager()
-                    .checkGlobalPriv(userIdentity, PrivPredicate.GRANT)) {
-                continue;
-            }
-            list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, 
timeZone));
-        }
+        list.addAll(connectPoolMgr.listConnectionForRpc(userIdentity, 
isShowFullSql, timeZone));
+        list.addAll(flightSqlConnectPoolMgr.listConnectionForRpc(userIdentity, 
isShowFullSql, timeZone));
         return list;
     }
 
-    public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
-        traceId2QueryId.put(traceId, queryId);
-    }
-
     public String getQueryIdByTraceId(String traceId) {
-        TUniqueId queryId = traceId2QueryId.get(traceId);
-        return queryId == null ? "" : DebugUtil.printId(queryId);
+        String queryId = connectPoolMgr.getQueryIdByTraceId(traceId);
+        if (Strings.isNullOrEmpty(queryId)) {
+            queryId = flightSqlConnectPoolMgr.getQueryIdByTraceId(traceId);
+        }
+        return queryId;
     }
 
     public Map<Integer, ConnectContext> getConnectionMap() {
-        return connectionMap;
+        Map<Integer, ConnectContext> map = Maps.newConcurrentMap();
+        map.putAll(connectPoolMgr.getConnectionMap());
+        map.putAll(flightSqlConnectPoolMgr.getConnectionMap());
+        return map;
     }
 
     public Map<String, AtomicInteger> getUserConnectionMap() {
-        return connByUser;
+        Map<String, AtomicInteger> map = Maps.newConcurrentMap();
+        map.putAll(connectPoolMgr.getUserConnectionMap());
+        map.putAll(flightSqlConnectPoolMgr.getUserConnectionMap());
+        return map;
     }
 
-    public int getMaxConnections() {
-        return maxConnections;
+    private class TimeoutChecker extends TimerTask {
+        @Override
+        public void run() {
+            long now = System.currentTimeMillis();
+            connectPoolMgr.timeoutChecker(now);
+            flightSqlConnectPoolMgr.timeoutChecker(now);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
index 2064bfb267f..b84fb40af1c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
@@ -39,7 +39,7 @@ public class ExecuteEnv {
 
     private ExecuteEnv() {
         multiLoadMgr = new MultiLoadMgr();
-        scheduler = new ConnectScheduler(Config.qe_max_connection);
+        scheduler = new ConnectScheduler(Config.qe_max_connection, 
Config.arrow_flight_max_connections);
         startupTime = System.currentTimeMillis();
         processUUID = System.currentTimeMillis();
         String logDir = Strings.isNullOrEmpty(Config.sys_log_dir) ? 
System.getenv("LOG_DIR") :
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 154fd9f0b6b..063bc3d46c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -36,6 +36,8 @@ import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.CloseSessionRequest;
+import org.apache.arrow.flight.CloseSessionResult;
 import org.apache.arrow.flight.Criteria;
 import org.apache.arrow.flight.FlightDescriptor;
 import org.apache.arrow.flight.FlightEndpoint;
@@ -110,7 +112,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         this.flightSessionsManager = flightSessionsManager;
         sqlInfoBuilder = new SqlInfoBuilder();
         
sqlInfoBuilder.withFlightSqlServerName("DorisFE").withFlightSqlServerVersion("1.0")
-                
.withFlightSqlServerArrowVersion("13.0").withFlightSqlServerReadOnly(false)
+                
.withFlightSqlServerArrowVersion("18.2.0").withFlightSqlServerReadOnly(false)
                 
.withSqlIdentifierQuoteChar("`").withSqlDdlCatalog(true).withSqlDdlSchema(false).withSqlDdlTable(false)
                 
.withSqlIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE)
                 
.withSqlQuotedIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE);
@@ -139,7 +141,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             final VectorSchemaRoot vectorSchemaRoot = 
flightSqlResultCacheEntry.getVectorSchemaRoot();
             listener.start(vectorSchemaRoot);
             listener.putNext();
-        } catch (Exception e) {
+        } catch (Throwable e) {
             String errMsg = "get stream statement failed, " + e.getMessage() + 
", " + Util.getRootCauseMessage(e)
                     + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
                     + connectContext.getState().getErrorMessage();
@@ -172,7 +174,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                 String executedPeerIdentity = handleParts[0];
                 String preparedStatementId = handleParts[1];
                 
flightSessionsManager.getConnectContext(executedPeerIdentity).removePreparedQuery(preparedStatementId);
-            } catch (final Exception e) {
+            } catch (final Throwable e) {
                 listener.onError(e);
                 return;
             }
@@ -274,7 +276,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                     return new 
FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, 
-1, -1);
                 }
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             String errMsg = "get flight info statement failed, " + 
e.getMessage() + ", " + Util.getRootCauseMessage(e)
                     + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
                     + connectContext.getState().getErrorMessage();
@@ -288,8 +290,14 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     @Override
     public FlightInfo getFlightInfoStatement(final CommandStatementQuery 
request, final CallContext context,
             final FlightDescriptor descriptor) {
-        ConnectContext connectContext = 
flightSessionsManager.getConnectContext(context.peerIdentity());
-        return executeQueryStatement(context.peerIdentity(), connectContext, 
request.getQuery(), descriptor);
+        try {
+            ConnectContext connectContext = 
flightSessionsManager.getConnectContext(context.peerIdentity());
+            return executeQueryStatement(context.peerIdentity(), 
connectContext, request.getQuery(), descriptor);
+        } catch (Throwable e) {
+            String errMsg = "get flight info statement failed, " + 
e.getMessage();
+            LOG.error(errMsg, e);
+            throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
+        }
     }
 
     @Override
@@ -402,7 +410,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                     }
                 }
                 ackStream.onCompleted();
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 String errMsg = "acceptPutPreparedStatementUpdate failed, " + 
e.getMessage() + ", "
                         + Util.getRootCauseMessage(e);
                 LOG.error(errMsg, e);
@@ -461,7 +469,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                 listener.putNext();
                 listener.completed();
             }
-        } catch (final Exception e) {
+        } catch (final Throwable e) {
             handleStreamException(e, "", listener);
         }
     }
@@ -488,7 +496,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                 listener.putNext();
                 listener.completed();
             }
-        } catch (final Exception e) {
+        } catch (final Throwable e) {
             handleStreamException(e, "", listener);
         }
     }
@@ -520,7 +528,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                 listener.putNext();
                 listener.completed();
             }
-        } catch (final Exception e) {
+        } catch (final Throwable e) {
             handleStreamException(e, "", listener);
         }
     }
@@ -584,6 +592,25 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         throw 
CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference 
unimplemented").toRuntimeException();
     }
 
+    @Override
+    public void closeSession(CloseSessionRequest request, final CallContext 
context,
+            final StreamListener<CloseSessionResult> listener) {
+        // https://github.com/apache/arrow-adbc/issues/2821
+        // currently FlightSqlConnection does not provide a separate interface 
for external calls to
+        // FlightSqlClient::closeSession(), nor will it automatically call 
closeSession
+        // when FlightSqlConnection::close(). Python flight sql Cursor.close() 
will call closeSession().
+        // Neither C++ nor Java seem to have similar behavior.
+        try {
+            flightSessionsManager.closeConnectContext(context.peerIdentity());
+        } catch (final Throwable e) {
+            LOG.error("closeSession failed", e);
+            listener.onError(
+                    CallStatus.INTERNAL.withDescription("closeSession 
failed").withCause(e).toRuntimeException());
+        }
+        listener.onNext(new 
CloseSessionResult(CloseSessionResult.Status.CLOSED));
+        listener.onCompleted();
+    }
+
     private <T extends Message> FlightInfo getFlightInfoForSchema(final T 
request, final FlightDescriptor descriptor,
             final Schema schema) {
         final Ticket ticket = new Ticket(Any.pack(request).toByteArray());
@@ -592,7 +619,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         return new FlightInfo(schema, descriptor, endpoints, -1, -1);
     }
 
-    private static void handleStreamException(Exception e, String errMsg, 
ServerStreamListener listener) {
+    private static void handleStreamException(Throwable e, String errMsg, 
ServerStreamListener listener) {
         LOG.error(errMsg, e);
         
listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
         throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
index b83936dab3b..5c47941b291 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
@@ -46,22 +46,20 @@ public class DorisFlightSqlService {
 
     public DorisFlightSqlService(int port) {
         BufferAllocator allocator = new RootAllocator();
-        // arrow_flight_token_cache_size less than qe_max_connection to avoid 
`Reach limit of connections`.
         // arrow flight sql is a stateless protocol, connection is usually not 
actively disconnected.
         // bearer token is evict from the cache will unregister ConnectContext.
         this.flightTokenManager = new FlightTokenManagerImpl(
-                Math.min(Config.arrow_flight_token_cache_size, 
Config.qe_max_connection / 2),
-                Config.arrow_flight_token_alive_time);
+                Math.min(Config.arrow_flight_max_connections, 
Config.arrow_flight_token_cache_size),
+                Config.arrow_flight_token_alive_time_second);
         this.flightSessionsManager = new 
FlightSessionsWithTokenManager(flightTokenManager);
 
         DorisFlightSqlProducer producer = new DorisFlightSqlProducer(
                 
Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port), 
flightSessionsManager);
         flightServer = FlightServer.builder(allocator, 
Location.forGrpcInsecure("0.0.0.0", port), producer)
                 .headerAuthenticator(new 
FlightBearerTokenAuthenticator(flightTokenManager)).build();
-        LOG.info("Arrow Flight SQL service is created, port: {}, 
token_cache_size: {}"
-                        + ", qe_max_connection: {}, token_alive_time: {}",
-                port, Config.arrow_flight_token_cache_size, 
Config.qe_max_connection,
-                Config.arrow_flight_token_alive_time);
+        LOG.info("Arrow Flight SQL service is created, port: {}, 
arrow_flight_max_connections: {},"
+                        + "arrow_flight_token_alive_time_second: {}", port, 
Config.arrow_flight_max_connections,
+                Config.arrow_flight_token_alive_time_second);
     }
 
     // start Arrow Flight SQL service, return true if success, otherwise false
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
index 2c3a5258cc4..4e9571d63ba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
@@ -45,6 +45,13 @@ public interface FlightSessionsManager {
      */
     ConnectContext createConnectContext(String peerIdentity);
 
+    /**
+     * Close ConnectContext object and delete it in the local cache.
+     *
+     * @param peerIdentity identity after authorization
+     */
+    void closeConnectContext(String peerIdentity);
+
     static ConnectContext buildConnectContext(String peerIdentity, 
UserIdentity userIdentity, String remoteIP) {
         ConnectContext connectContext = new 
FlightSqlConnectContext(peerIdentity);
         connectContext.setEnv(Env.getCurrentEnv());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
index a495b02c393..8001998a66e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
@@ -41,7 +41,8 @@ public class FlightSessionsWithTokenManager implements 
FlightSessionsManager {
     @Override
     public ConnectContext getConnectContext(String peerIdentity) {
         try {
-            ConnectContext connectContext = 
ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
+            ConnectContext connectContext = 
ExecuteEnv.getInstance().getScheduler().getFlightSqlConnectPoolMgr()
+                    .getContextWithFlightToken(peerIdentity);
             if (null == connectContext) {
                 connectContext = createConnectContext(peerIdentity);
                 return connectContext;
@@ -68,18 +69,21 @@ public class FlightSessionsWithTokenManager implements 
FlightSessionsManager {
                 flightTokenDetails.getUserIdentity(), 
flightTokenDetails.getRemoteIp());
         ConnectScheduler connectScheduler = 
ExecuteEnv.getInstance().getScheduler();
         connectScheduler.submit(connectContext);
-        int res = connectScheduler.registerConnection(connectContext);
+        int res = 
connectScheduler.getFlightSqlConnectPoolMgr().registerConnection(connectContext);
         if (res >= 0) {
-            long userConnLimit = 
connectContext.getEnv().getAuth().getMaxConn(connectContext.getQualifiedUser());
             String errMsg = String.format(
-                    "Reach limit of connections. Total: %d, User: %d, Current: 
%d. "
-                            + "Increase `qe_max_connection` in fe.conf or 
user's `max_user_connections`,"
-                            + " or decrease `arrow_flight_token_cache_size` "
-                            + "to evict unused bearer tokens and it 
connections faster",
-                    connectScheduler.getMaxConnections(), userConnLimit, res);
-            
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, 
errMsg);
+                    "Register arrow flight sql connection failed, Unknown 
Error, the number of arrow flight "
+                            + "bearer tokens should be equal to arrow flight 
sql max connections, "
+                            + "max connections: %d, used: %d.",
+                    
connectScheduler.getFlightSqlConnectPoolMgr().getMaxConnections(), res);
+            connectContext.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
errMsg);
             throw new IllegalArgumentException(errMsg);
         }
         return connectContext;
     }
+
+    @Override
+    public void closeConnectContext(String peerIdentity) {
+        flightTokenManager.invalidateToken(peerIdentity);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index b90d7505923..35293273f1e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -24,7 +24,9 @@ import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
 import org.apache.doris.thrift.TResultSinkType;
 import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -63,7 +65,7 @@ public class FlightSqlConnectContext extends ConnectContext {
         if (flightSqlChannel != null) {
             flightSqlChannel.close();
         }
-        connectScheduler.unregisterConnection(this);
+        
connectScheduler.getFlightSqlConnectPoolMgr().unregisterConnection(this);
     }
 
     // kill operation with no protect.
@@ -80,6 +82,17 @@ public class FlightSqlConnectContext extends ConnectContext {
         cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query 
killed by user"));
     }
 
+    @Override
+    public void setQueryId(TUniqueId queryId) {
+        if (this.queryId != null) {
+            this.lastQueryId = this.queryId.deepCopy();
+        }
+        this.queryId = queryId;
+        if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) {
+            
connectScheduler.getFlightSqlConnectPoolMgr().putTraceId2QueryId(traceId, 
queryId);
+        }
+    }
+
     @Override
     public String getRemoteHostPortString() {
         return getFlightSqlChannel().getRemoteHostPortString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectPoolMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectPoolMgr.java
new file mode 100644
index 00000000000..3002116b386
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectPoolMgr.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.sessions;
+
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ConnectContext.ConnectType;
+import org.apache.doris.qe.ConnectPoolMgr;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+public class FlightSqlConnectPoolMgr extends ConnectPoolMgr {
+    private static final Logger LOG = LogManager.getLogger(
+            FlightSqlConnectPoolMgr.class);
+    private final Map<String, Integer> flightToken2ConnectionId = 
Maps.newConcurrentMap();
+
+    public FlightSqlConnectPoolMgr(int maxConnections) {
+        super(maxConnections);
+    }
+
+    // Register one connection with its connection id.
+    // Return -1 means register OK
+    // Return >=0 means register failed, and return value is current 
connection num.
+    @Override
+    public int registerConnection(ConnectContext ctx) {
+        if (numberConnection.incrementAndGet() > maxConnections) {
+            numberConnection.decrementAndGet();
+            return numberConnection.get();
+        }
+        // not check user
+        connectionMap.put(ctx.getConnectionId(), ctx);
+        if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
+            flightToken2ConnectionId.put(ctx.getPeerIdentity(), 
ctx.getConnectionId());
+        }
+        return -1;
+    }
+
+    @Override
+    public void unregisterConnection(ConnectContext ctx) {
+        ctx.closeTxn();
+        if (connectionMap.remove(ctx.getConnectionId()) != null) {
+            numberConnection.decrementAndGet();
+            if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
+                flightToken2ConnectionId.remove(ctx.getPeerIdentity());
+            }
+        }
+    }
+
+    public ConnectContext getContextWithFlightToken(String flightToken) {
+        if (flightToken2ConnectionId.containsKey(flightToken)) {
+            int connectionId = flightToken2ConnectionId.get(flightToken);
+            return getContext(connectionId);
+        }
+        return null;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
index bd3e85cd0c2..85d1a0bce35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
@@ -59,20 +59,26 @@ public class FlightTokenManagerImpl implements 
FlightTokenManager {
     private ScheduledExecutorService cleanupExecutor;
 
     public FlightTokenManagerImpl(final int cacheSize, final int 
cacheExpiration) {
+        // The cache size of all user tokens in Arrow Flight Server. which 
will be eliminated by
+        // LRU rules after exceeding the limit, the default value is 
arrow_flight_max_connections,
+        // arrow flight sql is a stateless protocol, the connection is usually 
not actively
+        // disconnected, bearer token is evict from the cache will unregister 
ConnectContext.
         this.cacheSize = cacheSize;
         this.cacheExpiration = cacheExpiration;
 
         this.tokenCache = CacheBuilder.newBuilder().maximumSize(cacheSize)
-                .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
+                .expireAfterWrite(cacheExpiration, TimeUnit.SECONDS)
                 .removalListener(new RemovalListener<String, 
FlightTokenDetails>() {
                     @Override
                     public void onRemoval(@NotNull RemovalNotification<String, 
FlightTokenDetails> notification) {
                         // TODO: broadcast this message to other FE
                         String token = notification.getKey();
                         FlightTokenDetails tokenDetails = 
notification.getValue();
-                        ConnectContext context = 
ExecuteEnv.getInstance().getScheduler().getContext(token);
+                        ConnectContext context = 
ExecuteEnv.getInstance().getScheduler().getFlightSqlConnectPoolMgr()
+                                .getContextWithFlightToken(token);
                         if (context != null) {
-                            
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
+                            
ExecuteEnv.getInstance().getScheduler().getFlightSqlConnectPoolMgr()
+                                    .unregisterConnection(context);
                             LOG.info("evict bearer token: " + token + " from 
tokenCache, reason: "
                                     + notification.getCause()
                                     + ", and unregister flight connection 
context after evict bearer token");
@@ -145,13 +151,13 @@ public class FlightTokenManagerImpl implements 
FlightTokenManager {
         if (value.getToken().equals("")) {
             throw new IllegalArgumentException("invalid bearer token: " + token
                     + ", try reconnect, bearer token may not be created, or 
may have been evict, search for this "
-                    + "token in fe.log to see the evict reason. currently in 
fe.conf, `arrow_flight_token_cache_size`="
-                    + this.cacheSize + ", `arrow_flight_token_alive_time`=" + 
this.cacheExpiration);
+                    + "token in fe.log to see the evict reason. currently in 
fe.conf, `arrow_flight_max_connections`="
+                    + this.cacheSize + ", 
`arrow_flight_token_alive_time_second`=" + this.cacheExpiration);
         }
         if (System.currentTimeMillis() >= value.getExpiresAt()) {
             tokenCache.invalidate(token);
             throw new IllegalArgumentException("bearer token expired: " + 
token + ", try reconnect, "
-                    + "currently in fe.conf, `arrow_flight_token_alive_time`=" 
+ this.cacheExpiration);
+                    + "currently in fe.conf, 
`arrow_flight_token_alive_time_second`=" + this.cacheExpiration);
         }
         if (usersTokenLRU.containsKey(value.getUsername())) {
             try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java
index 781858148af..544cf114f77 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java
@@ -78,14 +78,14 @@ public class ConnectionExceedTest {
         context1.setEnv(mockEnv);
         context1.setQualifiedUser("test_user");
         Assert.assertTrue(scheduler.submit(context1));
-        Assert.assertEquals(-1, scheduler.registerConnection(context1));
+        Assert.assertEquals(-1, 
scheduler.getConnectPoolMgr().registerConnection(context1));
 
         // Create second context and register
         ConnectContext context2 = new ConnectContext();
         context2.setEnv(mockEnv);
         context2.setQualifiedUser("test_user");
         Assert.assertTrue(scheduler.submit(context2));
-        Assert.assertEquals(-1, scheduler.registerConnection(context2));
+        Assert.assertEquals(-1, 
scheduler.getConnectPoolMgr().registerConnection(context2));
 
         // Create third context and try to register - should fail
         ConnectContext context3 = new ConnectContext();
@@ -98,7 +98,7 @@ public class ConnectionExceedTest {
         listener.handleConnection(context3, mockConnection);
         String expectedMsg = String.format(
                 "Reach limit of connections. Total: %d, User: %d, Current: %d",
-                scheduler.getMaxConnections(),
+                scheduler.getConnectPoolMgr().getMaxConnections(),
                 2, // Mocked user connection limit
                 scheduler.getConnectionNum());
         Assert.assertEquals(expectedMsg, 
context3.getState().getErrorMessage());
@@ -108,15 +108,16 @@ public class ConnectionExceedTest {
     @Test
     public void testFlightSessionConnectionExceed() throws Exception {
         // Create a scheduler with small max connections
-        ConnectScheduler scheduler = new ConnectScheduler(2);
+        ConnectScheduler scheduler = new ConnectScheduler(1000, 2);
 
         // Setup expectations
         new Expectations() {
             {
-                mockEnv.getAuth();
-                result = mockAuth;
-                mockAuth.getMaxConn("test_user");
-                result = 2;
+                // Arrow flight sql not check the number of user connections.
+                // mockEnv.getAuth();
+                // result = mockAuth;
+                // mockAuth.getMaxConn("test_user");
+                // result = 2;
 
                 mockExecuteEnv.getScheduler();
                 result = scheduler;
@@ -140,14 +141,14 @@ public class ConnectionExceedTest {
         context1.setEnv(mockEnv);
         context1.setQualifiedUser("test_user");
         Assert.assertTrue(scheduler.submit(context1));
-        Assert.assertEquals(-1, scheduler.registerConnection(context1));
+        Assert.assertEquals(-1, 
scheduler.getFlightSqlConnectPoolMgr().registerConnection(context1));
 
         // Create second context and register
         ConnectContext context2 = new ConnectContext();
         context2.setEnv(mockEnv);
         context2.setQualifiedUser("test_user");
         Assert.assertTrue(scheduler.submit(context2));
-        Assert.assertEquals(-1, scheduler.registerConnection(context2));
+        Assert.assertEquals(-1, 
scheduler.getFlightSqlConnectPoolMgr().registerConnection(context2));
 
         // Create FlightSessionsWithTokenManager and try to create a new 
connection
         FlightSessionsWithTokenManager manager = new 
FlightSessionsWithTokenManager(mockTokenManager);
@@ -157,12 +158,10 @@ public class ConnectionExceedTest {
         } catch (IllegalArgumentException e) {
             // Verify error message is set correctly
             String expectedMsg = String.format(
-                    "Reach limit of connections. Total: %d, User: %d, Current: 
%d. "
-                            + "Increase `qe_max_connection` in fe.conf or 
user's `max_user_connections`,"
-                            + " or decrease `arrow_flight_token_cache_size` "
-                            + "to evict unused bearer tokens and it 
connections faster",
-                    scheduler.getMaxConnections(),
-                    2, // Mocked user connection limit
+                    "Register arrow flight sql connection failed, Unknown 
Error, the number of arrow flight "
+                            + "bearer tokens should be equal to arrow flight 
sql max connections, "
+                            + "max connections: %d, used: %d.",
+                    scheduler.getFlightSqlConnectPoolMgr().getMaxConnections(),
                     scheduler.getConnectionNum());
             Assert.assertEquals(expectedMsg, e.getMessage());
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java
index 6145e46bd5b..190e546dc3c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java
@@ -56,7 +56,7 @@ public class KillConnectionCommandTest extends 
TestWithFeService {
             }
         };
         connectContext.setConnectScheduler(scheduler);
-        scheduler.registerConnection(connectContext);
+        scheduler.getConnectPoolMgr().registerConnection(connectContext);
     }
 
     @Test
diff --git a/fe/pom.xml b/fe/pom.xml
index 582d8b2d76e..14ec4bc07bd 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -321,7 +321,7 @@ under the License.
          you can find avro version info in iceberg mvn repository -->
         <iceberg.version>1.6.1</iceberg.version>
         <maxcompute.version>0.49.0-public</maxcompute.version>
-        <arrow.version>17.0.0</arrow.version>
+        <arrow.version>18.2.0</arrow.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
         <!-- lakesoul -->
diff --git a/samples/arrow-flight-sql/java/pom.xml 
b/samples/arrow-flight-sql/java/pom.xml
index d08e30d69ae..f7f73cd92d5 100644
--- a/samples/arrow-flight-sql/java/pom.xml
+++ b/samples/arrow-flight-sql/java/pom.xml
@@ -48,8 +48,8 @@ under the License.
                similar issue: 
https://github.com/protocolbuffers/protobuf/issues/15762
            3. A more stable version is Arrow 15.0.2 and ADBC 0.12.0, but we 
always hope to embrace the future with new versions!
         -->
-        <arrow.version>18.1.0</arrow.version>
-        <adbc.version>0.15.0</adbc.version>
+        <arrow.version>18.2.0</arrow.version>
+        <adbc.version>0.18.0</adbc.version>
         <log4j.version>2.17.1</log4j.version>
     </properties>
     <dependencies>
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index ee7df5c1862..51705a0bd35 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -253,10 +253,10 @@ GRPC_SOURCE=grpc-1.54.3
 GRPC_MD5SUM="af00a2edeae0f02bb25917cc3473b7de"
 
 # arrow
-ARROW_DOWNLOAD="https://github.com/apache/arrow/archive/refs/tags/apache-arrow-17.0.0.tar.gz";
-ARROW_NAME="apache-arrow-17.0.0.tar.gz"
-ARROW_SOURCE="arrow-apache-arrow-17.0.0"
-ARROW_MD5SUM="ba18bf83e2164abd34b9ac4cb164f0f0"
+ARROW_DOWNLOAD="https://github.com/apache/arrow/archive/refs/tags/apache-arrow-19.0.1.tar.gz";
+ARROW_NAME="apache-arrow-19.0.1.tar.gz"
+ARROW_SOURCE="arrow-apache-arrow-19.0.1"
+ARROW_MD5SUM="8c5091da0f8fb41a47d7f4dad7b712df"
 
 # Abseil
 
ABSEIL_DOWNLOAD="https://github.com/abseil/abseil-cpp/archive/refs/tags/20230125.3.tar.gz";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to