This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 34e4bec7fce [fix](arrow-flight-sql) Separate arrow-flight-sql connection and mysql connection (#50939) 34e4bec7fce is described below commit 34e4bec7fcebcc7d8f9c847372825712e141a002 Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Mon May 19 12:03:37 2025 +0800 [fix](arrow-flight-sql) Separate arrow-flight-sql connection and mysql connection (#50939) ### What problem does this PR solve? Add FE conf `arrow_flight_max_connections` as the upper limit of arrow-flight-sql connection, arrow-flight-sql connections and mysql connections will be managed separately. --- .../main/java/org/apache/doris/common/Config.java | 30 ++-- .../org/apache/doris/mysql/AcceptListener.java | 6 +- .../java/org/apache/doris/qe/ConnectContext.java | 2 +- .../{ConnectScheduler.java => ConnectPoolMgr.java} | 82 ++-------- .../java/org/apache/doris/qe/ConnectScheduler.java | 174 +++++++-------------- .../java/org/apache/doris/service/ExecuteEnv.java | 2 +- .../arrowflight/DorisFlightSqlProducer.java | 49 ++++-- .../service/arrowflight/DorisFlightSqlService.java | 12 +- .../sessions/FlightSessionsManager.java | 7 + .../sessions/FlightSessionsWithTokenManager.java | 22 +-- .../sessions/FlightSqlConnectContext.java | 15 +- .../sessions/FlightSqlConnectPoolMgr.java | 74 +++++++++ .../arrowflight/tokens/FlightTokenManagerImpl.java | 18 ++- .../apache/doris/mysql/ConnectionExceedTest.java | 31 ++-- .../plans/commands/KillConnectionCommandTest.java | 2 +- fe/pom.xml | 2 +- samples/arrow-flight-sql/java/pom.xml | 4 +- thirdparty/vars.sh | 8 +- 18 files changed, 284 insertions(+), 256 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3b57d64b4d8..2cf44f0cf3f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2821,21 +2821,23 @@ public class Config extends ConfigBase { }) public static int autobucket_max_buckets = 128; - @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, " - + "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, " - + "因为arrow flight sql是无状态的协议,连接通常不会主动断开," + @ConfField(description = {"单个 FE 的 Arrow Flight Server 的最大连接数。", + "Maximal number of connections of Arrow Flight Server per FE."}) + public static int arrow_flight_max_connections = 4096; + + @ConfField(description = {"(已弃用,被 arrow_flight_max_connection 替代) Arrow Flight Server中所有用户token的缓存上限," + + "超过后LRU淘汰, arrow flight sql是无状态的协议,连接通常不会主动断开," + "bearer token 从 cache 淘汰的同时会 unregister Connection.", - "The cache limit of all user tokens in Arrow Flight Server. which will be eliminated by" - + "LRU rules after exceeding the limit, the default value is 512, the mandatory limit is " - + "less than qe_max_connection/2 to avoid `Reach limit of connections`, " - + "because arrow flight sql is a stateless protocol, the connection is usually not actively " - + "disconnected, bearer token is evict from the cache will unregister ConnectContext."}) - public static int arrow_flight_token_cache_size = 512; - - @ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天", - "The alive time of the user token in Arrow Flight Server, expire after write, unit minutes," - + "the default value is 4320, which is 3 days"}) - public static int arrow_flight_token_alive_time = 4320; + "(Deprecated, replaced by arrow_flight_max_connection) The cache limit of all user tokens in " + + "Arrow Flight Server. which will be eliminated by LRU rules after exceeding the limit, " + + "arrow flight sql is a stateless protocol, the connection is usually not actively disconnected, " + + "bearer token is evict from the cache will unregister ConnectContext."}) + public static int arrow_flight_token_cache_size = 4096; + + @ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位秒,默认值为86400,即1天", + "The alive time of the user token in Arrow Flight Server, expire after write, unit second," + + "the default value is 86400, which is 1 days"}) + public static int arrow_flight_token_alive_time_second = 86400; @ConfField(mutable = true, description = { "Doris 为了兼用 mysql 周边工具生态,会内置一个名为 mysql 的数据库,如果该数据库与用户自建数据库冲突," diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 95e42bc4859..98ef3ba8937 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -108,16 +108,16 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo if (!MysqlProto.negotiate(context)) { throw new AfterConnectedException("mysql negotiate failed"); } - int res = connectScheduler.registerConnection(context); + int res = connectScheduler.getConnectPoolMgr().registerConnection(context); if (res == -1) { MysqlProto.sendResponsePacket(context); connection.setCloseListener( - streamConnection -> connectScheduler.unregisterConnection(context)); + streamConnection -> connectScheduler.getConnectPoolMgr().unregisterConnection(context)); } else { long userConnLimit = context.getEnv().getAuth().getMaxConn(context.getQualifiedUser()); String errMsg = String.format( "Reach limit of connections. Total: %d, User: %d, Current: %d", - connectScheduler.getMaxConnections(), userConnLimit, res); + connectScheduler.getConnectPoolMgr().getMaxConnections(), userConnLimit, res); context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg); MysqlProto.sendResponsePacket(context); throw new AfterConnectedException(errMsg); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 569fab2cb18..50b825d1977 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -940,7 +940,7 @@ public class ConnectContext { } this.queryId = queryId; if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) { - connectScheduler.putTraceId2QueryId(traceId, queryId); + connectScheduler.getConnectPoolMgr().putTraceId2QueryId(traceId, queryId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java similarity index 61% copy from fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java copy to fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java index 357d3e0779d..8566fcde6cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java @@ -20,10 +20,9 @@ package org.apache.doris.qe; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; import org.apache.doris.common.Status; -import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.qe.ConnectContext.ConnectType; +import org.apache.doris.qe.ConnectContext.ThreadInfo; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -35,60 +34,27 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.TimerTask; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -// The scheduler of query requests -// Now the strategy is simple, we allocate a thread for it when a request comes. -// TODO(zhaochun): We should consider if the number of local file connection can >= maximum connections later. -public class ConnectScheduler { - private static final Logger LOG = LogManager.getLogger(ConnectScheduler.class); - private final int maxConnections; - private final AtomicInteger numberConnection; - private final AtomicInteger nextConnectionId; - private final Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap(); - private final Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap(); - private final Map<String, Integer> flightToken2ConnectionId = Maps.newConcurrentMap(); +public class ConnectPoolMgr { + private static final Logger LOG = LogManager.getLogger(ConnectPoolMgr.class); + protected final int maxConnections; + protected final AtomicInteger numberConnection; + protected final Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap(); + protected final Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap(); // valid trace id -> query id - private final Map<String, TUniqueId> traceId2QueryId = Maps.newConcurrentMap(); + protected final Map<String, TUniqueId> traceId2QueryId = Maps.newConcurrentMap(); - // Use a thread to check whether connection is timeout. Because - // 1. If use a scheduler, the task maybe a huge number when query is messy. - // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. - // 2. Use a thread to poll maybe lose some accurate, but is enough to us. - private final ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, - "connect-scheduler-check-timer", true); - - public ConnectScheduler(int maxConnections) { + public ConnectPoolMgr(int maxConnections) { this.maxConnections = maxConnections; numberConnection = new AtomicInteger(0); - nextConnectionId = new AtomicInteger(0); - checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS); - } - - private class TimeoutChecker extends TimerTask { - @Override - public void run() { - long now = System.currentTimeMillis(); - for (ConnectContext connectContext : connectionMap.values()) { - connectContext.checkTimeout(now); - } - } } - // submit one MysqlContext or ArrowFlightSqlContext to this scheduler. - // return true, if this connection has been successfully submitted, otherwise return false. - // Caller should close ConnectContext if return false. - public boolean submit(ConnectContext context) { - if (context == null) { - return false; + public void timeoutChecker(long now) { + for (ConnectContext connectContext : connectionMap.values()) { + connectContext.checkTimeout(now); } - context.setConnectionId(nextConnectionId.getAndAdd(1)); - context.resetLoginTime(); - return true; } // Register one connection with its connection id. @@ -108,9 +74,6 @@ public class ConnectScheduler { return numberConnection.get(); } connectionMap.put(ctx.getConnectionId(), ctx); - if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { - flightToken2ConnectionId.put(ctx.getPeerIdentity(), ctx.getConnectionId()); - } return -1; } @@ -122,9 +85,6 @@ public class ConnectScheduler { conns.decrementAndGet(); } numberConnection.decrementAndGet(); - if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { - flightToken2ConnectionId.remove(ctx.getPeerIdentity()); - } } } @@ -141,29 +101,22 @@ public class ConnectScheduler { return null; } - public ConnectContext getContext(String flightToken) { - if (flightToken2ConnectionId.containsKey(flightToken)) { - int connectionId = flightToken2ConnectionId.get(flightToken); - return getContext(connectionId); - } - return null; - } - - public void cancelQuery(String queryId, Status cancelReason) { + public boolean cancelQuery(String queryId, Status cancelReason) { for (ConnectContext ctx : connectionMap.values()) { TUniqueId qid = ctx.queryId(); if (qid != null && DebugUtil.printId(qid).equals(queryId)) { ctx.cancelQuery(cancelReason); - break; + return true; } } + return false; } public int getConnectionNum() { return numberConnection.get(); } - public List<ConnectContext.ThreadInfo> listConnection(String user, boolean isFull) { + public List<ThreadInfo> listConnection(String user, boolean isFull) { List<ConnectContext.ThreadInfo> infos = Lists.newArrayList(); for (ConnectContext ctx : connectionMap.values()) { // Check auth @@ -184,8 +137,7 @@ public class ConnectScheduler { long nowMs = System.currentTimeMillis(); for (ConnectContext ctx : connectionMap.values()) { // Check auth - if (!ctx.getCurrentUserIdentity().equals(userIdentity) && !Env.getCurrentEnv() - .getAccessManager() + if (!ctx.getCurrentUserIdentity().equals(userIdentity) && !Env.getCurrentEnv().getAccessManager() .checkGlobalPriv(userIdentity, PrivPredicate.GRANT)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 357d3e0779d..c8d27a23db6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -18,14 +18,13 @@ package org.apache.doris.qe; import org.apache.doris.analysis.UserIdentity; -import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.Status; import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.qe.ConnectContext.ConnectType; -import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.qe.ConnectContext.ThreadInfo; +import org.apache.doris.service.arrowflight.sessions.FlightSqlConnectPoolMgr; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -45,15 +44,9 @@ import java.util.concurrent.atomic.AtomicInteger; // TODO(zhaochun): We should consider if the number of local file connection can >= maximum connections later. public class ConnectScheduler { private static final Logger LOG = LogManager.getLogger(ConnectScheduler.class); - private final int maxConnections; - private final AtomicInteger numberConnection; private final AtomicInteger nextConnectionId; - private final Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap(); - private final Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap(); - private final Map<String, Integer> flightToken2ConnectionId = Maps.newConcurrentMap(); - - // valid trace id -> query id - private final Map<String, TUniqueId> traceId2QueryId = Maps.newConcurrentMap(); + private final ConnectPoolMgr connectPoolMgr; + private final FlightSqlConnectPoolMgr flightSqlConnectPoolMgr; // Use a thread to check whether connection is timeout. Because // 1. If use a scheduler, the task maybe a huge number when query is messy. @@ -62,24 +55,26 @@ public class ConnectScheduler { private final ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "connect-scheduler-check-timer", true); - public ConnectScheduler(int maxConnections) { - this.maxConnections = maxConnections; - numberConnection = new AtomicInteger(0); + public ConnectScheduler(int commonMaxConnections, int flightSqlMaxConnections) { nextConnectionId = new AtomicInteger(0); + this.connectPoolMgr = new ConnectPoolMgr(commonMaxConnections); + this.flightSqlConnectPoolMgr = new FlightSqlConnectPoolMgr(flightSqlMaxConnections); checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS); } - private class TimeoutChecker extends TimerTask { - @Override - public void run() { - long now = System.currentTimeMillis(); - for (ConnectContext connectContext : connectionMap.values()) { - connectContext.checkTimeout(now); - } - } + public ConnectScheduler(int commonMaxConnections) { + this(commonMaxConnections, Config.arrow_flight_max_connections); + } + + public ConnectPoolMgr getConnectPoolMgr() { + return connectPoolMgr; } - // submit one MysqlContext or ArrowFlightSqlContext to this scheduler. + public FlightSqlConnectPoolMgr getFlightSqlConnectPoolMgr() { + return flightSqlConnectPoolMgr; + } + + // submit one MysqlContext to this scheduler. // return true, if this connection has been successfully submitted, otherwise return false. // Caller should close ConnectContext if return false. public boolean submit(ConnectContext context) { @@ -91,89 +86,38 @@ public class ConnectScheduler { return true; } - // Register one connection with its connection id. - // Return -1 means register OK - // Return >=0 means register failed, and return value is current connection num. - public int registerConnection(ConnectContext ctx) { - if (numberConnection.incrementAndGet() > maxConnections) { - numberConnection.decrementAndGet(); - return numberConnection.get(); - } - // Check user - connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0)); - AtomicInteger conns = connByUser.get(ctx.getQualifiedUser()); - if (conns.incrementAndGet() > ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) { - conns.decrementAndGet(); - numberConnection.decrementAndGet(); - return numberConnection.get(); - } - connectionMap.put(ctx.getConnectionId(), ctx); - if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { - flightToken2ConnectionId.put(ctx.getPeerIdentity(), ctx.getConnectionId()); - } - return -1; - } - - public void unregisterConnection(ConnectContext ctx) { - ctx.closeTxn(); - if (connectionMap.remove(ctx.getConnectionId()) != null) { - AtomicInteger conns = connByUser.get(ctx.getQualifiedUser()); - if (conns != null) { - conns.decrementAndGet(); - } - numberConnection.decrementAndGet(); - if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { - flightToken2ConnectionId.remove(ctx.getPeerIdentity()); - } - } - } - public ConnectContext getContext(int connectionId) { - return connectionMap.get(connectionId); - } - - public ConnectContext getContextWithQueryId(String queryId) { - for (ConnectContext context : connectionMap.values()) { - if (queryId.equals(DebugUtil.printId(context.queryId))) { - return context; - } + ConnectContext ctx = connectPoolMgr.getContext(connectionId); + if (ctx == null) { + ctx = flightSqlConnectPoolMgr.getContext(connectionId); } - return null; + return ctx; } - public ConnectContext getContext(String flightToken) { - if (flightToken2ConnectionId.containsKey(flightToken)) { - int connectionId = flightToken2ConnectionId.get(flightToken); - return getContext(connectionId); + public ConnectContext getContextWithQueryId(String queryId) { + ConnectContext ctx = connectPoolMgr.getContextWithQueryId(queryId); + if (ctx == null) { + ctx = flightSqlConnectPoolMgr.getContextWithQueryId(queryId); } - return null; + return ctx; } - public void cancelQuery(String queryId, Status cancelReason) { - for (ConnectContext ctx : connectionMap.values()) { - TUniqueId qid = ctx.queryId(); - if (qid != null && DebugUtil.printId(qid).equals(queryId)) { - ctx.cancelQuery(cancelReason); - break; - } + public boolean cancelQuery(String queryId, Status cancelReason) { + boolean ret = connectPoolMgr.cancelQuery(queryId, cancelReason); + if (!ret) { + ret = flightSqlConnectPoolMgr.cancelQuery(queryId, cancelReason); } + return ret; } public int getConnectionNum() { - return numberConnection.get(); + return connectPoolMgr.getConnectionNum() + flightSqlConnectPoolMgr.getConnectionNum(); } - public List<ConnectContext.ThreadInfo> listConnection(String user, boolean isFull) { + public List<ThreadInfo> listConnection(String user, boolean isFull) { List<ConnectContext.ThreadInfo> infos = Lists.newArrayList(); - for (ConnectContext ctx : connectionMap.values()) { - // Check auth - if (!ctx.getQualifiedUser().equals(user) && !Env.getCurrentEnv().getAccessManager() - .checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - continue; - } - - infos.add(ctx.toThreadInfo(isFull)); - } + infos.addAll(connectPoolMgr.listConnection(user, isFull)); + infos.addAll(flightSqlConnectPoolMgr.listConnection(user, isFull)); return infos; } @@ -181,37 +125,39 @@ public class ConnectScheduler { public List<List<String>> listConnectionForRpc(UserIdentity userIdentity, boolean isShowFullSql, Optional<String> timeZone) { List<List<String>> list = new ArrayList<>(); - long nowMs = System.currentTimeMillis(); - for (ConnectContext ctx : connectionMap.values()) { - // Check auth - if (!ctx.getCurrentUserIdentity().equals(userIdentity) && !Env.getCurrentEnv() - .getAccessManager() - .checkGlobalPriv(userIdentity, PrivPredicate.GRANT)) { - continue; - } - list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, timeZone)); - } + list.addAll(connectPoolMgr.listConnectionForRpc(userIdentity, isShowFullSql, timeZone)); + list.addAll(flightSqlConnectPoolMgr.listConnectionForRpc(userIdentity, isShowFullSql, timeZone)); return list; } - public void putTraceId2QueryId(String traceId, TUniqueId queryId) { - traceId2QueryId.put(traceId, queryId); - } - public String getQueryIdByTraceId(String traceId) { - TUniqueId queryId = traceId2QueryId.get(traceId); - return queryId == null ? "" : DebugUtil.printId(queryId); + String queryId = connectPoolMgr.getQueryIdByTraceId(traceId); + if (Strings.isNullOrEmpty(queryId)) { + queryId = flightSqlConnectPoolMgr.getQueryIdByTraceId(traceId); + } + return queryId; } public Map<Integer, ConnectContext> getConnectionMap() { - return connectionMap; + Map<Integer, ConnectContext> map = Maps.newConcurrentMap(); + map.putAll(connectPoolMgr.getConnectionMap()); + map.putAll(flightSqlConnectPoolMgr.getConnectionMap()); + return map; } public Map<String, AtomicInteger> getUserConnectionMap() { - return connByUser; + Map<String, AtomicInteger> map = Maps.newConcurrentMap(); + map.putAll(connectPoolMgr.getUserConnectionMap()); + map.putAll(flightSqlConnectPoolMgr.getUserConnectionMap()); + return map; } - public int getMaxConnections() { - return maxConnections; + private class TimeoutChecker extends TimerTask { + @Override + public void run() { + long now = System.currentTimeMillis(); + connectPoolMgr.timeoutChecker(now); + flightSqlConnectPoolMgr.timeoutChecker(now); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java index 2064bfb267f..b84fb40af1c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java @@ -39,7 +39,7 @@ public class ExecuteEnv { private ExecuteEnv() { multiLoadMgr = new MultiLoadMgr(); - scheduler = new ConnectScheduler(Config.qe_max_connection); + scheduler = new ConnectScheduler(Config.qe_max_connection, Config.arrow_flight_max_connections); startupTime = System.currentTimeMillis(); processUUID = System.currentTimeMillis(); String logDir = Strings.isNullOrEmpty(Config.sys_log_dir) ? System.getenv("LOG_DIR") : diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 154fd9f0b6b..063bc3d46c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -36,6 +36,8 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.CloseSessionRequest; +import org.apache.arrow.flight.CloseSessionResult; import org.apache.arrow.flight.Criteria; import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightEndpoint; @@ -110,7 +112,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable this.flightSessionsManager = flightSessionsManager; sqlInfoBuilder = new SqlInfoBuilder(); sqlInfoBuilder.withFlightSqlServerName("DorisFE").withFlightSqlServerVersion("1.0") - .withFlightSqlServerArrowVersion("13.0").withFlightSqlServerReadOnly(false) + .withFlightSqlServerArrowVersion("18.2.0").withFlightSqlServerReadOnly(false) .withSqlIdentifierQuoteChar("`").withSqlDdlCatalog(true).withSqlDdlSchema(false).withSqlDdlTable(false) .withSqlIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE) .withSqlQuotedIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE); @@ -139,7 +141,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot(); listener.start(vectorSchemaRoot); listener.putNext(); - } catch (Exception e) { + } catch (Throwable e) { String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); @@ -172,7 +174,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable String executedPeerIdentity = handleParts[0]; String preparedStatementId = handleParts[1]; flightSessionsManager.getConnectContext(executedPeerIdentity).removePreparedQuery(preparedStatementId); - } catch (final Exception e) { + } catch (final Throwable e) { listener.onError(e); return; } @@ -274,7 +276,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1); } } - } catch (Exception e) { + } catch (Throwable e) { String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); @@ -288,8 +290,14 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable @Override public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context, final FlightDescriptor descriptor) { - ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); - return executeQueryStatement(context.peerIdentity(), connectContext, request.getQuery(), descriptor); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + return executeQueryStatement(context.peerIdentity(), connectContext, request.getQuery(), descriptor); + } catch (Throwable e) { + String errMsg = "get flight info statement failed, " + e.getMessage(); + LOG.error(errMsg, e); + throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + } } @Override @@ -402,7 +410,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable } } ackStream.onCompleted(); - } catch (Exception e) { + } catch (Throwable e) { String errMsg = "acceptPutPreparedStatementUpdate failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e); LOG.error(errMsg, e); @@ -461,7 +469,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable listener.putNext(); listener.completed(); } - } catch (final Exception e) { + } catch (final Throwable e) { handleStreamException(e, "", listener); } } @@ -488,7 +496,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable listener.putNext(); listener.completed(); } - } catch (final Exception e) { + } catch (final Throwable e) { handleStreamException(e, "", listener); } } @@ -520,7 +528,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable listener.putNext(); listener.completed(); } - } catch (final Exception e) { + } catch (final Throwable e) { handleStreamException(e, "", listener); } } @@ -584,6 +592,25 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference unimplemented").toRuntimeException(); } + @Override + public void closeSession(CloseSessionRequest request, final CallContext context, + final StreamListener<CloseSessionResult> listener) { + // https://github.com/apache/arrow-adbc/issues/2821 + // currently FlightSqlConnection does not provide a separate interface for external calls to + // FlightSqlClient::closeSession(), nor will it automatically call closeSession + // when FlightSqlConnection::close(). Python flight sql Cursor.close() will call closeSession(). + // Neither C++ nor Java seem to have similar behavior. + try { + flightSessionsManager.closeConnectContext(context.peerIdentity()); + } catch (final Throwable e) { + LOG.error("closeSession failed", e); + listener.onError( + CallStatus.INTERNAL.withDescription("closeSession failed").withCause(e).toRuntimeException()); + } + listener.onNext(new CloseSessionResult(CloseSessionResult.Status.CLOSED)); + listener.onCompleted(); + } + private <T extends Message> FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, final Schema schema) { final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); @@ -592,7 +619,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable return new FlightInfo(schema, descriptor, endpoints, -1, -1); } - private static void handleStreamException(Exception e, String errMsg, ServerStreamListener listener) { + private static void handleStreamException(Throwable e, String errMsg, ServerStreamListener listener) { LOG.error(errMsg, e); listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java index b83936dab3b..5c47941b291 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java @@ -46,22 +46,20 @@ public class DorisFlightSqlService { public DorisFlightSqlService(int port) { BufferAllocator allocator = new RootAllocator(); - // arrow_flight_token_cache_size less than qe_max_connection to avoid `Reach limit of connections`. // arrow flight sql is a stateless protocol, connection is usually not actively disconnected. // bearer token is evict from the cache will unregister ConnectContext. this.flightTokenManager = new FlightTokenManagerImpl( - Math.min(Config.arrow_flight_token_cache_size, Config.qe_max_connection / 2), - Config.arrow_flight_token_alive_time); + Math.min(Config.arrow_flight_max_connections, Config.arrow_flight_token_cache_size), + Config.arrow_flight_token_alive_time_second); this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager); DorisFlightSqlProducer producer = new DorisFlightSqlProducer( Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port), flightSessionsManager); flightServer = FlightServer.builder(allocator, Location.forGrpcInsecure("0.0.0.0", port), producer) .headerAuthenticator(new FlightBearerTokenAuthenticator(flightTokenManager)).build(); - LOG.info("Arrow Flight SQL service is created, port: {}, token_cache_size: {}" - + ", qe_max_connection: {}, token_alive_time: {}", - port, Config.arrow_flight_token_cache_size, Config.qe_max_connection, - Config.arrow_flight_token_alive_time); + LOG.info("Arrow Flight SQL service is created, port: {}, arrow_flight_max_connections: {}," + + "arrow_flight_token_alive_time_second: {}", port, Config.arrow_flight_max_connections, + Config.arrow_flight_token_alive_time_second); } // start Arrow Flight SQL service, return true if success, otherwise false diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java index 2c3a5258cc4..4e9571d63ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java @@ -45,6 +45,13 @@ public interface FlightSessionsManager { */ ConnectContext createConnectContext(String peerIdentity); + /** + * Close ConnectContext object and delete it in the local cache. + * + * @param peerIdentity identity after authorization + */ + void closeConnectContext(String peerIdentity); + static ConnectContext buildConnectContext(String peerIdentity, UserIdentity userIdentity, String remoteIP) { ConnectContext connectContext = new FlightSqlConnectContext(peerIdentity); connectContext.setEnv(Env.getCurrentEnv()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java index a495b02c393..8001998a66e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java @@ -41,7 +41,8 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager { @Override public ConnectContext getConnectContext(String peerIdentity) { try { - ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity); + ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getFlightSqlConnectPoolMgr() + .getContextWithFlightToken(peerIdentity); if (null == connectContext) { connectContext = createConnectContext(peerIdentity); return connectContext; @@ -68,18 +69,21 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager { flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); ConnectScheduler connectScheduler = ExecuteEnv.getInstance().getScheduler(); connectScheduler.submit(connectContext); - int res = connectScheduler.registerConnection(connectContext); + int res = connectScheduler.getFlightSqlConnectPoolMgr().registerConnection(connectContext); if (res >= 0) { - long userConnLimit = connectContext.getEnv().getAuth().getMaxConn(connectContext.getQualifiedUser()); String errMsg = String.format( - "Reach limit of connections. Total: %d, User: %d, Current: %d. " - + "Increase `qe_max_connection` in fe.conf or user's `max_user_connections`," - + " or decrease `arrow_flight_token_cache_size` " - + "to evict unused bearer tokens and it connections faster", - connectScheduler.getMaxConnections(), userConnLimit, res); - connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg); + "Register arrow flight sql connection failed, Unknown Error, the number of arrow flight " + + "bearer tokens should be equal to arrow flight sql max connections, " + + "max connections: %d, used: %d.", + connectScheduler.getFlightSqlConnectPoolMgr().getMaxConnections(), res); + connectContext.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, errMsg); throw new IllegalArgumentException(errMsg); } return connectContext; } + + @Override + public void closeConnectContext(String peerIdentity) { + flightTokenManager.invalidateToken(peerIdentity); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java index b90d7505923..35293273f1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java @@ -24,7 +24,9 @@ import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; import org.apache.doris.thrift.TResultSinkType; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -63,7 +65,7 @@ public class FlightSqlConnectContext extends ConnectContext { if (flightSqlChannel != null) { flightSqlChannel.close(); } - connectScheduler.unregisterConnection(this); + connectScheduler.getFlightSqlConnectPoolMgr().unregisterConnection(this); } // kill operation with no protect. @@ -80,6 +82,17 @@ public class FlightSqlConnectContext extends ConnectContext { cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query killed by user")); } + @Override + public void setQueryId(TUniqueId queryId) { + if (this.queryId != null) { + this.lastQueryId = this.queryId.deepCopy(); + } + this.queryId = queryId; + if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) { + connectScheduler.getFlightSqlConnectPoolMgr().putTraceId2QueryId(traceId, queryId); + } + } + @Override public String getRemoteHostPortString() { return getFlightSqlChannel().getRemoteHostPortString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectPoolMgr.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectPoolMgr.java new file mode 100644 index 00000000000..3002116b386 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectPoolMgr.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight.sessions; + +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ConnectContext.ConnectType; +import org.apache.doris.qe.ConnectPoolMgr; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +public class FlightSqlConnectPoolMgr extends ConnectPoolMgr { + private static final Logger LOG = LogManager.getLogger( + FlightSqlConnectPoolMgr.class); + private final Map<String, Integer> flightToken2ConnectionId = Maps.newConcurrentMap(); + + public FlightSqlConnectPoolMgr(int maxConnections) { + super(maxConnections); + } + + // Register one connection with its connection id. + // Return -1 means register OK + // Return >=0 means register failed, and return value is current connection num. + @Override + public int registerConnection(ConnectContext ctx) { + if (numberConnection.incrementAndGet() > maxConnections) { + numberConnection.decrementAndGet(); + return numberConnection.get(); + } + // not check user + connectionMap.put(ctx.getConnectionId(), ctx); + if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { + flightToken2ConnectionId.put(ctx.getPeerIdentity(), ctx.getConnectionId()); + } + return -1; + } + + @Override + public void unregisterConnection(ConnectContext ctx) { + ctx.closeTxn(); + if (connectionMap.remove(ctx.getConnectionId()) != null) { + numberConnection.decrementAndGet(); + if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { + flightToken2ConnectionId.remove(ctx.getPeerIdentity()); + } + } + } + + public ConnectContext getContextWithFlightToken(String flightToken) { + if (flightToken2ConnectionId.containsKey(flightToken)) { + int connectionId = flightToken2ConnectionId.get(flightToken); + return getContext(connectionId); + } + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java index bd3e85cd0c2..85d1a0bce35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java @@ -59,20 +59,26 @@ public class FlightTokenManagerImpl implements FlightTokenManager { private ScheduledExecutorService cleanupExecutor; public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { + // The cache size of all user tokens in Arrow Flight Server. which will be eliminated by + // LRU rules after exceeding the limit, the default value is arrow_flight_max_connections, + // arrow flight sql is a stateless protocol, the connection is usually not actively + // disconnected, bearer token is evict from the cache will unregister ConnectContext. this.cacheSize = cacheSize; this.cacheExpiration = cacheExpiration; this.tokenCache = CacheBuilder.newBuilder().maximumSize(cacheSize) - .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES) + .expireAfterWrite(cacheExpiration, TimeUnit.SECONDS) .removalListener(new RemovalListener<String, FlightTokenDetails>() { @Override public void onRemoval(@NotNull RemovalNotification<String, FlightTokenDetails> notification) { // TODO: broadcast this message to other FE String token = notification.getKey(); FlightTokenDetails tokenDetails = notification.getValue(); - ConnectContext context = ExecuteEnv.getInstance().getScheduler().getContext(token); + ConnectContext context = ExecuteEnv.getInstance().getScheduler().getFlightSqlConnectPoolMgr() + .getContextWithFlightToken(token); if (context != null) { - ExecuteEnv.getInstance().getScheduler().unregisterConnection(context); + ExecuteEnv.getInstance().getScheduler().getFlightSqlConnectPoolMgr() + .unregisterConnection(context); LOG.info("evict bearer token: " + token + " from tokenCache, reason: " + notification.getCause() + ", and unregister flight connection context after evict bearer token"); @@ -145,13 +151,13 @@ public class FlightTokenManagerImpl implements FlightTokenManager { if (value.getToken().equals("")) { throw new IllegalArgumentException("invalid bearer token: " + token + ", try reconnect, bearer token may not be created, or may have been evict, search for this " - + "token in fe.log to see the evict reason. currently in fe.conf, `arrow_flight_token_cache_size`=" - + this.cacheSize + ", `arrow_flight_token_alive_time`=" + this.cacheExpiration); + + "token in fe.log to see the evict reason. currently in fe.conf, `arrow_flight_max_connections`=" + + this.cacheSize + ", `arrow_flight_token_alive_time_second`=" + this.cacheExpiration); } if (System.currentTimeMillis() >= value.getExpiresAt()) { tokenCache.invalidate(token); throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, " - + "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration); + + "currently in fe.conf, `arrow_flight_token_alive_time_second`=" + this.cacheExpiration); } if (usersTokenLRU.containsKey(value.getUsername())) { try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java index 781858148af..544cf114f77 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/ConnectionExceedTest.java @@ -78,14 +78,14 @@ public class ConnectionExceedTest { context1.setEnv(mockEnv); context1.setQualifiedUser("test_user"); Assert.assertTrue(scheduler.submit(context1)); - Assert.assertEquals(-1, scheduler.registerConnection(context1)); + Assert.assertEquals(-1, scheduler.getConnectPoolMgr().registerConnection(context1)); // Create second context and register ConnectContext context2 = new ConnectContext(); context2.setEnv(mockEnv); context2.setQualifiedUser("test_user"); Assert.assertTrue(scheduler.submit(context2)); - Assert.assertEquals(-1, scheduler.registerConnection(context2)); + Assert.assertEquals(-1, scheduler.getConnectPoolMgr().registerConnection(context2)); // Create third context and try to register - should fail ConnectContext context3 = new ConnectContext(); @@ -98,7 +98,7 @@ public class ConnectionExceedTest { listener.handleConnection(context3, mockConnection); String expectedMsg = String.format( "Reach limit of connections. Total: %d, User: %d, Current: %d", - scheduler.getMaxConnections(), + scheduler.getConnectPoolMgr().getMaxConnections(), 2, // Mocked user connection limit scheduler.getConnectionNum()); Assert.assertEquals(expectedMsg, context3.getState().getErrorMessage()); @@ -108,15 +108,16 @@ public class ConnectionExceedTest { @Test public void testFlightSessionConnectionExceed() throws Exception { // Create a scheduler with small max connections - ConnectScheduler scheduler = new ConnectScheduler(2); + ConnectScheduler scheduler = new ConnectScheduler(1000, 2); // Setup expectations new Expectations() { { - mockEnv.getAuth(); - result = mockAuth; - mockAuth.getMaxConn("test_user"); - result = 2; + // Arrow flight sql not check the number of user connections. + // mockEnv.getAuth(); + // result = mockAuth; + // mockAuth.getMaxConn("test_user"); + // result = 2; mockExecuteEnv.getScheduler(); result = scheduler; @@ -140,14 +141,14 @@ public class ConnectionExceedTest { context1.setEnv(mockEnv); context1.setQualifiedUser("test_user"); Assert.assertTrue(scheduler.submit(context1)); - Assert.assertEquals(-1, scheduler.registerConnection(context1)); + Assert.assertEquals(-1, scheduler.getFlightSqlConnectPoolMgr().registerConnection(context1)); // Create second context and register ConnectContext context2 = new ConnectContext(); context2.setEnv(mockEnv); context2.setQualifiedUser("test_user"); Assert.assertTrue(scheduler.submit(context2)); - Assert.assertEquals(-1, scheduler.registerConnection(context2)); + Assert.assertEquals(-1, scheduler.getFlightSqlConnectPoolMgr().registerConnection(context2)); // Create FlightSessionsWithTokenManager and try to create a new connection FlightSessionsWithTokenManager manager = new FlightSessionsWithTokenManager(mockTokenManager); @@ -157,12 +158,10 @@ public class ConnectionExceedTest { } catch (IllegalArgumentException e) { // Verify error message is set correctly String expectedMsg = String.format( - "Reach limit of connections. Total: %d, User: %d, Current: %d. " - + "Increase `qe_max_connection` in fe.conf or user's `max_user_connections`," - + " or decrease `arrow_flight_token_cache_size` " - + "to evict unused bearer tokens and it connections faster", - scheduler.getMaxConnections(), - 2, // Mocked user connection limit + "Register arrow flight sql connection failed, Unknown Error, the number of arrow flight " + + "bearer tokens should be equal to arrow flight sql max connections, " + + "max connections: %d, used: %d.", + scheduler.getFlightSqlConnectPoolMgr().getMaxConnections(), scheduler.getConnectionNum()); Assert.assertEquals(expectedMsg, e.getMessage()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java index 6145e46bd5b..190e546dc3c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/KillConnectionCommandTest.java @@ -56,7 +56,7 @@ public class KillConnectionCommandTest extends TestWithFeService { } }; connectContext.setConnectScheduler(scheduler); - scheduler.registerConnection(connectContext); + scheduler.getConnectPoolMgr().registerConnection(connectContext); } @Test diff --git a/fe/pom.xml b/fe/pom.xml index 582d8b2d76e..14ec4bc07bd 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -321,7 +321,7 @@ under the License. you can find avro version info in iceberg mvn repository --> <iceberg.version>1.6.1</iceberg.version> <maxcompute.version>0.49.0-public</maxcompute.version> - <arrow.version>17.0.0</arrow.version> + <arrow.version>18.2.0</arrow.version> <presto.hadoop.version>2.7.4-11</presto.hadoop.version> <presto.hive.version>3.0.0-8</presto.hive.version> <!-- lakesoul --> diff --git a/samples/arrow-flight-sql/java/pom.xml b/samples/arrow-flight-sql/java/pom.xml index d08e30d69ae..f7f73cd92d5 100644 --- a/samples/arrow-flight-sql/java/pom.xml +++ b/samples/arrow-flight-sql/java/pom.xml @@ -48,8 +48,8 @@ under the License. similar issue: https://github.com/protocolbuffers/protobuf/issues/15762 3. A more stable version is Arrow 15.0.2 and ADBC 0.12.0, but we always hope to embrace the future with new versions! --> - <arrow.version>18.1.0</arrow.version> - <adbc.version>0.15.0</adbc.version> + <arrow.version>18.2.0</arrow.version> + <adbc.version>0.18.0</adbc.version> <log4j.version>2.17.1</log4j.version> </properties> <dependencies> diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh index ee7df5c1862..51705a0bd35 100644 --- a/thirdparty/vars.sh +++ b/thirdparty/vars.sh @@ -253,10 +253,10 @@ GRPC_SOURCE=grpc-1.54.3 GRPC_MD5SUM="af00a2edeae0f02bb25917cc3473b7de" # arrow -ARROW_DOWNLOAD="https://github.com/apache/arrow/archive/refs/tags/apache-arrow-17.0.0.tar.gz" -ARROW_NAME="apache-arrow-17.0.0.tar.gz" -ARROW_SOURCE="arrow-apache-arrow-17.0.0" -ARROW_MD5SUM="ba18bf83e2164abd34b9ac4cb164f0f0" +ARROW_DOWNLOAD="https://github.com/apache/arrow/archive/refs/tags/apache-arrow-19.0.1.tar.gz" +ARROW_NAME="apache-arrow-19.0.1.tar.gz" +ARROW_SOURCE="arrow-apache-arrow-19.0.1" +ARROW_MD5SUM="8c5091da0f8fb41a47d7f4dad7b712df" # Abseil ABSEIL_DOWNLOAD="https://github.com/abseil/abseil-cpp/archive/refs/tags/20230125.3.tar.gz" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org