This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 36b45b570b0 [fix](mysql) fix mysql channel infinite blocking (#28808) 36b45b570b0 is described below commit 36b45b570b0c16ac0a4cbe8f05e6a7ea0b946851 Author: fornaix <foxn...@gmail.com> AuthorDate: Fri Dec 29 13:57:22 2023 +0800 [fix](mysql) fix mysql channel infinite blocking (#28808) Call the Channels blocking method with timeout instead. Using session variables net_write_timeout and net_read_timeout as the timeout parameter. --- .../src/main/java/org/apache/doris/mysql/MysqlChannel.java | 14 ++++++++++---- .../src/main/java/org/apache/doris/qe/ConnectContext.java | 10 +++++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index 5eaee47fa4b..8e7c5f79ffd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -31,6 +31,7 @@ import org.xnio.channels.Channels; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; @@ -79,6 +80,8 @@ public class MysqlChannel { // mysql flag CLIENT_DEPRECATE_EOF private boolean clientDeprecatedEOF; + private ConnectContext context; + protected MysqlChannel() { // For DummyMysqlChannel } @@ -91,7 +94,7 @@ public class MysqlChannel { return clientDeprecatedEOF; } - public MysqlChannel(StreamConnection connection) { + public MysqlChannel(StreamConnection connection, ConnectContext context) { Preconditions.checkNotNull(connection); this.sequenceId = 0; this.isSend = false; @@ -113,6 +116,7 @@ public class MysqlChannel { this.defaultBuffer = ByteBuffer.allocate(16 * 1024); this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN); this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024); + this.context = context; } public void initSslBuffer() { @@ -195,7 +199,8 @@ public class MysqlChannel { } try { while (dstBuf.remaining() != 0) { - int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf); + int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(), + TimeUnit.SECONDS); // return -1 when remote peer close the channel if (ret == -1) { decryptData(dstBuf, isHeader); @@ -365,12 +370,13 @@ public class MysqlChannel { protected void realNetSend(ByteBuffer buffer) throws IOException { buffer = encryptData(buffer); long bufLen = buffer.remaining(); - long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer); + long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(), + TimeUnit.SECONDS); if (bufLen != writeLen) { throw new IOException("Write mysql packet failed.[write=" + writeLen + ", needToWrite=" + bufLen + "]"); } - Channels.flushBlocking(conn.getSinkChannel()); + Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS); isSend = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index a3590a23c4b..8a1a16999b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -327,7 +327,7 @@ public class ConnectContext { connectType = ConnectType.MYSQL; serverCapability = MysqlCapability.DEFAULT_CAPABILITY; if (connection != null) { - mysqlChannel = new MysqlChannel(connection); + mysqlChannel = new MysqlChannel(connection, this); } else { mysqlChannel = new DummyMysqlChannel(); } @@ -1033,5 +1033,13 @@ public class ConnectContext { public void setSkipAuth(boolean skipAuth) { this.skipAuth = skipAuth; } + + public int getNetReadTimeout() { + return this.sessionVariable.getNetReadTimeout(); + } + + public int getNetWriteTimeout() { + return this.sessionVariable.getNetWriteTimeout(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org