This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9c40fa5fd [fix](fe) Fix broken pipe risk on stream load redirect 
with unconsumed request body (#63332)
3c9c40fa5fd is described below

commit 3c9c40fa5fd3d19ec70804afafd911992967b568
Author: Wen Zhenghu <[email protected]>
AuthorDate: Sat May 30 08:28:03 2026 +0800

    [fix](fe) Fix broken pipe risk on stream load redirect with unconsumed 
request body (#63332)
    
    ### What problem does this PR solve?
    
    Issue Number: close #63325
    
    Problem Summary:
    
    **Problem**
    - Starting from Doris `3.1.3`, FE uses `Jetty 12`, and this introduced a
    compatibility change in the Stream Load redirect path.
    - When a Stream Load request is sent to FE, FE may return `307 Temporary
    Redirect` before the request body is fully consumed. Under `Jetty 12`,
    this behavior is more likely to cause early connection close or reset
    while the client is still writing the request body.
    - As a result, some `HTTP/1.1` streaming clients may observe errors such
    as `BrokenPipeError` or `ConnectionResetError` when sending Stream Load
    requests through FE.
    - The problem is more visible with chunked uploads, higher network
    latency, and clients that continue sending request body data before
    fully processing the redirect response.
    - In short, this is a compatibility regression introduced by the `Jetty
    12` upgrade in Doris `3.1.3` and later.
    
    **Fix**
    - We keep the existing FE-to-BE redirect architecture unchanged, so FE
    still redirects Stream Load requests to BE instead of proxying the full
    request body.
    - We add a bounded request-body drain step on the FE Stream Load
    redirect path:
      - FE first writes the `307 Temporary Redirect` response.
    - FE then drains and discards only a bounded amount of the remaining
    request body.
    - This provides a small compatibility window for in-flight client writes
    and reduces the chance of early connection reset.
    - We also apply the same handling to token-authenticated Stream Load
    requests, so both password-authenticated and token-authenticated paths
    behave consistently.
    - In addition, we expose Jetty's unconsumed request content read setting
    through FE configuration and apply it to HTTP connectors, so operators
    can tune Jetty behavior for redirect scenarios where the request body is
    not fully consumed.
    - To make the compatibility path effective out of the box, this PR also
    enables the bounded drain path by default with a `1GB` drain limit and a
    `1000ms` idle wait window.
    
    **New Configurations**
    - `jetty_server_max_unconsumed_request_content_reads`
    - Controls how many extra reads Jetty performs for unconsumed request
    content.
    - `-1` means unlimited, `0` disables extra reads, and a positive value
    sets the maximum number of read attempts.
      - Default value in this PR: `-1`.
    - This helps tune Jetty behavior after the `Jetty 12` upgrade when FE
    returns a response before the request body is fully consumed.
    
    - `stream_load_redirect_bounded_drain_max_bytes`
    - Controls the maximum number of request body bytes FE drains after
    returning `307` for a Stream Load redirect.
      - `0` disables this compatibility logic.
    - A positive value enables bounded draining and limits how much data FE
    will discard.
      - Default value in this PR: `1GB`.
    
    - `stream_load_redirect_bounded_drain_max_idle_time_ms`
    - Controls how long FE waits for more readable request body data during
    the bounded drain process.
      - `0` disables the extra idle wait.
    - A positive value provides a small grace window for slow clients or
    delayed body chunks, helping absorb in-flight writes without keeping the
    connection open indefinitely.
      - Default value in this PR: `1000ms`.
    
    **Test Result / Validation**
    - Verified the behavior with the same Python `HTTP/1.1` chunked Stream
    Load reproduction used during issue analysis.
    - Reproduced requests were sent to FE with `Expect: 100-continue`,
    `Transfer-Encoding: chunked`, and paced body streaming to maximize the
    redirect race window.
    - Baseline validation on Doris `3.0` (`9030` / FE `8030`):
      - `payload_mb=1`, `chunk_kb=1`, `sleep_ms=0`
      - `payload_mb=8`, `chunk_kb=16`, `sleep_ms=10`
      - Both requests returned normal `307 Temporary Redirect`.
    - Validation on the fixed Doris `3.1.4` instance (`9034` / FE `8034`):
    - Before enabling the bounded drain config, the same reproduction still
    triggered `BrokenPipeError`.
      - After enabling the FE configs below:
        - `jetty_server_max_unconsumed_request_content_reads = -1`
        - `stream_load_redirect_bounded_drain_max_bytes = 16777216`
        - `stream_load_redirect_bounded_drain_max_idle_time_ms = 1000`
    - The same two reproduction requests both returned normal `307 Temporary
    Redirect`.
    - No `BrokenPipeError` or `ConnectionResetError` was observed after the
    config took effect.
    - The PR now further updates the default bounded drain byte limit from
    `16MB` to `1GB`, while keeping the default idle wait at `1000ms`, so the
    compatibility path is enabled by default with a more generous drain
    window.
    
    **Performance Validation**
    - I compared FE redirect response time between the Doris `3.0` baseline
    instance (`9030`) and the fixed Doris `3.1.4` instance (`9034`).
    - The goal was to check whether the additional bounded drain logic on FE
    introduces a noticeable regression compared with the original Jetty 9
    behavior.
    
    **Test Setup**
    - Reproduction tool: `tools/stream_load_redirect_repro.py`
    - Target: FE endpoint on both instances
    - Client mode: `httpclient`
    - Common parameters:
      - `chunk_kb = 16`
      - `sleep_ms = 0`
    - Payload sizes:
      - `32MB`
      - `128MB`
      - `512MB`
    - Each case was executed `3` times on each instance, and the average
    `elapsed_seconds` was used for comparison.
    
    **Results**
    - `32MB`
      - `9030`: `9.515s / 12.032s / 9.727s`
      - Average: `10.425s`
      - `9034`: `10.695s / 10.847s / 8.763s`
      - Average: `10.102s`
      - Difference: `9034` was `0.323s` faster, about `3.1%`
    
    - `128MB`
      - `9030`: `37.174s / 34.111s / 37.090s`
      - Average: `36.125s`
      - `9034`: `38.910s / 36.423s / 38.337s`
      - Average: `37.890s`
      - Difference: `9034` was `1.765s` slower, about `4.9%`
    
    - `512MB`
      - `9030`: `157.181s / 161.148s / 174.421s`
      - Average: `164.250s`
      - `9034`: `172.310s / 176.692s / 160.068s`
      - Average: `169.690s`
      - Difference: `9034` was `5.440s` slower, about `3.3%`
    
    **Conclusion**
    - Across all tested payload sizes, the difference between `9030` and
    `9034` stayed within a small range, roughly `-3%` to `+5%`.
    - Based on these measurements, the FE bounded drain logic does not show
    a significant performance regression compared with the baseline FE
    redirect behavior.
    - In other words, the fix improves redirect compatibility while keeping
    FE redirect response time at a similar level in normal request sizes.
    ---------
    
    Co-authored-by: yaoxiao <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  24 ++
 .../config/WebServerFactoryCustomizerConfig.java   |   3 +
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 152 ++++---
 .../doris/httpv2/rest/RestBaseController.java      |  51 ++-
 .../httpv2/util/StreamLoadRedirectDrainUtil.java   | 154 +++++++
 .../apache/doris/httpv2/rest/LoadActionTest.java   | 445 ++++++++++++++++++++-
 .../doris/httpv2/rest/RestBaseControllerTest.java  |  64 +++
 .../util/StreamLoadRedirectDrainUtilTest.java      | 394 ++++++++++++++++++
 .../scripts/stream_load_redirect_chunked_e2e.py    | 140 +++++++
 ...test_stream_load_fe_redirect_chunked_e2e.groovy | 117 ++++++
 10 files changed, 1459 insertions(+), 85 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bdd64284c9a..121503e56c4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -380,6 +380,13 @@ public class Config extends ConfigBase {
     @ConfField(description = {"The maximum HTTP POST size of Jetty, in bytes, 
the default value is 100MB."})
     public static int jetty_server_max_http_post_size = 100 * 1024 * 1024;
 
+    @ConfField(description = {
+            "Jetty 在应用未消费完请求体时,额外尝试读取剩余内容的最大次数。"
+                    + "-1 表示不限制,0 表示不额外读取,正数表示最大读取次数。",
+            "The maximum number of extra reads Jetty performs for unconsumed 
request content. "
+                    + "-1 means unlimited, 0 means disabled, and a positive 
value limits the read attempts."})
+    public static int jetty_server_max_unconsumed_request_content_reads = -1;
+
     @ConfField(description = {"The maximum HTTP header size of Jetty, in 
bytes, the default value is 1MB."})
     public static int jetty_server_max_http_header_size = 1048576;
 
@@ -3328,6 +3335,23 @@ public class Config extends ConfigBase {
             + "public-private/public/private/direct/random-be and empty 
string."})
     public static String streamload_redirect_policy = "";
 
+    @ConfField(mutable = true, description = {
+            "Stream Load redirect 场景下,FE 在返回 307 后额外丢弃请求体的最大字节数。"
+                    + "0 表示关闭该兼容逻辑,正数表示最大丢弃字节数。",
+            "The maximum number of request body bytes FE drains after 
returning 307 for Stream Load redirects. "
+                    + "0 disables the compatibility logic, and a positive 
value sets the byte limit."})
+    // Enable a generous bounded drain window by default to preserve FE 
redirect compatibility on Jetty 12.
+    public static long stream_load_redirect_bounded_drain_max_bytes = 1024L * 
1024 * 1024;
+
+    @ConfField(mutable = true, description = {
+            "Stream Load redirect 场景下,FE 在检测到请求体暂时无可读数据后继续等待的最大空闲时长,单位毫秒。"
+                    + "0 表示不额外等待,用于给慢客户端或分段到达的数据保留一个有限的缓冲窗口。",
+            "The maximum idle wait time in milliseconds after FE detects no 
readable request body bytes "
+                    + "during Stream Load redirect drain. 0 disables the extra 
idle wait, while a positive value "
+                    + "keeps a bounded grace window for slow clients or 
delayed request body chunks."})
+    // Keep a small grace period for delayed body chunks after FE has already 
written the redirect.
+    public static int stream_load_redirect_bounded_drain_max_idle_time_ms = 
1000;
+
     @ConfField(mutable = true, description = {
             "Whether to enable group commit streamload BE forward feature in 
cloud mode. "
                     + "Solves the issue where LB random forwarding breaks 
group commit batching "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
index ead380b260e..e37e60e849e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
@@ -50,6 +50,9 @@ public class WebServerFactoryCustomizerConfig implements 
WebServerFactoryCustomi
                     if (httpFactory != null) {
                         HttpConfiguration httpConfig = 
httpFactory.getHttpConfiguration();
                         
httpConfig.setRequestHeaderSize(Config.jetty_server_max_http_header_size);
+                        // Apply the unconsumed request content read limit to 
every HTTP connector.
+                        httpConfig.setMaxUnconsumedRequestContentReads(
+                                
Config.jetty_server_max_unconsumed_request_content_reads);
                     }
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index bea379f4f28..5e058fcba8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.util.StreamLoadRedirectDrainUtil;
 import org.apache.doris.load.StreamLoadHandler;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.GroupCommitPlanner;
@@ -59,8 +60,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.view.RedirectView;
 
+import java.io.IOException;
 import java.net.InetAddress;
-import java.net.URI;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Optional;
@@ -131,7 +132,7 @@ public class LoadAction extends RestBaseController {
             if (!checkClusterToken(authToken)) {
                 throw new UnauthorizedException("Invalid token: " + authToken);
             }
-            return executeWithClusterToken(request, db, table, true);
+            return executeWithClusterToken(request, response, db, table, true);
         } else {
             try {
                 executeCheckPassword(request, response);
@@ -190,8 +191,7 @@ public class LoadAction extends RestBaseController {
             LOG.info("redirect load action to destination={}, label: {}",
                     redirectAddr.toString(), label);
 
-            RedirectView redirectView = redirectTo(request, redirectAddr);
-            return redirectView;
+            return createRedirectResponse(request, response, redirectAddr, 
true, null, null, label);
         } catch (Exception e) {
             return new RestBaseResult(e.getMessage());
         }
@@ -311,11 +311,17 @@ public class LoadAction extends RestBaseController {
                         redirectAddr.toString(), isStreamLoad, dbName, 
tableName, label);
             }
 
-            RedirectView redirectView = redirectTo(request, redirectAddr);
-            return redirectView;
+            return createRedirectResponse(request, response, redirectAddr, 
isStreamLoad, dbName, tableName, label);
         } catch (StreamLoadForwardException e) {
-            // Special handling for stream load forwarding
-            return e.getRedirectView();
+            // Handle IOException from redirect response generation in the 
forwarding path.
+            try {
+                return createRedirectResponse(request, response, 
e.getRedirectView(),
+                        isStreamLoad, db, table, label);
+            } catch (IOException ioException) {
+                LOG.warn("stream load forward redirect failed, stream: {}, db: 
{}, tbl: {}, label: {}, err: {}",
+                        isStreamLoad, db, table, label, 
ioException.getMessage());
+                return new RestBaseResult(ioException.getMessage());
+            }
         } catch (Exception e) {
             LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, 
err: {}",
                     isStreamLoad, db, table, label, e.getMessage());
@@ -592,7 +598,7 @@ public class LoadAction extends RestBaseController {
     // AuditlogPlugin should be re-disigned carefully, and blow method focuses 
on
     // temporarily addressing the users' needs for audit logs.
     // So this function is not widely tested under general scenario
-    private Object executeWithClusterToken(HttpServletRequest request, String 
db,
+    private Object executeWithClusterToken(HttpServletRequest request, 
HttpServletResponse response, String db,
             String table, boolean isStreamLoad) {
         try {
             ConnectContext ctx = new ConnectContext();
@@ -635,29 +641,7 @@ public class LoadAction extends RestBaseController {
                             + "stream: {}, db: {}, tbl: {}, label: {}",
                     redirectAddr.toString(), isStreamLoad, dbName, tableName, 
label);
 
-            URI urlObj = null;
-            URI resultUriObj = null;
-            String urlStr = request.getRequestURI();
-            String userInfo = null;
-
-            try {
-                urlObj = new URI(urlStr);
-                resultUriObj = new URI("http", userInfo, 
redirectAddr.getHostname(),
-                        redirectAddr.getPort(), urlObj.getPath(), "", null);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            String redirectUrl = resultUriObj.toASCIIString();
-            if (!Strings.isNullOrEmpty(request.getQueryString())) {
-                redirectUrl += request.getQueryString();
-            }
-            LOG.info("Redirect url: {}", "http://"; + 
redirectAddr.getHostname() + ":"
-                    + redirectAddr.getPort() + urlObj.getPath());
-            RedirectView redirectView = new RedirectView(redirectUrl);
-            redirectView.setContentType("text/html;charset=utf-8");
-            
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
-
-            return redirectView;
+            return createRedirectResponse(request, response, redirectAddr, 
isStreamLoad, dbName, tableName, label);
         } catch (Exception e) {
             LOG.warn("Failed to execute stream load with cluster token, {}", 
e.getMessage(), e);
             return new RestBaseResult(e.getMessage());
@@ -677,6 +661,80 @@ public class LoadAction extends RestBaseController {
         return headers.toString();
     }
 
+    private Object createRedirectResponse(HttpServletRequest request, 
HttpServletResponse response,
+            TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName, 
String tableName, String label)
+            throws IOException {
+        String redirectUrl = buildRedirectUrl(request, redirectAddr);
+        if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
+            return redirectTo(request, redirectAddr);
+        }
+        writeTemporaryRedirect(response, redirectUrl);
+        DrainDecision drainDecision = 
decideDrainDecisionForStreamLoadRedirect(request);
+        if (drainDecision != DrainDecision.DRAIN) {
+            LOG.info("skip bounded drain after stream load redirect, target: 
{}, db: {}, tbl: {}, label: {},"
+                            + " reason: {}",
+                    redirectAddr, dbName, tableName, label, drainDecision);
+            return null;
+        }
+        drainStreamLoadRequestBodyAfterRedirect(request, 
redirectAddr.toString(), dbName, tableName, label);
+        return null;
+    }
+
+    private Object createRedirectResponse(HttpServletRequest request, 
HttpServletResponse response,
+            RedirectView redirectView, boolean isStreamLoad, String dbName, 
String tableName, String label)
+            throws IOException {
+        if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
+            return redirectView;
+        }
+        writeTemporaryRedirect(response, redirectView.getUrl());
+        DrainDecision drainDecision = 
decideDrainDecisionForStreamLoadRedirect(request);
+        if (drainDecision != DrainDecision.DRAIN) {
+            LOG.info("skip bounded drain after stream load redirect, target: 
{}, db: {}, tbl: {}, label: {},"
+                            + " reason: {}",
+                    redirectView.getUrl(), dbName, tableName, label, 
drainDecision);
+            return null;
+        }
+        drainStreamLoadRequestBodyAfterRedirect(request, 
redirectView.getUrl(), dbName, tableName, label);
+        return null;
+    }
+
+    private boolean shouldUseBoundedDrainForStreamLoad(boolean isStreamLoad) {
+        return isStreamLoad && 
Config.stream_load_redirect_bounded_drain_max_bytes > 0;
+    }
+
+    // Skip the bounded drain for header-only probes and oversized 
fixed-length bodies.
+    private DrainDecision 
decideDrainDecisionForStreamLoadRedirect(HttpServletRequest request) {
+        long contentLength = request.getContentLengthLong();
+        String transferEncoding = 
request.getHeader(HttpHeaderNames.TRANSFER_ENCODING.toString());
+        if (contentLength <= 0 && Strings.isNullOrEmpty(transferEncoding)) {
+            return DrainDecision.SKIP_NO_REQUEST_BODY;
+        }
+        if (contentLength > 
Config.stream_load_redirect_bounded_drain_max_bytes) {
+            return DrainDecision.SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES;
+        }
+        return DrainDecision.DRAIN;
+    }
+
+    private void drainStreamLoadRequestBodyAfterRedirect(HttpServletRequest 
request, String redirectTarget,
+            String dbName, String tableName, String label) {
+        long drainLimit = Config.stream_load_redirect_bounded_drain_max_bytes;
+        LOG.info("write stream load redirect and start bounded drain, target: 
{}, db: {}, tbl: {}, label: {},"
+                        + " max_drain_bytes: {}",
+                redirectTarget, dbName, tableName, label, drainLimit);
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, drainLimit);
+        LOG.info("finish bounded drain after stream load redirect, target: {}, 
db: {}, tbl: {}, label: {},"
+                        + " drained_bytes: {}, elapsed_ms: {}, exit_reason: 
{}",
+                redirectTarget, dbName, tableName, label, 
drainResult.getDrainedBytes(),
+                drainResult.getElapsedMillis(), drainResult.getExitReason());
+    }
+
+    private enum DrainDecision {
+        SKIP_NO_REQUEST_BODY,
+        SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES,
+        DRAIN
+    }
+
     private boolean isSensitiveHeader(String headerName) {
         return "Authorization".equalsIgnoreCase(headerName)
                 || "Proxy-Authorization".equalsIgnoreCase(headerName)
@@ -738,34 +796,14 @@ public class LoadAction extends RestBaseController {
      */
     private RedirectView redirectToStreamLoadForward(HttpServletRequest 
request, TNetworkAddress addr,
             String forwardTarget) {
-        URI urlObj = null;
-        URI resultUriObj = null;
-        String urlStr = request.getRequestURI();
-        String userInfo = null;
-        String modifiedPath = null;
-
-        if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
-            ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
-            userInfo = authInfo.fullUserName + ":" + authInfo.password;
-        }
-        try {
-            urlObj = new URI(urlStr);
-            // Replace _stream_load with _stream_load_forward in the path
-            modifiedPath = urlObj.getPath().replace("/_stream_load", 
"/_stream_load_forward");
-            resultUriObj = new URI("http", userInfo, addr.getHostname(),
-                    addr.getPort(), modifiedPath, "", null);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        String redirectUrl = resultUriObj.toASCIIString();
-
-        // Add forward_to parameter (note: toASCIIString() already includes 
'?' due to empty query)
+        // Replace _stream_load with _stream_load_forward in the path.
+        String modifiedPath = request.getRequestURI().replace("/_stream_load", 
"/_stream_load_forward");
         String queryString = request.getQueryString();
+        String redirectQuery = "forward_to=" + forwardTarget;
         if (!Strings.isNullOrEmpty(queryString)) {
-            redirectUrl += queryString + "&forward_to=" + forwardTarget;
-        } else {
-            redirectUrl += "forward_to=" + forwardTarget;
+            redirectQuery = queryString + "&" + redirectQuery;
         }
+        String redirectUrl = buildRedirectUrl(request, addr, modifiedPath, 
redirectQuery);
 
         LOG.info("Redirect stream load forward url: {}, forward_to: {}",
                 "http://"; + addr.getHostname() + ":" + addr.getPort() + 
modifiedPath, forwardTarget);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
index 51378a9dc1d..5d55dab3cf2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
@@ -88,36 +88,49 @@ public class RestBaseController extends BaseController {
         return authInfo;
     }
 
-    public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress 
addr) {
-        RedirectView redirectView = new RedirectView(getRedirectUrL(request, 
addr));
-        redirectView.setContentType("text/html;charset=utf-8");
-        
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
-        return redirectView;
+    protected String buildRedirectUrl(HttpServletRequest request, 
TNetworkAddress addr) {
+        return buildRedirectUrl(request, addr, request.getRequestURI(), 
request.getQueryString());
     }
 
-    public String getRedirectUrL(HttpServletRequest request, TNetworkAddress 
addr) {
-        URI urlObj = null;
-        URI resultUriObj = null;
-        String urlStr = request.getRequestURI();
+    protected String buildRedirectUrl(HttpServletRequest request, 
TNetworkAddress addr, String requestPath,
+            String queryString) {
         String userInfo = null;
         if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
             ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
             userInfo = authInfo.fullUserName + ":" + authInfo.password;
         }
         try {
-            urlObj = new URI(urlStr);
-            resultUriObj = new URI(request.getScheme(), userInfo, 
addr.getHostname(),
-                    addr.getPort(), urlObj.getPath(), "", null);
+            // Preserve the original request path to avoid re-encoding an 
already encoded URI path.
+            URI authorityUri = new URI(request.getScheme(), userInfo, 
addr.getHostname(),
+                    addr.getPort(), null, null, null);
+            String redirectUrl = authorityUri.toASCIIString() + requestPath;
+            if (!Strings.isNullOrEmpty(queryString)) {
+                redirectUrl += "?" + queryString;
+            }
+            LOG.info("Redirect url: {}", request.getScheme() + "://" + 
addr.getHostname() + ":"
+                    + addr.getPort() + requestPath);
+            return redirectUrl;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        String redirectUrl = resultUriObj.toASCIIString();
-        if (!Strings.isNullOrEmpty(request.getQueryString())) {
-            redirectUrl += request.getQueryString();
-        }
-        LOG.info("Redirect url: {}", request.getScheme() + "://" + 
addr.getHostname() + ":"
-                + addr.getPort() + urlObj.getPath());
-        return redirectUrl;
+    }
+
+    protected void writeTemporaryRedirect(HttpServletResponse response, String 
redirectUrl) throws IOException {
+        response.setContentType("text/html;charset=utf-8");
+        response.setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        response.setHeader("Location", redirectUrl);
+        response.flushBuffer();
+    }
+
+    public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress 
addr) {
+        RedirectView redirectView = new RedirectView(buildRedirectUrl(request, 
addr));
+        redirectView.setContentType("text/html;charset=utf-8");
+        
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+        return redirectView;
+    }
+
+    public String getRedirectUrL(HttpServletRequest request, TNetworkAddress 
addr) {
+        return buildRedirectUrl(request, addr);
     }
 
     public RedirectView redirectToObj(String sign) throws URISyntaxException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
new file mode 100644
index 00000000000..a9d345ad38d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import com.google.common.base.Preconditions;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+public final class StreamLoadRedirectDrainUtil {
+    private static final Logger LOG = 
LogManager.getLogger(StreamLoadRedirectDrainUtil.class);
+
+    private static final int BUFFER_SIZE = 8 * 1024;
+    private static final int IDLE_SLEEP_MS = 5;
+
+    private StreamLoadRedirectDrainUtil() {
+    }
+
+    public static DrainResult drainRequestBodyAfterRedirect(HttpServletRequest 
request, long maxBytes) {
+        try {
+            return drainRequestBodyAfterRedirect(request.getInputStream(), 
maxBytes);
+        } catch (IOException e) {
+            LOG.warn("failed to get request input stream for stream load 
redirect drain", e);
+            return new DrainResult(0, 0, ExitReason.ERROR);
+        }
+    }
+
+    static DrainResult drainRequestBodyAfterRedirect(ServletInputStream 
inputStream, long maxBytes) {
+        Preconditions.checkArgument(maxBytes > 0, "maxBytes must be positive");
+
+        long startNanos = System.nanoTime();
+        long drainedBytes = 0;
+        long idleStartNanos = -1L;
+        byte[] buffer = new byte[(int) Math.min(BUFFER_SIZE, maxBytes)];
+
+        try {
+            while (drainedBytes < maxBytes) {
+                // Prefer an explicit EOF signal before relying on available() 
as a readiness hint.
+                if (inputStream.isFinished()) {
+                    return new DrainResult(drainedBytes, 
elapsedMillis(startNanos), ExitReason.EOF);
+                }
+                int availableBytes = inputStream.available();
+                if (availableBytes <= 0) {
+                    // Allow a bounded idle window so slow clients can still 
deliver buffered bytes.
+                    idleStartNanos = idleStartNanos < 0 ? System.nanoTime() : 
idleStartNanos;
+                    DrainResult idleWaitResult = 
waitForMoreDataOrExit(idleStartNanos, drainedBytes, startNanos);
+                    if (idleWaitResult != null) {
+                        return idleWaitResult;
+                    }
+                    continue;
+                }
+
+                idleStartNanos = -1L;
+                int readLimit = (int) Math.min(Math.min(maxBytes - 
drainedBytes, buffer.length), availableBytes);
+                int readBytes = inputStream.read(buffer, 0, readLimit);
+                if (readBytes < 0) {
+                    return new DrainResult(drainedBytes, 
elapsedMillis(startNanos), ExitReason.EOF);
+                }
+                if (readBytes == 0) {
+                    // Treat zero-byte reads as transient backpressure instead 
of busy-spinning.
+                    idleStartNanos = idleStartNanos < 0 ? System.nanoTime() : 
idleStartNanos;
+                    DrainResult idleWaitResult = 
waitForMoreDataOrExit(idleStartNanos, drainedBytes, startNanos);
+                    if (idleWaitResult != null) {
+                        return idleWaitResult;
+                    }
+                    continue;
+                }
+                drainedBytes += readBytes;
+            }
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.MAX_BYTES);
+        } catch (IOException e) {
+            LOG.warn("failed while draining request body after stream load 
redirect", e);
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.ERROR);
+        }
+    }
+
+    // Convert a bounded idle wait into a terminal drain result when the grace 
window expires or gets interrupted.
+    private static DrainResult waitForMoreDataOrExit(long idleStartNanos, long 
drainedBytes, long startNanos) {
+        if (elapsedMillis(idleStartNanos) >= 
Config.stream_load_redirect_bounded_drain_max_idle_time_ms) {
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.IDLE_TIMEOUT);
+        }
+        if (!sleepForIdleWindow()) {
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.INTERRUPTED);
+        }
+        return null;
+    }
+
+    private static boolean sleepForIdleWindow() {
+        try {
+            Thread.sleep(IDLE_SLEEP_MS);
+            return true;
+        } catch (InterruptedException e) {
+            LOG.warn("stream load redirect drain idle wait is interrupted", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    private static long elapsedMillis(long startNanos) {
+        return (System.nanoTime() - startNanos) / 1_000_000;
+    }
+
+    public enum ExitReason {
+        EOF,
+        MAX_BYTES,
+        IDLE_TIMEOUT,
+        INTERRUPTED,
+        ERROR
+    }
+
+    public static final class DrainResult {
+        private final long drainedBytes;
+        private final long elapsedMillis;
+        private final ExitReason exitReason;
+
+        public DrainResult(long drainedBytes, long elapsedMillis, ExitReason 
exitReason) {
+            this.drainedBytes = drainedBytes;
+            this.elapsedMillis = elapsedMillis;
+            this.exitReason = exitReason;
+        }
+
+        public long getDrainedBytes() {
+            return drainedBytes;
+        }
+
+        public long getElapsedMillis() {
+            return elapsedMillis;
+        }
+
+        public ExitReason getExitReason() {
+            return exitReason;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
index 28d7cda587c..4d57c25fd71 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
@@ -17,20 +17,226 @@
 
 package org.apache.doris.httpv2.rest;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
 import jakarta.servlet.http.HttpServletRequest;
-import org.junit.Assert;
-import org.junit.Test;
+import jakarta.servlet.http.HttpServletResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.servlet.view.RedirectView;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
 
 public class LoadActionTest {
+    private final boolean originalEnableDebugPoints = 
Config.enable_debug_points;
+    private final String originalCloudUniqueId = Config.cloud_unique_id;
+    private final boolean originalEnableGroupCommitStreamLoadBeForward =
+            Config.enable_group_commit_streamload_be_forward;
+
+
+    @AfterEach
+    public void tearDown() {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 1024L * 1024 * 
1024;
+        Config.enable_debug_points = originalEnableDebugPoints;
+        Config.cloud_unique_id = originalCloudUniqueId;
+        Config.enable_group_commit_streamload_be_forward = 
originalEnableGroupCommitStreamLoadBeForward;
+        DebugPointUtil.clearDebugPoints();
+        Thread.interrupted();
+        org.apache.doris.qe.ConnectContext.remove();
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 1000;
+    }
+
+    @Test
+    public void 
testCreateRedirectResponseReturnsRedirectViewWhenBoundedDrainDisabled() throws 
Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", 
"label1");
+
+        Assertions.assertTrue(result instanceof RedirectView);
+        RedirectView redirectView = (RedirectView) result;
+        
Assertions.assertEquals("http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";,
 redirectView.getUrl());
+        Mockito.verifyNoInteractions(response);
+    }
+
+    @Test
+    public void testCreateRedirectResponseWrites307WhenBoundedDrainEnabled() 
throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, "chunked", 
true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", 
"label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request).getInputStream();
+    }
+
+    @Test
+    public void 
testCreateRedirectResponseDrainsFixedLengthRequestWithinLimit() throws 
Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(4, null, true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", 
"label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        Mockito.verify(request).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWithoutRequestBody() 
throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", 
"label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWhenContentLengthIsZero() 
throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(0, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", 
"label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void 
testCreateRedirectResponseSkipsDrainWhenContentLengthExceedsLimit() throws 
Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(16, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", 
"label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void 
testCreateRedirectResponseWithRedirectViewReturnsRedirectViewWhenDrainDisabled()
 throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new 
RedirectView("http://be-host:8040/redirect";);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertSame(redirectView, result);
+        Mockito.verifyNoInteractions(response);
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void 
testCreateRedirectResponseWithRedirectViewSkipsDrainWithoutRequestBody() throws 
Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new 
RedirectView("http://be-host:8040/redirect";);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/redirect";);
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void 
testCreateRedirectResponseWithRedirectViewDrainsChunkedRequest() throws 
Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, "chunked", 
true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new 
RedirectView("http://be-host:8040/redirect";);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/redirect";);
+        Mockito.verify(request).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseKeepsNonStreamLoadBehavior() throws 
Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, 
response,
+                new TNetworkAddress("be-host", 8040), false, "db1", "tbl1", 
"label1");
+
+        Assertions.assertTrue(result instanceof RedirectView);
+        Mockito.verifyNoInteractions(response);
+    }
 
     @Test
     public void testGetAllHeadersMasksSensitiveHeaders() throws Exception {
-        LoadAction action = new LoadAction();
+        LoadAction loadAction = new LoadAction();
         HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
         
Mockito.when(request.getHeaderNames()).thenReturn(Collections.enumeration(Arrays.asList(
                 "Authorization", "Cookie", "Set-Cookie", "token", "label")));
@@ -38,12 +244,233 @@ public class LoadActionTest {
 
         Method method = LoadAction.class.getDeclaredMethod("getAllHeaders", 
HttpServletRequest.class);
         method.setAccessible(true);
-        String headers = (String) method.invoke(action, request);
+        String headers = (String) method.invoke(loadAction, request);
+
+        Assertions.assertTrue(headers.contains("Authorization:***MASKED***"));
+        Assertions.assertTrue(headers.contains("Cookie:***MASKED***"));
+        Assertions.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
+        Assertions.assertTrue(headers.contains("token:***MASKED***"));
+        Assertions.assertTrue(headers.contains("label:load_label"));
+    }
+
+    @Test
+    public void testExecuteWithoutPasswordRedirectsToBackend() throws 
Exception {
+        Config.enable_debug_points = true;
+        
DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId",
 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = 
Mockito.mock(SystemInfoService.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+
+        org.apache.doris.qe.ConnectContext connectContext = new 
org.apache.doris.qe.ConnectContext();
+        connectContext.setThreadLocalInfo();
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Object result = invokeExecuteWithoutPassword(loadAction, request, 
response, "db1", "tbl1", true, false);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        }
+    }
+
+    @Test
+    public void testExecuteWithClusterTokenRedirectsToBackend() throws 
Exception {
+        Config.enable_debug_points = true;
+        
DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId",
 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = 
Mockito.mock(SystemInfoService.class);
+        Env env = Mockito.mock(Env.class);
+        InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        // Provide the default catalog name required by 
ConnectContext.setEnv().
+        Mockito.when(env.getInternalCatalog()).thenReturn(internalCatalog);
+        Mockito.when(internalCatalog.getName()).thenReturn("internal");
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+        Mockito.when(request.getHeader("label")).thenReturn("label1");
+        Mockito.when(request.getRemoteAddr()).thenReturn("127.0.0.1");
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+            mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
+
+            Object result = invokeExecuteWithClusterToken(loadAction, request, 
response, "db1", "tbl1", true);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        }
+    }
+
+    @Test
+    public void testStreamLoadWithSqlRedirectsWhenExpectHeaderExists() {
+        Config.enable_debug_points = true;
+        
DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId",
 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = 
Mockito.mock(SystemInfoService.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        Mockito.when(request.getHeader("sql")).thenReturn("insert into 
db1.tbl1 values(1)");
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+        
Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+        Mockito.when(request.getContentLengthLong()).thenReturn(-1L);
+        Mockito.when(request.getHeader("transfer-encoding")).thenReturn(null);
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Object result = loadAction.streamLoadWithSql(request, response);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location", 
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar";);
+        }
+    }
+
+    @Test
+    public void testRedirectToStreamLoadForwardBuildsForwardUrl() throws 
Exception {
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        
Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("k=v");
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        RedirectView redirectView = 
invokeRedirectToStreamLoadForward(loadAction, request,
+                new TNetworkAddress("lb-host", 18040), "be-host:8040");
+
+        
Assertions.assertEquals("http://lb-host:18040/api/db1/tbl1/_stream_load_forward?k=v&forward_to=be-host:8040";,
+                redirectView.getUrl());
+    }
+
+    private Object invokeCreateRedirectResponse(LoadAction loadAction, 
HttpServletRequest request,
+            HttpServletResponse response, TNetworkAddress redirectAddr, 
boolean isStreamLoad, String dbName,
+            String tableName, String label) throws Exception {
+        Method method = 
LoadAction.class.getDeclaredMethod("createRedirectResponse",
+                HttpServletRequest.class, HttpServletResponse.class, 
TNetworkAddress.class,
+                boolean.class, String.class, String.class, String.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, redirectAddr, 
isStreamLoad, dbName, tableName, label);
+    }
+
+    private Object invokeCreateRedirectResponse(LoadAction loadAction, 
HttpServletRequest request,
+            HttpServletResponse response, RedirectView redirectView, boolean 
isStreamLoad, String dbName,
+            String tableName, String label) throws Exception {
+        Method method = 
LoadAction.class.getDeclaredMethod("createRedirectResponse",
+                HttpServletRequest.class, HttpServletResponse.class, 
RedirectView.class,
+                boolean.class, String.class, String.class, String.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, redirectView, 
isStreamLoad, dbName, tableName, label);
+    }
+
+    private Object invokeExecuteWithoutPassword(LoadAction loadAction, 
HttpServletRequest request,
+            HttpServletResponse response, String db, String table, boolean 
isStreamLoad, boolean groupCommit)
+            throws Exception {
+        Method method = 
LoadAction.class.getDeclaredMethod("executeWithoutPassword",
+                HttpServletRequest.class, HttpServletResponse.class, 
String.class, String.class,
+                boolean.class, boolean.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, db, table, 
isStreamLoad, groupCommit);
+    }
+
+    private Object invokeExecuteWithClusterToken(LoadAction loadAction, 
HttpServletRequest request,
+            HttpServletResponse response, String db, String table, boolean 
isStreamLoad) throws Exception {
+        Method method = 
LoadAction.class.getDeclaredMethod("executeWithClusterToken",
+                HttpServletRequest.class, HttpServletResponse.class, 
String.class, String.class, boolean.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, db, table, 
isStreamLoad);
+    }
+
+    private RedirectView invokeRedirectToStreamLoadForward(LoadAction 
loadAction, HttpServletRequest request,
+            TNetworkAddress addr, String forwardTarget) throws Exception {
+        Method method = 
LoadAction.class.getDeclaredMethod("redirectToStreamLoadForward",
+                HttpServletRequest.class, TNetworkAddress.class, String.class);
+        method.setAccessible(true);
+        return (RedirectView) method.invoke(loadAction, request, addr, 
forwardTarget);
+    }
+
+    private HttpServletRequest mockStreamLoadRequest() throws Exception {
+        return mockStreamLoadRequest(-1, null, true);
+    }
+
+    // Provide explicit request body metadata so the drain decision can be 
verified in isolation.
+    private HttpServletRequest mockStreamLoadRequest(long contentLength, 
String transferEncoding,
+            boolean stubInputStream) throws Exception {
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        
Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+        Mockito.when(request.getContentLengthLong()).thenReturn(contentLength);
+        
Mockito.when(request.getHeader("Transfer-Encoding")).thenReturn(transferEncoding);
+        
Mockito.when(request.getHeader("transfer-encoding")).thenReturn(transferEncoding);
+        if (stubInputStream) {
+            Mockito.when(request.getInputStream()).thenReturn(new 
IdleServletInputStream());
+        }
+        return request;
+    }
+
+    // Create a backend mock with only the redirect-related fields populated 
for lightweight controller tests.
+    private Backend mockBackend(String host, int httpPort, String 
privateEndpoint) {
+        Backend backend = Mockito.mock(Backend.class);
+        Mockito.when(backend.getHost()).thenReturn(host);
+        Mockito.when(backend.getHttpPort()).thenReturn(httpPort);
+        Mockito.when(backend.getPrivateEndpoint()).thenReturn(privateEndpoint);
+        Mockito.when(backend.getPublicEndpoint()).thenReturn(null);
+        return backend;
+    }
+
+    private static class TestLoadAction extends LoadAction {
+        @Override
+        public 
org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo 
executeCheckPassword(
+                HttpServletRequest request,
+                HttpServletResponse response) {
+            return new 
org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo();
+        }
+
+        @Override
+        protected void checkTblAuth(org.apache.doris.analysis.UserIdentity 
currentUser, String db, String tbl,
+                org.apache.doris.mysql.privilege.PrivPredicate predicate) {
+        }
+
+        @Override
+        protected boolean checkClusterToken(String token) {
+            return true;
+        }
+    }
+
+    private static class IdleServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 0;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
 
-        Assert.assertTrue(headers.contains("Authorization:***MASKED***"));
-        Assert.assertTrue(headers.contains("Cookie:***MASKED***"));
-        Assert.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
-        Assert.assertTrue(headers.contains("token:***MASKED***"));
-        Assert.assertTrue(headers.contains("label:load_label"));
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
new file mode 100644
index 00000000000..b838e25ef50
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RestBaseControllerTest {
+
+    @Test
+    public void testBuildRedirectUrlPreservesEncodedPath() {
+        // Keep the original encoded path unchanged when rebuilding the 
redirect URL.
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        TestRestController controller = new TestRestController();
+        String redirectUrl = controller.buildRedirectUrlForTest(request,
+                new TNetworkAddress("be-host", 8040), 
"/api/db%2Ftbl/_stream_load", "k=a%2Bb");
+
+        
Assert.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load?k=a%2Bb";, 
redirectUrl);
+    }
+
+    @Test
+    public void testBuildRedirectUrlWithoutQueryString() {
+        // Avoid appending a dangling question mark when the original request 
has no query string.
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        TestRestController controller = new TestRestController();
+        String redirectUrl = controller.buildRedirectUrlForTest(request,
+                new TNetworkAddress("be-host", 8040), 
"/api/db%2Ftbl/_stream_load", null);
+
+        Assert.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load";, 
redirectUrl);
+    }
+
+    // Expose the protected helper so the redirect URL can be verified 
directly.
+    private static class TestRestController extends RestBaseController {
+        private String buildRedirectUrlForTest(HttpServletRequest request, 
TNetworkAddress addr,
+                String requestPath, String queryString) {
+            return buildRedirectUrl(request, addr, requestPath, queryString);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
new file mode 100644
index 00000000000..8a8d6cae165
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class StreamLoadRedirectDrainUtilTest {
+
+    @AfterEach
+    public void tearDown() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 1000;
+    }
+
+    @Test
+    public void testDrainRequestBodyWithinMaxBytes() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new 
QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0), 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyAfterRedirectUsesRequestInputStream() 
throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getInputStream())
+                .thenReturn(new 
QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0));
+
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void 
testDrainRequestBodyAfterRedirectReturnsErrorWhenGetInputStreamFails() throws 
Exception {
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getInputStream()).thenThrow(new IOException("open 
error"));
+
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(0, drainResult.getElapsedMillis());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR, 
drainResult.getExitReason());
+    }
+
+    @Test
+    // Verify delayed body chunks are still drained when they arrive within 
the bounded idle window.
+    public void testDrainRequestBodyAllowsDelayedArrivalWithinIdleWindow() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new 
QueueAvailableServletInputStream("hello".getBytes(), 0, 0, 0, 0, 0, 5), 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyStopsAtMaxBytes() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello 
world".getBytes(), 11), 5);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.MAX_BYTES, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyIdleTimeout() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
NeverReadyServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyUsesConfiguredIdleTime() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new 
QueueAvailableServletInputStream("hello".getBytes(), 0, 5), 16);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyInterruptedWhileWaitingForMoreData() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        try {
+            Thread.currentThread().interrupt();
+            StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                    
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
NeverReadyServletInputStream(), 8);
+
+            Assertions.assertEquals(0, drainResult.getDrainedBytes());
+            
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.INTERRUPTED,
+                    drainResult.getExitReason());
+        } finally {
+            // Clear the interrupt flag to avoid affecting later tests in the 
same thread.
+            Thread.interrupted();
+        }
+    }
+
+    @Test
+    public void testDrainRequestBodyReadError() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
ErrorServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyEof() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
EofServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyRejectsNonPositiveMaxBytes() {
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> 
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
NeverReadyServletInputStream(), 0));
+    }
+
+    @Test
+    public void testDrainRequestBodyReturnsEofWhenReadReturnsNegative() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
ReadNegativeServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, 
drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyReadZeroTriggersIdleTimeout() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new 
ReadZeroServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, 
drainResult.getExitReason());
+    }
+
+    private static class QueueAvailableServletInputStream extends 
ServletInputStream {
+        private final byte[] data;
+        private final Queue<Integer> availableValues = new ArrayDeque<>();
+        private int offset = 0;
+
+        QueueAvailableServletInputStream(byte[] data, int... availableValues) {
+            this.data = data;
+            for (int availableValue : availableValues) {
+                this.availableValues.add(availableValue);
+            }
+        }
+
+        @Override
+        public int read() {
+            if (offset >= data.length) {
+                return -1;
+            }
+            return data[offset++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            if (offset >= data.length) {
+                return -1;
+            }
+            int readBytes = Math.min(len, data.length - offset);
+            System.arraycopy(data, offset, b, off, readBytes);
+            offset += readBytes;
+            return readBytes;
+        }
+
+        @Override
+        public int available() {
+            if (!availableValues.isEmpty()) {
+                return availableValues.poll();
+            }
+            return Math.max(0, data.length - offset);
+        }
+
+        @Override
+        public boolean isFinished() {
+            return offset >= data.length;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class ErrorServletInputStream extends ServletInputStream {
+        @Override
+        public int read() throws IOException {
+            throw new IOException("read error");
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            throw new IOException("read error");
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class EofServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return true;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class ReadNegativeServletInputStream extends 
ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class ReadZeroServletInputStream extends ServletInputStream 
{
+        @Override
+        public int read() {
+            return 0;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return 0;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Keep reporting no readable bytes without reaching EOF to simulate a 
stalled client.
+    private static class NeverReadyServletInputStream extends 
ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 0;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+}
diff --git 
a/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
 
b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
new file mode 100644
index 00000000000..5dca0091b7e
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Exercise the FE redirect path with a chunked stream load request."""
+
+import argparse
+import base64
+import http.client
+import json
+import sys
+import time
+import uuid
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Send a chunked stream load request to FE and capture the 
redirect result."
+    )
+    parser.add_argument("--host", required=True, help="FE host")
+    parser.add_argument("--fe-http-port", required=True, type=int, help="FE 
HTTP port")
+    parser.add_argument("--user", required=True, help="FE HTTP user")
+    parser.add_argument("--password", default="", help="FE HTTP password")
+    parser.add_argument("--db", required=True, help="Target database")
+    parser.add_argument("--table", required=True, help="Target table")
+    parser.add_argument("--payload-mb", type=int, default=8, help="Approximate 
payload size in MiB")
+    parser.add_argument("--chunk-kb", type=int, default=8, help="Chunk size in 
KiB")
+    parser.add_argument("--sleep-ms", type=int, default=10, help="Delay 
between chunks in milliseconds")
+    parser.add_argument("--connect-timeout", type=int, default=5, 
help="Connect timeout in seconds")
+    parser.add_argument("--read-timeout", type=int, default=120, help="Read 
timeout in seconds")
+    return parser.parse_args()
+
+
+def build_auth_header(user, password):
+    token = 
base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
+    return f"Basic {token}"
+
+
+def build_csv_chunk(chunk_bytes, chunk_index):
+    # Generate deterministic CSV rows so the request body is valid for stream 
load.
+    rows = []
+    current = 0
+    row_index = chunk_index * 100000
+    payload_width = max(8, min(256, max(1, chunk_bytes // 8)))
+    while True:
+        row = f"{row_index},payload_{chunk_index}_{'x' * payload_width}\n"
+        row_size = len(row.encode("utf-8"))
+        if rows and current + row_size > chunk_bytes:
+            break
+        rows.append(row)
+        current += row_size
+        row_index += 1
+        if current >= chunk_bytes:
+            break
+    return "".join(rows).encode("utf-8")
+
+
+def chunked_csv_generator(total_bytes, chunk_bytes, sleep_ms):
+    sent = 0
+    chunk_index = 0
+    while sent < total_bytes:
+        current_size = min(chunk_bytes, total_bytes - sent)
+        yield build_csv_chunk(current_size, chunk_index)
+        sent += current_size
+        chunk_index += 1
+        if sleep_ms > 0:
+            time.sleep(sleep_ms / 1000.0)
+
+
+def main():
+    args = parse_args()
+    path = f"/api/{args.db}/{args.table}/_stream_load"
+    conn = http.client.HTTPConnection(args.host, args.fe_http_port, 
timeout=args.read_timeout)
+    label = f"stream_load_redirect_regression_{uuid.uuid4().hex[:12]}"
+    total_bytes = args.payload_mb * 1024 * 1024
+    chunk_bytes = args.chunk_kb * 1024
+    started = time.time()
+
+    try:
+        # Send the request body with explicit chunk framing so the client 
keeps writing
+        # while FE returns the redirect response.
+        conn.putrequest("PUT", path)
+        conn.putheader("Authorization", build_auth_header(args.user, 
args.password))
+        conn.putheader("Expect", "100-continue")
+        conn.putheader("Transfer-Encoding", "chunked")
+        conn.putheader("column_separator", ",")
+        conn.putheader("label", label)
+        conn.endheaders()
+
+        for chunk in chunked_csv_generator(total_bytes, chunk_bytes, 
args.sleep_ms):
+            conn.send(f"{len(chunk):X}\r\n".encode("ascii"))
+            conn.send(chunk)
+            conn.send(b"\r\n")
+        conn.send(b"0\r\n\r\n")
+
+        response = conn.getresponse()
+        body = response.read().decode("utf-8", errors="replace")
+        result = {
+            "status_code": response.status,
+            "elapsed_seconds": round(time.time() - started, 3),
+            "headers": dict(response.getheaders()),
+            "body": body[:2000],
+            "exception_type": None,
+            "exception": None,
+            "label": label,
+        }
+        print(json.dumps(result, ensure_ascii=False))
+        return 0
+    except Exception as exc:
+        result = {
+            "status_code": None,
+            "elapsed_seconds": round(time.time() - started, 3),
+            "headers": {},
+            "body": "",
+            "exception_type": type(exc).__name__,
+            "exception": repr(exc),
+            "label": label,
+        }
+        print(json.dumps(result, ensure_ascii=False))
+        return 1
+    finally:
+        conn.close()
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
 
b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
new file mode 100644
index 00000000000..282f14331ee
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_fe_redirect_chunked_e2e", "p0") {
+    String dbName = context.config.getDbNameByFile(context.file)
+    String tableName = "test_stream_load_fe_redirect_chunked_e2e"
+    String helperPath = 
"${context.file.parent}/scripts/stream_load_redirect_chunked_e2e.py"
+    String feHttpAddress = context.config.feHttpAddress
+    int feAddressSeparator = feHttpAddress.lastIndexOf(":")
+    assertTrue(feAddressSeparator > 0)
+    String feHost = feHttpAddress.substring(0, feAddressSeparator)
+    String fePort = feHttpAddress.substring(feAddressSeparator + 1)
+
+    sql """ CREATE DATABASE IF NOT EXISTS ${dbName} """
+    sql """ DROP TABLE IF EXISTS ${dbName}.${tableName} """
+    sql """
+        CREATE TABLE ${dbName}.${tableName} (
+            k1 BIGINT,
+            v1 STRING
+        )
+        DUPLICATE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    // Read the current FE values first so the ordinary suite can restore 
shared settings.
+    def getFrontendConfigValue = { configKey ->
+        def configRows = sql """ admin show frontend config """
+        def configRow = configRows.find { it[0] == configKey }
+        assertTrue(configRow != null)
+        return configRow[1].toString()
+    }
+
+    String originalDrainMaxBytes = 
getFrontendConfigValue("stream_load_redirect_bounded_drain_max_bytes")
+    String originalDrainMaxIdleTimeMs = 
getFrontendConfigValue("stream_load_redirect_bounded_drain_max_idle_time_ms")
+
+    // Treat common client-side disconnects as the reproduced historical 
failure signal.
+    def reproducedExceptionTypes = ["BrokenPipeError", "ConnectionResetError"] 
as Set
+
+    // Keep the helper single-shot and let the regression test control retries 
and assertions.
+    def runChunkedStreamLoad = {
+        def command = [
+            "python3",
+            helperPath,
+            "--host", feHost,
+            "--fe-http-port", fePort,
+            "--user", context.config.feHttpUser,
+            "--password", context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword,
+            "--db", dbName,
+            "--table", tableName,
+            "--payload-mb", "8",
+            "--chunk-kb", "8",
+            "--sleep-ms", "10"
+        ]
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.in.text.trim()
+        def err = process.err.text.trim()
+        log.info("stream load redirect helper stdout: ${out}")
+        if (!err.isEmpty()) {
+            log.info("stream load redirect helper stderr: ${err}")
+        }
+        return [code: code, result: parseJson(out)]
+    }
+
+    try {
+        // First disable the drain and reproduce the historical client 
disconnect behavior.
+        sql """ admin set frontend 
config("stream_load_redirect_bounded_drain_max_bytes" = "0") """
+        sql """ admin set frontend 
config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "1") """
+        boolean brokenPipeObserved = false
+        for (int i = 0; i < 5; i++) {
+            def attempt = runChunkedStreamLoad()
+            def result = attempt.result
+            if (reproducedExceptionTypes.contains(result.exception_type)) {
+                brokenPipeObserved = true
+                break
+            }
+            assertEquals(307, result.status_code as Integer)
+            assertTrue(result.exception_type == null)
+            assertTrue(result.exception == null)
+        }
+        assertTrue(brokenPipeObserved)
+
+        // Then enable the drain and verify the same request finishes with a 
redirect.
+        sql """ admin set frontend 
config("stream_load_redirect_bounded_drain_max_bytes" = "16777216") """
+        sql """ admin set frontend 
config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "2000") """
+        for (int i = 0; i < 3; i++) {
+            def attempt = runChunkedStreamLoad()
+            def result = attempt.result
+            assertEquals(0, attempt.code as Integer)
+            assertEquals(307, result.status_code as Integer)
+            assertTrue(result.exception_type == null)
+            assertTrue(result.exception == null)
+            
assertTrue(result.headers.Location.contains("/api/${dbName}/${tableName}/_stream_load"))
+        }
+    } finally {
+        // Restore the shared FE settings before leaving the ordinary 
regression environment.
+        sql """ admin set frontend 
config("stream_load_redirect_bounded_drain_max_bytes" = 
"${originalDrainMaxBytes}") """
+        sql """ admin set frontend 
config("stream_load_redirect_bounded_drain_max_idle_time_ms" = 
"${originalDrainMaxIdleTimeMs}") """
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to