This is an automated email from the ASF dual-hosted git repository. cambyzju pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new 74123e40bba [pick-1.2][fix](mysql) fix mysql channel infinite blocking (#32741) 74123e40bba is described below commit 74123e40bba52702bd96abde3ff347c0cf1e9d07 Author: camby <camby...@tencent.com> AuthorDate: Fri Mar 29 11:19:55 2024 +0800 [pick-1.2][fix](mysql) fix mysql channel infinite blocking (#32741) * [fix](mysql) fix mysql channel infinite blocking (#28808) Call the Channels blocking method with timeout instead. Using session variables net_write_timeout and net_read_timeout as the timeout parameter. * [conf](mysql) opt mysql network timeout to 600s #32545 --------- Co-authored-by: fornaix <foxn...@gmail.com> --- .../src/main/java/org/apache/doris/mysql/MysqlChannel.java | 14 ++++++++++---- .../src/main/java/org/apache/doris/qe/ConnectContext.java | 9 ++++++++- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 4 ++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index 5172b243bc2..25d23af70e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -30,6 +30,7 @@ import org.xnio.channels.Channels; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; @@ -75,11 +76,13 @@ public class MysqlChannel { protected volatile MysqlSerializer serializer; + private ConnectContext context; + protected MysqlChannel() { // For DummyMysqlChannel } - public MysqlChannel(StreamConnection connection) { + public MysqlChannel(StreamConnection connection, ConnectContext context) { Preconditions.checkNotNull(connection); this.sequenceId = 0; this.isSend = false; @@ -100,6 +103,7 @@ public class MysqlChannel { this.defaultBuffer = ByteBuffer.allocate(16 * 1024); this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN); this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024); + this.context = context; } public void initSslBuffer() { @@ -182,7 +186,8 @@ public class MysqlChannel { } try { while (dstBuf.remaining() != 0) { - int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf); + int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(), + TimeUnit.SECONDS); // return -1 when remote peer close the channel if (ret == -1) { decryptData(dstBuf, isHeader); @@ -352,12 +357,13 @@ public class MysqlChannel { protected void realNetSend(ByteBuffer buffer) throws IOException { buffer = encryptData(buffer); long bufLen = buffer.remaining(); - long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer); + long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(), + TimeUnit.SECONDS); if (bufLen != writeLen) { throw new IOException("Write mysql packet failed.[write=" + writeLen + ", needToWrite=" + bufLen + "]"); } - Channels.flushBlocking(conn.getSinkChannel()); + Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS); isSend = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index d3994c04d9b..4ebf493bdd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -215,7 +215,7 @@ public class ConnectContext { serverCapability = MysqlCapability.DEFAULT_CAPABILITY; isKilled = false; if (connection != null) { - mysqlChannel = new MysqlChannel(connection); + mysqlChannel = new MysqlChannel(connection, this); } else { mysqlChannel = new DummyMysqlChannel(); } @@ -672,5 +672,12 @@ public class ConnectContext { return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]"; } + public int getNetReadTimeout() { + return this.sessionVariable.getNetReadTimeout(); + } + + public int getNetWriteTimeout() { + return this.sessionVariable.getNetWriteTimeout(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2a9a784173f..0b8ac0ccea7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -384,11 +384,11 @@ public class SessionVariable implements Serializable, Writable { // The number of seconds to wait for a block to be written to a connection before aborting the write @VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT) - public int netWriteTimeout = 60; + public int netWriteTimeout = 600; // The number of seconds to wait for a block to be written to a connection before aborting the write @VariableMgr.VarAttr(name = NET_READ_TIMEOUT) - public int netReadTimeout = 60; + public int netReadTimeout = 600; // The current time zone @VariableMgr.VarAttr(name = TIME_ZONE, needForward = true) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org