This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1-lakehouse in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0ab12b8b03821014e511ed6513e7b1ff5290527f Author: Mingyu Chen (Rayner) <[email protected]> AuthorDate: Mon Feb 17 15:40:20 2025 +0800 [opt](proxy-protocol) Support connecting to proxy protocol enabled cluster without proxy protocol header (#47776) ### What problem does this PR solve? Previously, when enabling [proxy protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) support, the client can not connect to FE directly, can only through proxy like haproxy or nginx with proxy protocol enabled. This is because for common mysql protocol, the first handshake packet is send by server side, but if proxy protocol is enabled, the client side will send a proxy protocol header message first, and the server side will parse this message, then do the following handshake. So if a client try to connect to proxy protocol enabled server without proxy protocol header message, the server side will try waiting for the header message, and the connection is blocked. This PR mainly changes: If server side enable the proxy protocol, the client can connect to the server both directly and through proxy. How to achieve that: When a new connection arrived and Doris enable the proxy protocol, Doris server with try to read one bytes from the channel, if read bytes is 0, we consider it as a "not-proxy-protocol", otherwise, try parsing the proxy protocol header message as usual. --- .../org/apache/doris/mysql/AcceptListener.java | 7 +++-- .../java/org/apache/doris/mysql/BytesChannel.java | 4 +++ .../java/org/apache/doris/mysql/MysqlChannel.java | 13 +++++++++ .../apache/doris/mysql/ProxyProtocolHandler.java | 24 +++++++++++++---- .../apache/doris/qe/ProxyProtocolHandlerTest.java | 31 +++++++++++++++++++--- 5 files changed, 69 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 3d783f28cb3..acc3fcc33e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -20,6 +20,7 @@ package org.apache.doris.mysql; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; +import org.apache.doris.mysql.ProxyProtocolHandler.ProtocolType; import org.apache.doris.mysql.ProxyProtocolHandler.ProxyProtocolResult; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; @@ -78,11 +79,13 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo if (Config.enable_proxy_protocol) { ProxyProtocolResult result = ProxyProtocolHandler.handle(context.getMysqlChannel()); Preconditions.checkNotNull(result); - if (!result.isUnknown) { + ProtocolType pType = result.pType; + if (pType == ProtocolType.PROTOCOL_WITH_IP) { context.getMysqlChannel().setRemoteAddr(result.sourceIP, result.sourcePort); } - // ignore the UNKNOWN, and just use IP from MySQL protocol. + // For PROTOCOL_WITHOUT_IP, and just use IP from MySQL protocol. // which is already set when creating MysqlChannel. + // For NOT_PROXY_PROTOCOL, just ignore to let connection with no proxy protocol in. } // authenticate check failed. diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java index bf97ae8068d..6f0aec95d4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java @@ -26,4 +26,8 @@ public interface BytesChannel { * @return number of bytes read */ public int read(ByteBuffer buffer); + + default int testReadWithTimeout(ByteBuffer buffer, long timeoutMs) { + return read(buffer); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index aa640c57eb7..13da84e67c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -253,6 +253,19 @@ public class MysqlChannel implements BytesChannel { return readLen; } + @Override + public int testReadWithTimeout(ByteBuffer dstBuf, long timeoutMs) { + Preconditions.checkArgument(dstBuf.remaining() == 1, dstBuf.remaining()); + try { + return Channels.readBlocking(conn.getSourceChannel(), dstBuf, timeoutMs, TimeUnit.MILLISECONDS); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Read channel exception, ignore.", e); + } + return -1; + } + } + protected void decryptData(ByteBuffer dstBuf, boolean isHeader) throws SSLException { // after decrypt, we get a mysql packet with mysql header. if (!isSslMode || isHeader) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java index 0f52a05286e..0d9d1508f3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java @@ -49,12 +49,18 @@ public class ProxyProtocolHandler { private static final String TCP4 = "TCP4"; private static final String TCP6 = "TCP6"; + public enum ProtocolType { + PROTOCOL_WITH_IP, // protocol with source ip + PROTOCOL_WITHOUT_IP, // v2 protocol without source ip + NOT_PROXY_PROTOCOL // not proxy protocol + } + public static class ProxyProtocolResult { public String sourceIP = null; public int sourcePort = -1; public String destIp = null; public int destPort = -1; - public boolean isUnknown = false; + public ProtocolType pType = ProtocolType.PROTOCOL_WITH_IP; @Override public String toString() { @@ -63,7 +69,7 @@ public class ProxyProtocolHandler { + ", sourcePort=" + sourcePort + ", destIp='" + destIp + '\'' + ", destPort=" + destPort - + ", isUnknown=" + isUnknown + + ", pType=" + pType + '}'; } } @@ -71,10 +77,18 @@ public class ProxyProtocolHandler { public static ProxyProtocolResult handle(BytesChannel channel) throws IOException { // First read 1 byte to see if it is V1 or V2 ByteBuffer buffer = ByteBuffer.allocate(1); - int readLen = channel.read(buffer); - if (readLen != 1) { + int readLen = channel.testReadWithTimeout(buffer, 10); + if (readLen == -1) { + throw new IOException("Remote peer closed the channel, ignore."); + } else if (readLen == 0) { + // 0 means remote peer does not send proxy protocol content. + ProxyProtocolResult result = new ProxyProtocolResult(); + result.pType = ProtocolType.NOT_PROXY_PROTOCOL; + return result; + } else if (readLen != 1) { throw new IOException("Invalid proxy protocol, expect incoming bytes first"); } + buffer.flip(); byte firstByte = buffer.get(); if ((char) firstByte == V1_HEADER[0]) { @@ -120,7 +134,7 @@ public class ProxyProtocolHandler { throw new ProtocolException("Invalid proxy protocol v1. '\\r' is not found before '\\n'", debugInfo.toString()); } - result.isUnknown = true; + result.pType = ProtocolType.PROTOCOL_WITHOUT_IP; return result; } else if (carriageFound) { throw new ProtocolException("Invalid proxy protocol v1. " diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java index 13ad0b67d92..533c5ad9b46 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java @@ -19,23 +19,30 @@ package org.apache.doris.qe; import org.apache.doris.mysql.BytesChannel; import org.apache.doris.mysql.ProxyProtocolHandler; +import org.apache.doris.mysql.ProxyProtocolHandler.ProtocolType; import org.junit.Test; import org.junit.jupiter.api.Assertions; import java.io.IOException; +import java.nio.ByteBuffer; public class ProxyProtocolHandlerTest { public static class TestChannel implements BytesChannel { private byte[] data; private int pos; + private int testReadReturn = 1; public TestChannel(byte[] data) { this.data = data; this.pos = 0; } + public void setTestReadReturn(int testReadReturn) { + this.testReadReturn = testReadReturn; + } + @Override public int read(java.nio.ByteBuffer buffer) { int len = Math.min(buffer.remaining(), data.length - pos); @@ -45,6 +52,15 @@ public class ProxyProtocolHandlerTest { } return len; } + + @Override + public int testReadWithTimeout(ByteBuffer buffer, long timeoutMs) { + if (testReadReturn == 1) { + return read(buffer); + } else { + return testReadReturn; + } + } } private TestChannel testChannel; @@ -55,7 +71,7 @@ public class ProxyProtocolHandlerTest { testChannel = new TestChannel(data); ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); Assertions.assertNotNull(result); - Assertions.assertFalse(result.isUnknown); + Assertions.assertEquals(ProtocolType.PROTOCOL_WITH_IP, result.pType); Assertions.assertEquals("192.168.0.1", result.sourceIP); Assertions.assertEquals(12345, result.sourcePort); Assertions.assertEquals("192.168.0.2", result.destIp); @@ -68,7 +84,7 @@ public class ProxyProtocolHandlerTest { testChannel = new TestChannel(data); ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); Assertions.assertNotNull(result); - Assertions.assertTrue(result.isUnknown); + Assertions.assertEquals(ProtocolType.PROTOCOL_WITHOUT_IP, result.pType); } @Test(expected = IOException.class) @@ -105,13 +121,22 @@ public class ProxyProtocolHandlerTest { testChannel = new TestChannel(data); ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); Assertions.assertNotNull(result); - Assertions.assertFalse(result.isUnknown); + Assertions.assertEquals(ProtocolType.PROTOCOL_WITH_IP, result.pType); Assertions.assertEquals("2001:db8:0:1:1:1:1:1", result.sourceIP); Assertions.assertEquals(12345, result.sourcePort); Assertions.assertEquals("2001:db8:0:1:1:1:1:2", result.destIp); Assertions.assertEquals(54321, result.destPort); } + @Test + public void handleNotProxyProtocol() throws IOException { + byte[] data = new byte[] {}; + testChannel = new TestChannel(data); + testChannel.setTestReadReturn(0); + ProxyProtocolHandler.ProxyProtocolResult result = ProxyProtocolHandler.handle(testChannel); + Assertions.assertEquals(ProtocolType.NOT_PROXY_PROTOCOL, result.pType); + } + @Test(expected = IOException.class) public void handleV1ProtocolWithInvalidIPv6Data() throws IOException { byte[] data = "PROXY TCP6 2001:db8:0:1:1:1:1:1 2001:db8:0:1:1:1:1:2 12345 EXTRA DATA\r\n".getBytes(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
