This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4ae9451b637 branch-2.1: [opt](connection) add connection num in error msg (#49471) (#49910) 4ae9451b637 is described below commit 4ae9451b63744b938ae7d3959efbf310bba85803 Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Thu Apr 10 07:00:51 2025 -0700 branch-2.1: [opt](connection) add connection num in error msg (#49471) (#49910) bp #49471 --- .../java/org/apache/doris/mysql/AcceptListener.java | 11 +++++++---- .../java/org/apache/doris/qe/ConnectScheduler.java | 14 ++++++++++---- .../sessions/FlightSessionsWithTokenManager.java | 20 ++++++++++++++------ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 3d783f28cb3..0388a532efd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -89,15 +89,18 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo if (!MysqlProto.negotiate(context)) { throw new AfterConnectedException("mysql negotiate failed"); } - if (connectScheduler.registerConnection(context)) { + int res = connectScheduler.registerConnection(context); + if (res == -1) { MysqlProto.sendResponsePacket(context); connection.setCloseListener( streamConnection -> connectScheduler.unregisterConnection(context)); } else { - context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, - "Reach limit of connections"); + long userConnLimit = context.getEnv().getAuth().getMaxConn(context.getQualifiedUser()); + String errMsg = String.format("Reach limit of connections. Total: %, User: %d, Current: %d", + connectScheduler.getMaxConnections(), userConnLimit, res); + context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg); MysqlProto.sendResponsePacket(context); - throw new AfterConnectedException("Reach limit of connections"); + throw new AfterConnectedException(errMsg); } context.setStartTime(); int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index e4626a0d215..1618ca2dd43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -90,10 +90,12 @@ public class ConnectScheduler { } // Register one connection with its connection id. - public boolean registerConnection(ConnectContext ctx) { + // Return -1 means register OK + // Return >=0 means register failed, and return value is current connection num. + public int registerConnection(ConnectContext ctx) { if (numberConnection.incrementAndGet() > maxConnections) { numberConnection.decrementAndGet(); - return false; + return numberConnection.get(); } // Check user connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0)); @@ -101,13 +103,13 @@ public class ConnectScheduler { if (conns.incrementAndGet() > ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) { conns.decrementAndGet(); numberConnection.decrementAndGet(); - return false; + return numberConnection.get(); } connectionMap.put(ctx.getConnectionId(), ctx); if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { flightToken2ConnectionId.put(ctx.getPeerIdentity(), ctx.getConnectionId()); } - return true; + return -1; } public void unregisterConnection(ConnectContext ctx) { @@ -202,4 +204,8 @@ public class ConnectScheduler { public Map<Integer, ConnectContext> getConnectionMap() { return connectionMap; } + + public int getMaxConnections() { + return maxConnections; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java index 75d7ff1b334..a495b02c393 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java @@ -20,6 +20,7 @@ package org.apache.doris.service.arrowflight.sessions; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.util.Util; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ConnectScheduler; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.arrowflight.tokens.FlightTokenDetails; import org.apache.doris.service.arrowflight.tokens.FlightTokenManager; @@ -65,12 +66,19 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager { flightTokenDetails.setCreatedSession(true); ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity, flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); - ExecuteEnv.getInstance().getScheduler().submit(connectContext); - if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) { - String err = "Reach limit of connections, increase `qe_max_connection` in fe.conf, or decrease " - + "`arrow_flight_token_cache_size` to evict unused bearer tokens and it connections faster"; - connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, err); - throw new IllegalArgumentException(err); + ConnectScheduler connectScheduler = ExecuteEnv.getInstance().getScheduler(); + connectScheduler.submit(connectContext); + int res = connectScheduler.registerConnection(connectContext); + if (res >= 0) { + long userConnLimit = connectContext.getEnv().getAuth().getMaxConn(connectContext.getQualifiedUser()); + String errMsg = String.format( + "Reach limit of connections. Total: %d, User: %d, Current: %d. " + + "Increase `qe_max_connection` in fe.conf or user's `max_user_connections`," + + " or decrease `arrow_flight_token_cache_size` " + + "to evict unused bearer tokens and it connections faster", + connectScheduler.getMaxConnections(), userConnLimit, res); + connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg); + throw new IllegalArgumentException(errMsg); } return connectContext; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org