This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 08cd09f4ff5 [fix](stream_load) Fix stream load IPv6 host parsing
(#64147)
08cd09f4ff5 is described below
commit 08cd09f4ff5d3a91c0438b5371426ad772f6c74f
Author: re20052 <[email protected]>
AuthorDate: Wed Jun 17 15:42:20 2026 +0800
[fix](stream_load) Fix stream load IPv6 host parsing (#64147)
### What problem does this PR solve?
Problem Summary:
In `LoadAction#splitHostAndPort`, the endpoint string (e.g. backend's
`publicEndpoint` / `privateEndpoint`, in `host:port` format) was split
by `:` to get the host and port. This does not work for IPv6 addresses
since IPv6
addresses themselves contain colons.
This PR uses Guava's `HostAndPort.fromString` to parse the endpoint
string, which correctly handles both IPv4 and IPv6 formats.
---
.../org/apache/doris/httpv2/rest/LoadAction.java | 31 ++++++----
.../apache/doris/httpv2/rest/LoadActionTest.java | 71 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 5e058fcba8f..e4b2e21c8d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -46,6 +46,7 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
+import com.google.common.net.HostAndPort;
import io.netty.handler.codec.http.HttpHeaderNames;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@@ -504,14 +505,11 @@ public class LoadAction extends RestBaseController {
}
String reqHost = "";
- String[] pair = reqHostStr.split(":");
- if (pair.length == 1) {
- reqHost = pair[0];
- } else if (pair.length == 2) {
- reqHost = pair[0];
- } else {
+ try {
+ reqHost = HostAndPort.fromString(reqHostStr).getHost();
+ } catch (IllegalArgumentException e) {
LOG.info("Invalid header host: {}", reqHostStr);
- throw new LoadException("Invalid header host: " + reqHost);
+ throw new LoadException("Invalid header host: " + reqHostStr);
}
// User specified redirect policy
@@ -579,19 +577,26 @@ public class LoadAction extends RestBaseController {
throw new AnalysisException("empty endpoint: " + hostPort);
}
- String[] pair = hostPort.split(":");
- if (pair.length != 2) {
+ String host;
+ int port;
+ try {
+ HostAndPort hp = HostAndPort.fromString(hostPort);
+ if (!hp.hasPort()) {
+ throw new IllegalArgumentException("No port found");
+ }
+ host = hp.getHost();
+ port = hp.getPort();
+ } catch (IllegalArgumentException e) {
LOG.info("Invalid endpoint: {}", hostPort);
throw new AnalysisException("Invalid endpoint: " + hostPort);
}
- int port = Integer.parseInt(pair[1]);
if (port <= 0 || port >= 65536) {
- LOG.info("Invalid endpoint port: {}", pair[1]);
- throw new AnalysisException("Invalid endpoint port: " + pair[1]);
+ LOG.info("Invalid endpoint port: {}", port);
+ throw new AnalysisException("Invalid endpoint port: " + port);
}
- return Pair.of(pair[0], port);
+ return Pair.of(host, port);
}
// NOTE: This function can only be used for AuditlogPlugin stream load for
now.
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
index 4d57c25fd71..2c8c59d413a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
@@ -352,6 +352,61 @@ public class LoadActionTest {
redirectView.getUrl());
}
+ @Test
+ public void testSplitHostAndPortParsesIpv4() throws Exception {
+ LoadAction loadAction = new LoadAction();
+ org.apache.doris.common.Pair<String, Integer> result =
invokeSplitHostAndPort(loadAction, "10.0.0.1:8040");
+ Assertions.assertEquals("10.0.0.1", result.first);
+ Assertions.assertEquals(8040, result.second.intValue());
+ }
+
+ @Test
+ public void testSplitHostAndPortParsesIpv6() throws Exception {
+ LoadAction loadAction = new LoadAction();
+ org.apache.doris.common.Pair<String, Integer> result =
+ invokeSplitHostAndPort(loadAction, "[2001:db8::1]:8040");
+ Assertions.assertEquals("2001:db8::1", result.first);
+ Assertions.assertEquals(8040, result.second.intValue());
+ }
+
+ @Test
+ public void testSelectEndpointByRedirectPolicyParsesIpv4HostAndEndpoint()
throws Exception {
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getHeader(LoadAction.HEADER_REDIRECT_POLICY))
+ .thenReturn(LoadAction.REDIRECT_POLICY_PRIVATE);
+ Mockito.when(request.getHeader("host")).thenReturn("10.0.0.1:8030");
+
+ Backend backend = Mockito.mock(Backend.class);
+
Mockito.when(backend.getPrivateEndpoint()).thenReturn("192.168.1.1:8040");
+ Mockito.when(backend.getPublicEndpoint()).thenReturn(null);
+ Mockito.when(backend.getHost()).thenReturn("be-host");
+ Mockito.when(backend.getHttpPort()).thenReturn(8040);
+
+ TNetworkAddress addr =
invokeSelectEndpointByRedirectPolicy(loadAction, request, backend);
+ Assertions.assertEquals("192.168.1.1", addr.getHostname());
+ Assertions.assertEquals(8040, addr.getPort());
+ }
+
+ @Test
+ public void testSelectEndpointByRedirectPolicyParsesIpv6HostAndEndpoint()
throws Exception {
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getHeader(LoadAction.HEADER_REDIRECT_POLICY))
+ .thenReturn(LoadAction.REDIRECT_POLICY_PRIVATE);
+
Mockito.when(request.getHeader("host")).thenReturn("[2001:db8::1]:8030");
+
+ Backend backend = Mockito.mock(Backend.class);
+
Mockito.when(backend.getPrivateEndpoint()).thenReturn("[fd00::1]:8040");
+ Mockito.when(backend.getPublicEndpoint()).thenReturn(null);
+ Mockito.when(backend.getHost()).thenReturn("be-host");
+ Mockito.when(backend.getHttpPort()).thenReturn(8040);
+
+ TNetworkAddress addr =
invokeSelectEndpointByRedirectPolicy(loadAction, request, backend);
+ Assertions.assertEquals("fd00::1", addr.getHostname());
+ Assertions.assertEquals(8040, addr.getPort());
+ }
+
private Object invokeCreateRedirectResponse(LoadAction loadAction,
HttpServletRequest request,
HttpServletResponse response, TNetworkAddress redirectAddr,
boolean isStreamLoad, String dbName,
String tableName, String label) throws Exception {
@@ -398,6 +453,22 @@ public class LoadActionTest {
return (RedirectView) method.invoke(loadAction, request, addr,
forwardTarget);
}
+ private TNetworkAddress invokeSelectEndpointByRedirectPolicy(LoadAction
loadAction, HttpServletRequest request,
+ Backend backend) throws Exception {
+ Method method =
LoadAction.class.getDeclaredMethod("selectEndpointByRedirectPolicy",
+ HttpServletRequest.class, Backend.class);
+ method.setAccessible(true);
+ return (TNetworkAddress) method.invoke(loadAction, request, backend);
+ }
+
+ @SuppressWarnings("unchecked")
+ private org.apache.doris.common.Pair<String, Integer>
invokeSplitHostAndPort(LoadAction loadAction, String hostPort)
+ throws Exception {
+ Method method = LoadAction.class.getDeclaredMethod("splitHostAndPort",
String.class);
+ method.setAccessible(true);
+ return (org.apache.doris.common.Pair<String, Integer>)
method.invoke(loadAction, hostPort);
+ }
+
private HttpServletRequest mockStreamLoadRequest() throws Exception {
return mockStreamLoadRequest(-1, null, true);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]