This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new e32932006ac [fix](connections) fix connection hang after too many connections #31761 e32932006ac is described below commit e32932006ace0f5a3f8a7deb9cc3bb0d833b0e7f Author: camby <camby...@tencent.com> AuthorDate: Wed Mar 6 20:38:14 2024 +0800 [fix](connections) fix connection hang after too many connections #31761 --- .../org/apache/doris/mysql/AcceptListener.java | 97 ++++++++++++---------- .../java/org/apache/doris/mysql/MysqlServer.java | 2 +- 2 files changed, 54 insertions(+), 45 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 1bde95c1650..30d0693a6b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -31,6 +31,7 @@ import org.xnio.StreamConnection; import org.xnio.channels.AcceptingChannel; import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; /** * listener for accept mysql connections. @@ -58,51 +59,59 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo context.setEnv(Env.getCurrentEnv()); connectScheduler.submit(context); - channel.getWorker().execute(() -> { - try { - // Set thread local info - context.setThreadLocalInfo(); - context.setConnectScheduler(connectScheduler); - // authenticate check failed. - if (!MysqlProto.negotiate(context)) { - throw new AfterConnectedException("mysql negotiate failed"); + try { + channel.getWorker().execute(() -> { + try { + // Set thread local info + context.setThreadLocalInfo(); + context.setConnectScheduler(connectScheduler); + // authenticate check failed. + if (!MysqlProto.negotiate(context)) { + throw new AfterConnectedException("mysql negotiate failed"); + } + if (connectScheduler.registerConnection(context)) { + MysqlProto.sendResponsePacket(context); + connection.setCloseListener( + streamConnection -> connectScheduler.unregisterConnection(context)); + } else { + context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, + "Reach limit of connections"); + MysqlProto.sendResponsePacket(context); + throw new AfterConnectedException("Reach limit of connections"); + } + context.setStartTime(); + context.setUserQueryTimeout( + context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser())); + context.setUserInsertTimeout( + context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser())); + ConnectProcessor processor = new ConnectProcessor(context); + context.startAcceptQuery(processor); + } catch (AfterConnectedException e) { + // do not need to print log for this kind of exception. + // just clean up the context; + context.cleanup(); + } catch (Throwable e) { + // should be unexpected exception, so print warn log + if (context.getCurrentUserIdentity() != null) { + LOG.warn("connect processor exception because ", e); + } else if (e instanceof Error) { + LOG.error("connect processor exception because ", e); + } else { + // for unauthrorized access such lvs probe request, + // may cause exception, just log it in debug level + LOG.debug("connect processor exception because ", e); + } + context.cleanup(); + } finally { + ConnectContext.remove(); } - if (connectScheduler.registerConnection(context)) { - MysqlProto.sendResponsePacket(context); - connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context)); - } else { - context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, - "Reach limit of connections"); - MysqlProto.sendResponsePacket(context); - throw new AfterConnectedException("Reach limit of connections"); - } - context.setStartTime(); - context.setUserQueryTimeout( - context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser())); - context.setUserInsertTimeout( - context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser())); - ConnectProcessor processor = new ConnectProcessor(context); - context.startAcceptQuery(processor); - } catch (AfterConnectedException e) { - // do not need to print log for this kind of exception. - // just clean up the context; - context.cleanup(); - } catch (Throwable e) { - // should be unexpected exception, so print warn log - if (context.getCurrentUserIdentity() != null) { - LOG.warn("connect processor exception because ", e); - } else if (e instanceof Error) { - LOG.error("connect processor exception because ", e); - } else { - // for unauthrorized access such lvs probe request, - // may cause exception, just log it in debug level - LOG.debug("connect processor exception because ", e); - } - context.cleanup(); - } finally { - ConnectContext.remove(); - } - }); + }); + } catch (RejectedExecutionException e) { + context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, + "Too many connections"); + MysqlProto.sendResponsePacket(context); + context.cleanup(); + } } catch (IOException e) { LOG.warn("Connection accept failed.", e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java index 4fb11136132..5f70e3000b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java @@ -51,7 +51,7 @@ public class MysqlServer { private AcceptingChannel<StreamConnection> server; // default task service. - private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool( + private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true); public MysqlServer(int port, ConnectScheduler connectScheduler) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org