This is an automated email from the ASF dual-hosted git repository.

cambyzju pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 74123e40bba [pick-1.2][fix](mysql) fix mysql channel infinite blocking 
(#32741)
74123e40bba is described below

commit 74123e40bba52702bd96abde3ff347c0cf1e9d07
Author: camby <camby...@tencent.com>
AuthorDate: Fri Mar 29 11:19:55 2024 +0800

    [pick-1.2][fix](mysql) fix mysql channel infinite blocking (#32741)
    
    * [fix](mysql) fix mysql channel infinite blocking (#28808)
    
    Call the Channels blocking method with timeout instead.
    
    Using session variables net_write_timeout and net_read_timeout as the 
timeout parameter.
    
    * [conf](mysql) opt mysql network timeout to 600s #32545
    
    ---------
    
    Co-authored-by: fornaix <foxn...@gmail.com>
---
 .../src/main/java/org/apache/doris/mysql/MysqlChannel.java | 14 ++++++++++----
 .../src/main/java/org/apache/doris/qe/ConnectContext.java  |  9 ++++++++-
 .../src/main/java/org/apache/doris/qe/SessionVariable.java |  4 ++--
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index 5172b243bc2..25d23af70e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -30,6 +30,7 @@ import org.xnio.channels.Channels;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
@@ -75,11 +76,13 @@ public class MysqlChannel {
 
     protected volatile MysqlSerializer serializer;
 
+    private ConnectContext context;
+
     protected MysqlChannel() {
         // For DummyMysqlChannel
     }
 
-    public MysqlChannel(StreamConnection connection) {
+    public MysqlChannel(StreamConnection connection, ConnectContext context) {
         Preconditions.checkNotNull(connection);
         this.sequenceId = 0;
         this.isSend = false;
@@ -100,6 +103,7 @@ public class MysqlChannel {
         this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
         this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
         this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
+        this.context = context;
     }
 
     public void initSslBuffer() {
@@ -182,7 +186,8 @@ public class MysqlChannel {
         }
         try {
             while (dstBuf.remaining() != 0) {
-                int ret = Channels.readBlocking(conn.getSourceChannel(), 
dstBuf);
+                int ret = Channels.readBlocking(conn.getSourceChannel(), 
dstBuf, context.getNetReadTimeout(),
+                        TimeUnit.SECONDS);
                 // return -1 when remote peer close the channel
                 if (ret == -1) {
                     decryptData(dstBuf, isHeader);
@@ -352,12 +357,13 @@ public class MysqlChannel {
     protected void realNetSend(ByteBuffer buffer) throws IOException {
         buffer = encryptData(buffer);
         long bufLen = buffer.remaining();
-        long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
+        long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, 
context.getNetWriteTimeout(),
+                TimeUnit.SECONDS);
         if (bufLen != writeLen) {
             throw new IOException("Write mysql packet failed.[write=" + 
writeLen
                     + ", needToWrite=" + bufLen + "]");
         }
-        Channels.flushBlocking(conn.getSinkChannel());
+        Channels.flushBlocking(conn.getSinkChannel(), 
context.getNetWriteTimeout(), TimeUnit.SECONDS);
         isSend = true;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index d3994c04d9b..4ebf493bdd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -215,7 +215,7 @@ public class ConnectContext {
         serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
         isKilled = false;
         if (connection != null) {
-            mysqlChannel = new MysqlChannel(connection);
+            mysqlChannel = new MysqlChannel(connection, this);
         } else {
             mysqlChannel = new DummyMysqlChannel();
         }
@@ -672,5 +672,12 @@ public class ConnectContext {
         return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
     }
 
+    public int getNetReadTimeout() {
+        return this.sessionVariable.getNetReadTimeout();
+    }
+
+    public int getNetWriteTimeout() {
+        return this.sessionVariable.getNetWriteTimeout();
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2a9a784173f..0b8ac0ccea7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -384,11 +384,11 @@ public class SessionVariable implements Serializable, 
Writable {
 
     // The number of seconds to wait for a block to be written to a connection 
before aborting the write
     @VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT)
-    public int netWriteTimeout = 60;
+    public int netWriteTimeout = 600;
 
     // The number of seconds to wait for a block to be written to a connection 
before aborting the write
     @VariableMgr.VarAttr(name = NET_READ_TIMEOUT)
-    public int netReadTimeout = 60;
+    public int netReadTimeout = 600;
 
     // The current time zone
     @VariableMgr.VarAttr(name = TIME_ZONE, needForward = true)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to