This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit a26b1eeee754b58ec66c8e055cdf89f65ad65dc8 Author: camby <camby...@tencent.com> AuthorDate: Mon Mar 4 17:26:21 2024 +0800 [fix](connections) fix connection hang after too many connections (#31594) Issue Number: close #31569 Fix fe connection hang after too high qps After fix, the third SQL will return error instead of hang: ERROR 1203 (HY000): #42000Too many connections --- .../org/apache/doris/mysql/AcceptListener.java | 107 +++++++++++---------- .../java/org/apache/doris/mysql/MysqlServer.java | 2 +- 2 files changed, 59 insertions(+), 50 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index bee00d9e696..4aa6cc85abe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -32,6 +32,7 @@ import org.xnio.StreamConnection; import org.xnio.channels.AcceptingChannel; import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; /** * listener for accept mysql connections. @@ -64,57 +65,65 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo context.setEnv(Env.getCurrentEnv()); connectScheduler.submit(context); - channel.getWorker().execute(() -> { - try { - // Set thread local info - context.setThreadLocalInfo(); - context.setConnectScheduler(connectScheduler); - // authenticate check failed. - if (!MysqlProto.negotiate(context)) { - throw new AfterConnectedException("mysql negotiate failed"); - } - if (connectScheduler.registerConnection(context)) { - MysqlProto.sendResponsePacket(context); - connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context)); - } else { - context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, - "Reach limit of connections"); - MysqlProto.sendResponsePacket(context); - throw new AfterConnectedException("Reach limit of connections"); - } - context.setStartTime(); - int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()); - if (userQueryTimeout <= 0) { - LOG.warn("Connection set query timeout to {}", - context.getSessionVariable().getQueryTimeoutS()); - } - context.setUserQueryTimeout(userQueryTimeout); - context.setUserInsertTimeout( - context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser())); - ConnectProcessor processor = new MysqlConnectProcessor(context); - context.startAcceptQuery(processor); - } catch (AfterConnectedException e) { - // do not need to print log for this kind of exception. - // just clean up the context; - context.cleanup(); - } catch (Throwable e) { - // should be unexpected exception, so print warn log - if (context.getCurrentUserIdentity() != null) { - LOG.warn("connect processor exception because ", e); - } else if (e instanceof Error) { - LOG.error("connect processor exception because ", e); - } else { - // for unauthrorized access such lvs probe request, - // may cause exception, just log it in debug level - if (LOG.isDebugEnabled()) { - LOG.debug("connect processor exception because ", e); + try { + channel.getWorker().execute(() -> { + try { + // Set thread local info + context.setThreadLocalInfo(); + context.setConnectScheduler(connectScheduler); + // authenticate check failed. + if (!MysqlProto.negotiate(context)) { + throw new AfterConnectedException("mysql negotiate failed"); + } + if (connectScheduler.registerConnection(context)) { + MysqlProto.sendResponsePacket(context); + connection.setCloseListener( + streamConnection -> connectScheduler.unregisterConnection(context)); + } else { + context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, + "Reach limit of connections"); + MysqlProto.sendResponsePacket(context); + throw new AfterConnectedException("Reach limit of connections"); } + context.setStartTime(); + int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()); + if (userQueryTimeout <= 0) { + LOG.warn("Connection set query timeout to {}", + context.getSessionVariable().getQueryTimeoutS()); + } + context.setUserQueryTimeout(userQueryTimeout); + context.setUserInsertTimeout( + context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser())); + ConnectProcessor processor = new MysqlConnectProcessor(context); + context.startAcceptQuery(processor); + } catch (AfterConnectedException e) { + // do not need to print log for this kind of exception. + // just clean up the context; + context.cleanup(); + } catch (Throwable e) { + // should be unexpected exception, so print warn log + if (context.getCurrentUserIdentity() != null) { + LOG.warn("connect processor exception because ", e); + } else if (e instanceof Error) { + LOG.error("connect processor exception because ", e); + } else { + // for unauthrorized access such lvs probe request, + // may cause exception, just log it in debug level + if (LOG.isDebugEnabled()) { + LOG.debug("connect processor exception because ", e); + } + } + context.cleanup(); + } finally { + ConnectContext.remove(); } - context.cleanup(); - } finally { - ConnectContext.remove(); - } - }); + }); + } catch (RejectedExecutionException e) { + context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, + "Too many connections"); + MysqlProto.sendResponsePacket(context); + context.cleanup(); + } } catch (IOException e) { LOG.warn("Connection accept failed.", e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java index 4fb11136132..5f70e3000b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java @@ -51,7 +51,7 @@ public class MysqlServer { private AcceptingChannel<StreamConnection> server; // default task service. - private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool( + private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true); public MysqlServer(int port, ConnectScheduler connectScheduler) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org