This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4379c4ed217 branch-3.1: [feat](cloud) Support cloud group commit 
stream load BE forward mode #55326 (#55527)
4379c4ed217 is described below

commit 4379c4ed21759165edc46b1680057a83356bfb23
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 4 10:30:26 2025 +0800

    branch-3.1: [feat](cloud) Support cloud group commit stream load BE forward 
mode #55326 (#55527)
    
    Cherry-picked from #55326
    
    Co-authored-by: Xin Liao <[email protected]>
---
 be/src/http/action/stream_load.cpp                 |   3 +-
 be/src/http/action/stream_load_forward_handler.cpp | 397 +++++++++++++++++++++
 be/src/http/action/stream_load_forward_handler.h   | 136 +++++++
 be/src/http/http_request.cpp                       |   8 +
 be/src/http/http_request.h                         |   2 +
 be/src/service/http_service.cpp                    |   6 +
 .../main/java/org/apache/doris/common/Config.java  |   8 +
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 183 +++++++++-
 .../stream_load/test_group_commit_redirect.out     | Bin 0 -> 149 bytes
 .../stream_load/test_group_commit_redirect.groovy  | 185 ++++++++++
 10 files changed, 918 insertions(+), 10 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index cde4876388d..56412863b38 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -215,7 +215,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     }
 
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", 
db=" << ctx->db
-              << ", tbl=" << ctx->table << ", group_commit=" << 
ctx->group_commit;
+              << ", tbl=" << ctx->table << ", group_commit=" << 
ctx->group_commit
+              << ", HTTP headers=" << req->get_all_headers();
     ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
 
     if (st.ok()) {
diff --git a/be/src/http/action/stream_load_forward_handler.cpp 
b/be/src/http/action/stream_load_forward_handler.cpp
new file mode 100644
index 00000000000..3f1719312fb
--- /dev/null
+++ b/be/src/http/action/stream_load_forward_handler.cpp
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_forward_handler.h"
+
+#include <event2/buffer.h>
+#include <event2/http.h>
+#include <event2/http_struct.h>
+#include <event2/keyvalq_struct.h>
+#include <strings.h>
+
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "util/byte_buffer.h"
+
+namespace doris {
+
+// Validates a forward request once its headers are available and prepares the
+// outgoing request.
+//
+// Logs the request (with the Authorization value masked), attaches a new
+// StreamLoadForwardContext to `req`, parses the mandatory `forward_to`
+// query parameter and calls init_forward_request().
+//
+// Returns HttpStatus::OK on success. On failure an error reply is sent on
+// `req` and the corresponding HttpStatus code is returned.
+// NOTE(review): failures are reported as positive status codes; confirm the
+// HTTP server treats them as "reply already sent".
+int StreamLoadForwardHandler::on_header(HttpRequest* req) {
+    std::ostringstream headers_info;
+    for (const auto& header : req->headers()) {
+        headers_info << header.first << ":";
+        // The Authorization header carries client credentials; never write them to the log.
+        if (strcasecmp(header.first.c_str(), "Authorization") == 0) {
+            headers_info << "***";
+        } else {
+            headers_info << header.second;
+        }
+        headers_info << " ";
+    }
+
+    std::ostringstream params_info;
+    const auto* params = req->params();
+    for (const auto& param : *params) {
+        params_info << param.first << "=" << param.second << " ";
+    }
+
+    LOG(INFO) << "StreamLoadForward request started - "
+              << "path: " << req->raw_path() << ", remote: " << req->remote_host()
+              << ", headers: [" << headers_info.str() << "]"
+              << ", params: [" << params_info.str() << "]";
+
+    // The context is attached before validation so later callbacks always find one.
+    auto ctx = std::make_shared<StreamLoadForwardContext>();
+    req->set_handler_ctx(ctx);
+
+    auto it = params->find("forward_to");
+    if (it == params->end()) {
+        LOG(WARNING) << "StreamLoadForward failed - missing forward_to parameter, path: "
+                     << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
+                                "Missing required parameter 'forward_to'. "
+                                "Usage: ?forward_to=host:port");
+        return HttpStatus::BAD_REQUEST;
+    }
+
+    std::string target_host;
+    int target_port = 0;
+    Status st = parse_forward_target(it->second, target_host, target_port);
+    if (!st.ok()) {
+        LOG(WARNING) << "StreamLoadForward failed - invalid forward target: " << st.to_string()
+                     << ", path: " << req->raw_path();
+        HttpChannel::send_reply(
+                req, HttpStatus::BAD_REQUEST,
+                "Invalid forward_to parameter: " + st.to_string() + ". Expected format: host:port");
+        return HttpStatus::BAD_REQUEST;
+    }
+
+    ctx->target_host = target_host;
+    ctx->target_port = target_port;
+    ctx->original_req = req;
+
+    Status init_st = init_forward_request(req, target_host, target_port, ctx);
+    if (!init_st.ok()) {
+        LOG(WARNING) << "StreamLoadForward failed - failed to initialize forward request: "
+                     << init_st.to_string() << ", target: " << target_host << ":" << target_port
+                     << ", path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Failed to initialize forward request: " + init_st.to_string());
+        return HttpStatus::INTERNAL_SERVER_ERROR;
+    }
+
+    return HttpStatus::OK;
+}
+
+// Sends the buffered client request to the target backend.
+//
+// Uses the context created in on_header(): copies the filtered request headers
+// onto the outgoing request, moves every buffered body chunk into its output
+// buffer and issues a PUT on the forwarding connection. Every failure is
+// answered with INTERNAL_SERVER_ERROR on the original request.
+void StreamLoadForwardHandler::handle(HttpRequest* req) {
+    auto forward_ctx = std::static_pointer_cast<StreamLoadForwardContext>(req->handler_ctx());
+    if (forward_ctx == nullptr) {
+        LOG(WARNING) << "StreamLoadForward failed - context not found, path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: context not found");
+        return;
+    }
+
+    struct evhttp_request* outgoing = forward_ctx->get_forward_request();
+    if (outgoing == nullptr) {
+        LOG(WARNING) << "Forward request not ready, path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: forward request not initialized");
+        return;
+    }
+
+    setup_forward_headers(req, outgoing, forward_ctx->target_host, forward_ctx->target_port);
+
+    // Drain the chunks collected by on_chunk_data() into the outgoing body, oldest first.
+    auto& pending_chunks = forward_ctx->request_data_chunks;
+    if (!pending_chunks.empty()) {
+        evbuffer* out_buf = evhttp_request_get_output_buffer(outgoing);
+        for (; !pending_chunks.empty(); pending_chunks.pop_front()) {
+            const auto& chunk = pending_chunks.front();
+            if (evbuffer_add(out_buf, chunk->ptr, chunk->limit) == 0) {
+                continue;
+            }
+            LOG(WARNING) << "Failed to add buffered data to output buffer, chunk size: "
+                         << chunk->limit << ", total size: " << forward_ctx->total_request_size
+                         << ", path: " << req->raw_path();
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                    "Failed to prepare forward data");
+            return;
+        }
+    }
+
+    const std::string forward_url = build_forward_url(req);
+    const int rc =
+            evhttp_make_request(forward_ctx->conn, outgoing, EVHTTP_REQ_PUT, forward_url.c_str());
+    if (rc != 0) {
+        LOG(WARNING) << "Failed to make forward request to " << forward_ctx->target_host << ":"
+                     << forward_ctx->target_port << ", path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Failed to forward request to target server: " +
+                                        forward_ctx->target_host + ":" +
+                                        std::to_string(forward_ctx->target_port));
+        return;
+    }
+
+    LOG(INFO) << "StreamLoadForward request sent - data size: " << forward_ctx->total_request_size
+              << ", target: " << forward_ctx->target_host << ":" << forward_ctx->target_port
+              << ", path: " << req->raw_path();
+}
+
+// Buffers the part of the client request body that has arrived so far.
+//
+// Everything currently in the request's input buffer is moved into ByteBuffer
+// chunks appended to ctx->request_data_chunks, and ctx->total_request_size is
+// increased by the number of bytes moved. On a missing context, an allocation
+// failure or a failed read an INTERNAL_SERVER_ERROR reply is sent and the
+// function returns.
+void StreamLoadForwardHandler::on_chunk_data(HttpRequest* req) {
+    auto ctx = std::static_pointer_cast<StreamLoadForwardContext>(req->handler_ctx());
+    if (!ctx) {
+        LOG(WARNING) << "No context found for chunk data";
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: context not found");
+        return;
+    }
+
+    evbuffer* input = evhttp_request_get_input_buffer(req->get_evhttp_request());
+    size_t remaining_length = 0;
+    while ((remaining_length = evbuffer_get_length(input)) > 0) {
+        ByteBufferPtr bb;
+        Status st = ByteBuffer::allocate(remaining_length, &bb);
+        if (!st.ok()) {
+            LOG(WARNING) << "Failed to allocate ByteBuffer: " << st.to_string()
+                         << ", path: " << req->raw_path();
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                    "Failed to allocate memory for request data");
+            return;
+        }
+
+        // evbuffer_remove() reports failure with a negative value. Storing that in the
+        // unsigned position/size fields would corrupt them, and a read that drains
+        // nothing would keep this loop spinning, so both cases abort the request.
+        const int removed_bytes = evbuffer_remove(input, bb->ptr, bb->capacity);
+        if (removed_bytes <= 0) {
+            LOG(WARNING) << "Failed to read request data from input buffer, pending bytes: "
+                         << remaining_length << ", path: " << req->raw_path();
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                    "Failed to read request data");
+            return;
+        }
+        bb->pos = removed_bytes;
+        bb->flip();
+
+        ctx->request_data_chunks.emplace_back(bb);
+        ctx->total_request_size += removed_bytes;
+    }
+}
+
+// Creates the outgoing connection and request used to forward `req`.
+//
+// The connection is created on the event base of the incoming request's
+// connection, without a DNS base. The raw context pointer is registered as
+// the argument of the close, completion and chunk callbacks.
+//
+// Returns Status::OK() when both the connection and the request exist and are
+// stored in `ctx`; otherwise returns an InternalError and leaves no dangling
+// connection pointer behind in `ctx`.
+Status StreamLoadForwardHandler::init_forward_request(
+        HttpRequest* req, const std::string& target_host, int target_port,
+        std::shared_ptr<StreamLoadForwardContext>& ctx) {
+    ctx->original_req = req;
+
+    struct event_base* ev_base =
+            evhttp_connection_get_base(evhttp_request_get_connection(req->get_evhttp_request()));
+
+    struct evhttp_connection* conn = evhttp_connection_base_new(ev_base,
+                                                                nullptr, // dns base
+                                                                target_host.c_str(), target_port);
+
+    if (!conn) {
+        return Status::InternalError("Failed to create connection to target server");
+    }
+
+    evhttp_connection_set_closecb(conn, forward_connection_close_cb, ctx.get());
+    ctx->conn = conn;
+
+    struct evhttp_request* forward_req = evhttp_request_new(forward_request_done, ctx.get());
+    if (!forward_req) {
+        // Clear the stored pointer before freeing: the context destructor frees
+        // ctx->conn when it is non-null, which would otherwise be a double free.
+        ctx->conn = nullptr;
+        evhttp_connection_free(conn);
+        return Status::InternalError("Failed to create forward request");
+    }
+
+    evhttp_request_set_chunked_cb(forward_req, forward_request_chunked_cb);
+
+    ctx->set_forward_request(forward_req);
+    return Status::OK();
+}
+
+// Completion callback of the forwarded request.
+//
+// `arg` is the StreamLoadForwardContext registered in init_forward_request().
+// When no usable response exists, the client receives 503; otherwise the
+// collected response is relayed through send_complete_response().
+void StreamLoadForwardHandler::forward_request_done(struct evhttp_request* req, void* arg) {
+    auto* ctx = static_cast<StreamLoadForwardContext*>(arg);
+    if (ctx == nullptr || ctx->original_req == nullptr) {
+        LOG(ERROR) << "Forward request finished without a usable context";
+        return;
+    }
+
+    // libevent may hand over a request object that never received a status line
+    // (response code 0), e.g. when the connection could not be established.
+    // Relaying code 0 would produce an invalid reply, so treat it as no response.
+    const int response_code = (req != nullptr) ? evhttp_request_get_response_code(req) : 0;
+    if (response_code == 0) {
+        LOG(ERROR) << "Forward request failed - no response";
+        evhttp_send_error(ctx->original_req->get_evhttp_request(), 503,
+                          "Backend server unavailable");
+        return;
+    }
+
+    const char* response_reason = evhttp_request_get_response_code_line(req);
+
+    LOG(INFO) << "StreamLoadForward completed - "
+              << "status: " << response_code
+              << ", reason: " << (response_reason ? response_reason : "Unknown")
+              << ", response_size: " << ctx->response_data.size() << " bytes"
+              << ", path: " << ctx->original_req->raw_path();
+
+    send_complete_response(req, ctx, response_code);
+}
+
+// Chunk callback of the forwarded request: moves whatever response bytes are
+// currently available from the request's input buffer into ctx->response_data.
+void StreamLoadForwardHandler::forward_request_chunked_cb(struct evhttp_request* req, void* arg) {
+    auto* forward_ctx = static_cast<StreamLoadForwardContext*>(arg);
+
+    struct evbuffer* incoming = evhttp_request_get_input_buffer(req);
+    if (incoming == nullptr) {
+        return;
+    }
+
+    const size_t available = evbuffer_get_length(incoming);
+    if (available == 0) {
+        return;
+    }
+
+    // Make the pending bytes contiguous so they can be appended in one call.
+    char* bytes = reinterpret_cast<char*>(evbuffer_pullup(incoming, available));
+    if (bytes == nullptr) {
+        LOG(WARNING) << "Failed to pullup " << available << " bytes from input buffer";
+        return;
+    }
+
+    forward_ctx->response_data.append(bytes, available);
+    // The bytes are copied; drop them from the libevent buffer.
+    evbuffer_drain(incoming, available);
+}
+
+// Relays the upstream response to the original client.
+//
+// The body is taken from ctx->response_data, Content-Length is set from the
+// body actually placed in the reply buffer, the remaining upstream headers are
+// copied via copy_response_headers(), and the upstream status code and reason
+// line are reused for the reply.
+void StreamLoadForwardHandler::send_complete_response(struct evhttp_request* req,
+                                                      StreamLoadForwardContext* ctx,
+                                                      int response_code) {
+    struct evbuffer* reply_body = evbuffer_new();
+    if (reply_body == nullptr) {
+        LOG(ERROR) << "Failed to create response buffer";
+        HttpChannel::send_reply(ctx->original_req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: failed to create response buffer");
+        return;
+    }
+
+    const std::string& collected = ctx->response_data;
+    if (!collected.empty()) {
+        evbuffer_add(reply_body, collected.c_str(), collected.size());
+    }
+
+    struct evhttp_request* client_req = ctx->original_req->get_evhttp_request();
+    struct evkeyvalq* client_headers = evhttp_request_get_output_headers(client_req);
+
+    // Content-Length is written first; the upstream value is skipped when headers are copied.
+    const std::string content_length = std::to_string(evbuffer_get_length(reply_body));
+    evhttp_add_header(client_headers, "Content-Length", content_length.c_str());
+    copy_response_headers(evhttp_request_get_input_headers(req), client_headers);
+
+    evhttp_send_reply(client_req, response_code, evhttp_request_get_response_code_line(req),
+                      reply_body);
+
+    evbuffer_free(reply_body);
+}
+
+// Copies upstream response headers to the client reply, leaving out the ones
+// listed below. A header without a value is copied with an empty value.
+void StreamLoadForwardHandler::copy_response_headers(struct evkeyvalq* input_headers,
+                                                     struct evkeyvalq* output_headers) {
+    // Header names (matched case-insensitively) that are not taken from upstream.
+    static const char* const kSkippedHeaders[] = {"Transfer-Encoding", "Content-Length", "Date",
+                                                  "Server", "Content-Type"};
+
+    struct evkeyval* entry = input_headers->tqh_first;
+    while (entry != nullptr) {
+        bool skipped = false;
+        for (const char* name : kSkippedHeaders) {
+            if (strcasecmp(entry->key, name) == 0) {
+                skipped = true;
+                break;
+            }
+        }
+        if (!skipped) {
+            evhttp_add_header(output_headers, entry->key,
+                              entry->value != nullptr ? entry->value : "");
+        }
+        entry = entry->next.tqe_next;
+    }
+}
+
+// Close callback of the forwarding connection: tells the context (passed as
+// `arg`) that the connection is gone. `conn` itself is not used.
+void StreamLoadForwardHandler::forward_connection_close_cb(struct evhttp_connection* conn,
+                                                           void* arg) {
+    if (auto* forward_ctx = static_cast<StreamLoadForwardContext*>(arg)) {
+        forward_ctx->handle_connection_close();
+        return;
+    }
+
+    LOG(WARNING) << "Context is null in connection close callback";
+}
+
+// Splits a `host:port` string at its first ':' into `host` and `port`.
+//
+// Returns InvalidArgument when the separator is missing, the host is empty,
+// the port is not a plain decimal number (trailing characters such as "80abc"
+// are rejected) or the port is outside 1..65535. `host` and `port` are only
+// written when Status::OK() is returned.
+Status StreamLoadForwardHandler::parse_forward_target(const std::string& forward_to,
+                                                      std::string& host, int& port) {
+    const size_t pos = forward_to.find(':');
+    if (pos == std::string::npos) {
+        return Status::InvalidArgument("Invalid forward_to format, should be host:port, got: {}",
+                                       forward_to);
+    }
+    if (pos == 0) {
+        return Status::InvalidArgument("Invalid forward_to format, host is empty, got: {}",
+                                       forward_to);
+    }
+
+    const std::string port_str = forward_to.substr(pos + 1);
+    // std::stoi() skips leading whitespace and accepts a sign; require a leading digit.
+    if (port_str.empty() || port_str.front() < '0' || port_str.front() > '9') {
+        return Status::InvalidArgument("Invalid port number in forward_to: {}", port_str);
+    }
+
+    int parsed_port = 0;
+    size_t consumed = 0;
+    try {
+        parsed_port = std::stoi(port_str, &consumed);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Exception while parsing port: " << port_str << ", what(): " << e.what();
+        return Status::InvalidArgument("Invalid port number in forward_to: {}, exception: {}",
+                                       port_str, e.what());
+    }
+
+    // std::stoi() stops at the first non-digit; anything left over makes the port invalid.
+    if (consumed != port_str.size()) {
+        return Status::InvalidArgument("Invalid port number in forward_to: {}", port_str);
+    }
+
+    if (parsed_port <= 0 || parsed_port > 65535) {
+        return Status::InvalidArgument("Port number must be between 1 and 65535, got: {}",
+                                       parsed_port);
+    }
+
+    host = forward_to.substr(0, pos);
+    port = parsed_port;
+    return Status::OK();
+}
+
+// Builds the request URI sent to the target backend.
+//
+// The path is the incoming raw path with everything from
+// "/_stream_load_forward" onwards replaced by "/_stream_load" (or the raw path
+// unchanged when that marker is absent). All query parameters except
+// `forward_to` are appended, percent-encoded.
+std::string StreamLoadForwardHandler::build_forward_url(HttpRequest* req) {
+    const std::string& raw_path = req->raw_path();
+
+    std::string url;
+    const size_t pos = raw_path.find("/_stream_load_forward");
+    if (pos != std::string::npos) {
+        // Keep the /api/{db}/{table} prefix and switch to the regular endpoint.
+        url = raw_path.substr(0, pos) + "/_stream_load";
+    } else {
+        url = raw_path;
+    }
+
+    // NOTE(review): assumes req->params() holds percent-decoded keys and values, so
+    // they are encoded again here; confirm against HttpRequest's query parsing.
+    const auto append_encoded = [&url](const std::string& text) {
+        char* encoded = evhttp_uriencode(text.data(), static_cast<ev_ssize_t>(text.size()), 0);
+        if (encoded == nullptr) {
+            // Encoding failed (allocation); fall back to the unencoded text.
+            url += text;
+            return;
+        }
+        url += encoded;
+        free(encoded);
+    };
+
+    char separator = '?';
+    for (const auto& param : *req->params()) {
+        if (param.first == "forward_to") {
+            // Routing-only parameter; the target backend must not see it.
+            continue;
+        }
+        url += separator;
+        append_encoded(param.first);
+        url += '=';
+        append_encoded(param.second);
+        separator = '&';
+    }
+
+    return url;
+}
+
+// Fills the output headers of `forward_req` from the incoming request.
+//
+// Every incoming header except Host, Transfer-Encoding and Content-Length is
+// copied. A Host header for the target and X-Forwarded-* headers are then
+// added. X-Forwarded-For and X-Forwarded-Host are left out when the
+// corresponding value is unavailable.
+void StreamLoadForwardHandler::setup_forward_headers(HttpRequest* req,
+                                                     struct evhttp_request* forward_req,
+                                                     const std::string& target_host,
+                                                     int target_port) {
+    struct evkeyvalq* input_headers = evhttp_request_get_input_headers(req->get_evhttp_request());
+    struct evkeyvalq* output_headers = evhttp_request_get_output_headers(forward_req);
+
+    for (struct evkeyval* header = input_headers->tqh_first; header != nullptr;
+         header = header->next.tqe_next) {
+        // Skip headers that conflict with libevent's automatic handling
+        if (strcasecmp(header->key, "Host") == 0 ||
+            strcasecmp(header->key, "Transfer-Encoding") == 0 ||
+            strcasecmp(header->key, "Content-Length") == 0) {
+            continue;
+        }
+        // Same null-value handling as copy_response_headers().
+        evhttp_add_header(output_headers, header->key, header->value ? header->value : "");
+    }
+
+    // Set new Host header
+    const std::string target = target_host + ":" + std::to_string(target_port);
+    evhttp_add_header(output_headers, "Host", target.c_str());
+
+    // Add forwarding related headers. The getters below return C strings that may be
+    // null (e.g. a request without a Host header), and evhttp_add_header() must not
+    // be handed a null value.
+    const char* client_host = req->remote_host();
+    if (client_host != nullptr) {
+        evhttp_add_header(output_headers, "X-Forwarded-For", client_host);
+    }
+    evhttp_add_header(output_headers, "X-Forwarded-Proto", "http");
+    const char* original_host = evhttp_request_get_host(req->get_evhttp_request());
+    if (original_host != nullptr) {
+        evhttp_add_header(output_headers, "X-Forwarded-Host", original_host);
+    }
+}
+
+} // namespace doris
diff --git a/be/src/http/action/stream_load_forward_handler.h 
b/be/src/http/action/stream_load_forward_handler.h
new file mode 100644
index 00000000000..c5c7c674ddc
--- /dev/null
+++ b/be/src/http/action/stream_load_forward_handler.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <event2/http.h>
+
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "common/status.h"
+#include "http/http_handler.h"
+#include "http/http_request.h"
+#include "util/byte_buffer.h"
+
+namespace doris {
+
+// Context for storing stream load forward request information
+//
+// One instance is created per forwarded request in
+// StreamLoadForwardHandler::on_header() and attached to the HttpRequest as its
+// handler context. Raw pointers to it are also registered as the argument of
+// the libevent close/completion/chunk callbacks.
+//
+// `forward_req` and `connection_closed` are only accessed under `_mutex`; the
+// public data members are accessed directly by the handler.
+class StreamLoadForwardContext {
+public:
+    StreamLoadForwardContext() = default;
+    // Frees `conn` unless the close callback already reported it as closed.
+    ~StreamLoadForwardContext() {
+        struct evhttp_connection* conn_to_free = nullptr;
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            // forward request will be released by libevent automatically
+            forward_req = nullptr;
+            if (conn && !connection_closed) {
+                // Only free connection if it wasn't already released by 
libevent
+                conn_to_free = conn;
+                conn = nullptr;
+            }
+        }
+        // Free connection outside of mutex to avoid deadlock with connection 
close callback
+        if (conn_to_free) {
+            evhttp_connection_free(conn_to_free);
+        }
+    }
+
+    // Stores the outgoing request created in init_forward_request().
+    void set_forward_request(evhttp_request* req) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        forward_req = req;
+    }
+
+    // Returns the outgoing request, or nullptr when none has been set.
+    evhttp_request* get_forward_request() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return forward_req;
+    }
+
+    // Called from the connection close callback: marks the connection as closed
+    // and forgets the pointer so the destructor does not free it.
+    // NOTE(review): this assumes libevent releases the connection itself once
+    // the close callback has fired; if it does not, the connection is leaked.
+    void handle_connection_close() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        connection_closed = true;
+        // Connection has been released by libevent automatically, set pointer 
to null
+        // to prevent double-free in destructor
+        conn = nullptr;
+    }
+
+    // Returns whether the close callback has been observed.
+    bool is_connection_closed() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return connection_closed;
+    }
+
+    // Outgoing connection to the target backend; freed by the destructor.
+    struct evhttp_connection* conn {nullptr};
+    // Original request reference, lifecycle managed by HTTP framework
+    HttpRequest* original_req {nullptr};
+
+    // Forward target parsed from the `forward_to` parameter.
+    std::string target_host;
+    int target_port {0};
+
+    // Buffer for collecting response data
+    std::string response_data;
+
+    // Request body chunks buffered by on_chunk_data() until handle() sends them,
+    // and the total number of body bytes buffered.
+    std::deque<ByteBufferPtr> request_data_chunks;
+    size_t total_request_size = 0;
+
+private:
+    // Guards forward_req and connection_closed.
+    mutable std::mutex _mutex;
+    struct evhttp_request* forward_req {nullptr};
+    bool connection_closed {false};
+};
+
+// Stream Load request forward handler
+// Forwards Stream Load requests to other BE nodes
+// NOTE: the request body is first buffered in
+// StreamLoadForwardContext::request_data_chunks by on_chunk_data() and is only
+// sent to the target in handle(); the upstream response is likewise collected
+// completely before it is relayed.
+// Supports streaming forward, maintains original request path format: 
/api/{db}/{table}/_stream_load_forward
+class StreamLoadForwardHandler : public HttpHandler {
+public:
+    StreamLoadForwardHandler() = default;
+    ~StreamLoadForwardHandler() override = default;
+
+    // Sends the buffered request to the target taken from the request's context.
+    void handle(HttpRequest* req) override;
+
+    // The body is delivered incrementally through on_chunk_data().
+    bool request_will_be_read_progressively() override { return true; }
+
+    // Validates `forward_to`, creates the context and the outgoing request.
+    // Returns HttpStatus::OK on success, otherwise the status of the error
+    // reply that was sent.
+    int on_header(HttpRequest* req) override;
+
+    // Moves the body bytes received so far into the context's chunk queue.
+    void on_chunk_data(HttpRequest* req) override;
+
+private:
+    // Creates the libevent connection and request for the target and stores
+    // them in `ctx`.
+    Status init_forward_request(HttpRequest* req, const std::string& 
target_host, int target_port,
+                                std::shared_ptr<StreamLoadForwardContext>& 
ctx);
+
+    // libevent callbacks; `arg` is the StreamLoadForwardContext registered in
+    // init_forward_request().
+    static void forward_request_done(struct evhttp_request* req, void* arg);
+    static void forward_request_chunked_cb(struct evhttp_request* req, void* 
arg);
+    static void forward_connection_close_cb(struct evhttp_connection* conn, 
void* arg);
+
+    // Response helper functions
+    // send_complete_response() relays the collected upstream response to the
+    // original client; copy_response_headers() copies the upstream headers
+    // that are not excluded by the implementation.
+    static void send_complete_response(struct evhttp_request* req, 
StreamLoadForwardContext* ctx,
+                                       int response_code);
+    static void copy_response_headers(struct evkeyvalq* input_headers,
+                                      struct evkeyvalq* output_headers);
+
+    // Splits "host:port" into `host` and `port`; returns InvalidArgument on
+    // malformed input.
+    Status parse_forward_target(const std::string& forward_to, std::string& 
host, int& port);
+
+    // Rewrites the request path to the `_stream_load` endpoint and re-appends
+    // all query parameters except `forward_to`.
+    std::string build_forward_url(HttpRequest* req);
+
+    // Copies the client's request headers to `forward_req` and adds Host and
+    // X-Forwarded-* headers.
+    void setup_forward_headers(HttpRequest* req, struct evhttp_request* 
forward_req,
+                               const std::string& target_host, int 
target_port);
+};
+
+} // namespace doris
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index 14bde591b4c..f7432978016 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -110,6 +110,14 @@ const std::string& HttpRequest::param(const std::string& 
key) const {
     return iter->second;
 }
 
+// Returns all request headers as "key:value, key:value, " for diagnostics.
+// The value of the Authorization header is replaced by "***" because it
+// carries client credentials and the result is written to logs.
+std::string HttpRequest::get_all_headers() const {
+    // NOTE(review): relies on _headers (StringCaseUnorderedMap) looking keys up
+    // case-insensitively; confirm against the map's definition.
+    const auto credential_it = _headers.find("Authorization");
+
+    std::stringstream headers;
+    for (const auto& header : _headers) {
+        headers << header.first << ":";
+        if (credential_it != _headers.end() && &header == &*credential_it) {
+            headers << "***";
+        } else {
+            headers << header.second;
+        }
+        headers << ", ";
+    }
+    return headers.str();
+}
+
 void HttpRequest::add_output_header(const char* key, const char* value) {
     evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value);
 }
diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h
index a9286410aff..41d8cf98baa 100644
--- a/be/src/http/http_request.h
+++ b/be/src/http/http_request.h
@@ -55,6 +55,8 @@ public:
     // return params
     const StringCaseUnorderedMap<std::string>& headers() { return _headers; }
 
+    std::string get_all_headers() const;
+
     // return params
     std::map<std::string, std::string>* params() { return &_params; }
 
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 32ea86b09b1..6c604dd09d0 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -65,6 +65,7 @@
 #include "http/action/snapshot_action.h"
 #include "http/action/stream_load.h"
 #include "http/action/stream_load_2pc.h"
+#include "http/action/stream_load_forward_handler.h"
 #include "http/action/tablet_migration_action.h"
 #include "http/action/tablets_distribution_action.h"
 #include "http/action/tablets_info_action.h"
@@ -133,6 +134,11 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::PUT, 
"/api/{db}/{table}/_stream_load_2pc",
                                       streamload_2pc_action);
 
+    // register stream load forward handler
+    auto* forward_handler = _pool.add(new StreamLoadForwardHandler());
+    _ev_http_server->register_handler(HttpMethod::PUT, 
"/api/{db}/{table}/_stream_load_forward",
+                                      forward_handler);
+
     // register http_stream
     HttpStreamAction* http_stream_action = _pool.add(new 
HttpStreamAction(_env));
     _ev_http_server->register_handler(HttpMethod::PUT, "/api/_http_stream", 
http_stream_action);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 26fe184f6ff..95b93afb8f5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3260,6 +3260,14 @@ public class Config extends ConfigBase {
             + "public-private/public/private/direct/random-be and empty 
string" })
     public static String streamload_redirect_policy = "";
 
+    @ConfField(mutable = true, description = {
+            "存算分离模式下是否启用group commit的streamload BE转发功能。"
+                    + "解决LB随机转发导致group commit攒批失效的问题,通过BE二次转发确保同表请求到达同一BE节点。",
+            "Whether to enable group commit streamload BE forward feature in 
cloud mode. "
+                    + "Solves the issue where LB random forwarding breaks 
group commit batching "
+                    + "by implementing BE-level forwarding to ensure 
same-table requests reach the same BE node." })
+    public static boolean enable_group_commit_streamload_be_forward = false;
+
     @ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
         "create table in cloud mode, check recycler key remained, default 
true"})
     public static boolean check_create_table_recycle_key_remained = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 065c93bcc76..b79b67fedcd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -324,7 +325,8 @@ public class LoadAction extends RestBaseController {
 
                     tableId = ((OlapTable) olapTable.get()).getId();
                 }
-                redirectAddr = selectRedirectBackend(request, groupCommit, 
tableId);
+                // Handle stream load with potential group commit forwarding
+                redirectAddr = handleStreamLoadRedirect(request, groupCommit, 
tableId, dbName, tableName, label);
             }
 
             if (LOG.isDebugEnabled()) {
@@ -334,6 +336,9 @@ public class LoadAction extends RestBaseController {
 
             RedirectView redirectView = redirectTo(request, redirectAddr);
             return redirectView;
+        } catch (StreamLoadForwardException e) {
+            // Special handling for stream load forwarding
+            return e.getRedirectView();
         } catch (Exception e) {
             LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, 
err: {}",
                     isStreamLoad, db, table, label, e.getMessage());
@@ -398,6 +403,11 @@ public class LoadAction extends RestBaseController {
 
     private TNetworkAddress selectRedirectBackend(HttpServletRequest request, 
boolean groupCommit, long tableId)
             throws LoadException {
+        return selectRedirectBackend(request, groupCommit, tableId, null);
+    }
+
+    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, 
boolean groupCommit, long tableId,
+            Backend preSelectedBackend) throws LoadException {
         long debugBackendId = 
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
 -1L);
         if (debugBackendId != -1L) {
             Backend backend = 
Env.getCurrentSystemInfo().getBackend(debugBackendId);
@@ -408,17 +418,17 @@ public class LoadAction extends RestBaseController {
             if (Strings.isNullOrEmpty(cloudClusterName)) {
                 throw new LoadException("No cloud cluster name selected.");
             }
-            return selectCloudRedirectBackend(cloudClusterName, request, 
groupCommit, tableId);
+            return selectCloudRedirectBackend(cloudClusterName, request, 
groupCommit, tableId, preSelectedBackend);
         } else {
             if (groupCommit && tableId == -1) {
                 throw new LoadException("Group commit table id wrong.");
             }
-            return selectLocalRedirectBackend(groupCommit, request, tableId);
+            return selectLocalRedirectBackend(groupCommit, request, tableId, 
preSelectedBackend);
         }
     }
 
-    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, 
HttpServletRequest request, long tableId)
-            throws LoadException {
+    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, 
HttpServletRequest request, long tableId,
+            Backend preSelectedBackend) throws LoadException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -438,7 +448,9 @@ public class LoadAction extends RestBaseController {
             throw new 
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
         if (groupCommit) {
-            backend = selectBackendForGroupCommit("", request, tableId);
+            // Use pre-selected backend if provided to avoid duplicate calls
+            backend = preSelectedBackend != null ? preSelectedBackend
+                    : selectBackendForGroupCommit("", request, tableId);
         } else {
             backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
         }
@@ -449,11 +461,12 @@ public class LoadAction extends RestBaseController {
     }
 
     private TNetworkAddress selectCloudRedirectBackend(String clusterName, 
HttpServletRequest req, boolean groupCommit,
-            long tableId)
-            throws LoadException {
+            long tableId, Backend preSelectedBackend) throws LoadException {
         Backend backend = null;
         if (groupCommit) {
-            backend = selectBackendForGroupCommit(clusterName, req, tableId);
+            // Use pre-selected backend if provided to avoid duplicate calls
+            backend = preSelectedBackend != null ? preSelectedBackend
+                    : selectBackendForGroupCommit(clusterName, req, tableId);
         } else {
             backend = StreamLoadHandler.selectBackend(clusterName);
         }
@@ -917,4 +930,156 @@ public class LoadAction extends RestBaseController {
 
     }
 
+    /**
+     * Create redirect URL for stream load forward mode.
+     *
+     * This method constructs the special redirect URL used in the group 
commit forwarding mechanism:
+     *
+     * Key modifications to the standard redirect:
+     * 1. Path modification: Changes "/_stream_load" to "/_stream_load_forward"
+     *    - This tells the receiving BE that it needs to perform additional 
forwarding
+     *    - The "_stream_load_forward" endpoint is specifically designed to 
handle forwarding logic
+     *
+     * 2. Forward target parameter: Adds "forward_to=host:port" to the query 
string
+     *    - Specifies the actual target BE node that should process this 
request
+     *    - Ensures all requests for the same table reach the same BE for 
optimal batching
+     *
+     * 3. Authentication preservation: Maintains user authentication in the 
URL if present
+     *    - Ensures the forwarded request has proper authentication context
+     *
+     * Example transformation:
+     * Original: http://endpoint:port/api/db/table/_stream_load?param=value
+     * Forward:  
http://endpoint:port/api/db/table/_stream_load_forward?param=value&forward_to=target_be:port
+     *
+     * @param request the original HTTP request
+     * @param addr the endpoint address to redirect to (public/private 
endpoint)
+     * @param forwardTarget the target BE node in "host:port" format for final 
processing
+     * @return RedirectView configured for stream load forwarding
+     */
+    private RedirectView redirectToStreamLoadForward(HttpServletRequest 
request, TNetworkAddress addr,
+            String forwardTarget) {
+        URI urlObj = null;
+        URI resultUriObj = null;
+        String urlStr = request.getRequestURI();
+        String userInfo = null;
+        String modifiedPath = null;
+
+        if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
+            ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
+            userInfo = 
ClusterNamespace.getNameFromFullName(authInfo.fullUserName)
+                    + ":" + authInfo.password;
+        }
+        try {
+            urlObj = new URI(urlStr);
+            // Replace _stream_load with _stream_load_forward in the path
+            modifiedPath = urlObj.getPath().replace("/_stream_load", 
"/_stream_load_forward");
+            resultUriObj = new URI("http", userInfo, addr.getHostname(),
+                    addr.getPort(), modifiedPath, "", null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        String redirectUrl = resultUriObj.toASCIIString();
+
+        // Add forward_to parameter (note: toASCIIString() already includes 
'?' due to empty query)
+        String queryString = request.getQueryString();
+        if (!Strings.isNullOrEmpty(queryString)) {
+            redirectUrl += queryString + "&forward_to=" + forwardTarget;
+        } else {
+            redirectUrl += "forward_to=" + forwardTarget;
+        }
+
+        LOG.info("Redirect stream load forward url: {}, forward_to: {}",
+                "http://" + addr.getHostname() + ":" + addr.getPort() + 
modifiedPath, forwardTarget);
+        RedirectView redirectView = new RedirectView(redirectUrl);
+        redirectView.setContentType("text/html;charset=utf-8");
+        
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+        return redirectView;
+    }
+
+    /**
+     * Handle stream load redirect with optional group commit forwarding.
+     *
+     * Group Commit Stream Load Forward Mode in Cloud Environment:
+     *
+     * Problem:
+     * Group commit requires that requests for the same table be sent to the 
same BE node
+     * to achieve better batching efficiency. However, in cloud mode with Load 
Balancer (LB),
+     * the LB randomly selects a BE node for forwarding, which breaks the 
group commit strategy
+     * and reduces batching effectiveness.
+     *
+     * Solution:
+     * Implement a two-stage forwarding mechanism:
+     * 1. FE redirects to public/private endpoint (LB) as usual
+     * 2. BE performs a second forwarding to the actual target BE node that 
handles the specific table
+     *
+     * This ensures that all requests for the same table ultimately reach the 
same BE node,
+     * preserving the group commit batching strategy while still utilizing the 
LB infrastructure.
+     *
+     * @param request the HTTP request
+     * @param groupCommit whether group commit is enabled
+     * @param tableId the table ID for group commit
+     * @param dbName database name for logging
+     * @param tableName table name for logging
+     * @param label label for logging
+     * @return redirect address for normal redirect
+     * @throws StreamLoadForwardException if forward redirect is applied
+     * @throws LoadException if redirect selection fails
+     */
+    private TNetworkAddress handleStreamLoadRedirect(HttpServletRequest 
request, boolean groupCommit,
+            long tableId, String dbName, String tableName, String label) 
throws LoadException {
+
+        // Check if group commit forwarding is needed
+        if (!Config.isCloudMode() || !groupCommit || 
!Config.enable_group_commit_streamload_be_forward) {
+            return selectRedirectBackend(request, groupCommit, tableId);
+        }
+
+        String cloudClusterName = getCloudClusterName(request);
+        if (Strings.isNullOrEmpty(cloudClusterName)) {
+            throw new LoadException("No cloud cluster name selected for group 
commit forwarding.");
+        }
+
+        // Get target backend for group commit
+        Backend targetBackend = selectBackendForGroupCommit(cloudClusterName, 
request, tableId);
+        if (targetBackend == null) {
+            throw new LoadException("Failed to select target backend for group 
commit forwarding.");
+        }
+
+        // Get redirect address with optimized backend selection
+        TNetworkAddress redirectAddr = 
selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId,
+                targetBackend);
+        TNetworkAddress targetAddr = new 
TNetworkAddress(targetBackend.getHost(), targetBackend.getHttpPort());
+
+        // Apply forwarding if addresses differ (compare hostname and port 
directly)
+        if (!redirectAddr.getHostname().equals(targetAddr.getHostname())
+                || redirectAddr.getPort() != targetAddr.getPort()) {
+            // Apply stream load forwarding by throwing 
StreamLoadForwardException with RedirectView
+            String forwardTarget = targetAddr.getHostname() + ":" + 
targetAddr.getPort();
+            RedirectView forwardRedirectView = 
redirectToStreamLoadForward(request, redirectAddr, forwardTarget);
+
+            LOG.info("Using stream load forward mode for cloud group commit - "
+                    + "db: {}, tbl: {}, label: {}, endpoint: {}, forward_to: 
{}, reason: redirect_differs_from_target",
+                    dbName, tableName, label, redirectAddr.toString(), 
forwardTarget);
+
+            throw new StreamLoadForwardException(forwardRedirectView);
+        } else {
+            LOG.debug("Skip stream load forward - redirect address matches 
target backend: {}",
+                    redirectAddr.toString());
+            return redirectAddr;
+        }
+    }
+
+    /**
+     * Special exception to carry RedirectView for stream load forwarding.
+     */
+    private static class StreamLoadForwardException extends RuntimeException {
+        private final RedirectView redirectView;
+
+        public StreamLoadForwardException(RedirectView redirectView) {
+            this.redirectView = redirectView;
+        }
+
+        public RedirectView getRedirectView() {
+            return redirectView;
+        }
+    }
 }
diff --git 
a/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out 
b/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out
new file mode 100644
index 00000000000..abb03a16633
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out differ
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy 
b/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy
new file mode 100644
index 00000000000..c7b7b6c5a94
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_group_commit_redirect', 'docker') {
+    def databaseName = context.config.getDbNameByFile(context.file)
+    def tableName = "tbl"
+    def getRedirectLocation = { feIp, fePort, redirectPolicy, mode ->
+        def command = """ curl -v --max-redirs 0 --location-trusted -u root:  
+                -H redirect-policy:$redirectPolicy  
+                -H group_commit:$mode
+                 -T 
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv 
+                
http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
+        log.info("redirect command: ${command}")
+
+        def code = -1
+        def location = ""
+        try {
+            def process = command.execute()
+            code = process.waitFor()
+            // Parse Location from stderr since curl -v outputs headers to 
stderr
+            def errorOutput = process.err.text
+            def locationLine = errorOutput.readLines().find { 
it.trim().startsWith('< Location: ') }
+            if (locationLine) {
+                location = locationLine.trim().substring('< Location: 
'.length())
+            }
+            log.info("curl output: ${process.text}")
+            log.info("curl error: ${errorOutput}")
+        } catch (Exception e) {
+            log.info("exception: ${e}".toString())
+        }
+        return location
+    }
+
+    def groupCommitRedicetSteamLoad = { beIp, bePort, targetBe, targetBePort  
->
+        def command = """ curl -v --location-trusted -u root:  
+                -H group_commit:async_mode
+                -H column_separator:,
+                -H columns:id,name
+                 -T 
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv 
+                
http://${beIp}:${bePort}/api/${databaseName}/${tableName}/_stream_load_forward?forward_to=${targetBe}:${targetBePort}
 """
+        log.info("redirect command: ${command}")
+
+        def process = command.execute()
+        def code = process.waitFor()
+        def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+        def out = process.getText()
+        logger.info("code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def json = parseJson(out)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(2, json.NumberTotalRows)
+        assertEquals(2, json.NumberLoadedRows)
+        assertEquals(0, json.NumberFilteredRows)
+        assertEquals(0, json.NumberUnselectedRows)
+        assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+    }
+
+    def getRowCount = { expectedRowCount ->
+        def retry = 0
+        while (retry < 30) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${tableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    // cloud mode
+    def options = new ClusterOptions()
+    options.feNum = 1
+    options.beNum = 3
+    options.cloudMode = true
+    options.beConfigs.add('enable_java_support=false')
+    options.feConfigs.add('enable_group_commit_streamload_be_forward=true')
+    docker(options) {
+        // get fe and be info
+        def feIp = cluster.getMasterFe().getHttpAddress()[0]
+        def fePort = cluster.getMasterFe().getHttpAddress()[1]
+        
+        def be1Ip = cluster.getBackends().get(0).getHttpAddress()[0]
+        def be1HttpPort = cluster.getBackends().get(0).getHttpAddress()[1]
+        def be1HeartbeatPort = cluster.getBackends().get(0).getHeartbeatPort()
+        
+        def be2Ip = cluster.getBackends().get(1).getHttpAddress()[0]
+        def be2HttpPort = cluster.getBackends().get(1).getHttpAddress()[1]
+        def be2HeartbeatPort = cluster.getBackends().get(1).getHeartbeatPort()
+        
+        def be3Ip = cluster.getBackends().get(2).getHttpAddress()[0]
+        def be3HttpPort = cluster.getBackends().get(2).getHttpAddress()[1]
+        def be3HeartbeatPort = cluster.getBackends().get(2).getHeartbeatPort()
+        
+        def msIp = cluster.getMetaservices().get(0).getHttpAddress()[0]
+        def msPort = cluster.getMetaservices().get(0).getHttpAddress()[1]
+        
+        sql """ CREATE DATABASE IF NOT EXISTS ${databaseName} """
+        sql """
+             CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
+                 `id` int(11) NOT NULL,
+                 `name` varchar(1100) NULL,
+                 `score` int(11) NULL default "-1"
+             ) ENGINE=OLAP
+             DUPLICATE KEY(`id`, `name`)
+             DISTRIBUTED BY HASH(`id`) BUCKETS 1
+             PROPERTIES (
+                 "replication_num" = "1",
+                 "group_commit_interval_ms" = "200"
+             );
+        """
+        // test be forward stream load
+        groupCommitRedicetSteamLoad(be1Ip, be1HttpPort, be1Ip, be1HttpPort)
+        groupCommitRedicetSteamLoad(be2Ip, be2HttpPort, be1Ip, be1HttpPort)
+        groupCommitRedicetSteamLoad(be2Ip, be2HttpPort, be1Ip, be1HttpPort)
+
+        getRowCount(6)
+        qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+
+        // test fe redirect policy
+        log.info("Initial cluster setup - 3 BEs")
+        log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2: 
${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+        sql """show backends"""
+        
+        log.info("Dropping all BE")
+        sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+        sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+        sql """ALTER SYSTEM DROPP BACKEND '${be3Ip}:${be3HeartbeatPort}'"""
+        log.info("after dropping all BE: ${sql """show backends""" }")
+        
+        log.info("Adding BE1 BE2 BE3 back with different custom endpoints")
+        sql """ALTER SYSTEM ADD BACKEND 
'${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}'
 properties('tag.public_endpoint' = '12.20.20.20:8030', 'tag.private_endpoint' 
= '11.20.20.19:8040', "tag.compute_group_name" = "compute_cluster")"""
+
+        sleep(30000) 
+        
+        log.info("Final backends configuration: ${sql """show backends""" }")
+        
+        // Test redirect locations - should use one of the available BEs
+        def location = getRedirectLocation(feIp, fePort, "public", 
"async_mode")
+        log.info("public location: ${location}")
+        // redirect url: 
http://endpoint:port/api/db/table/_stream_load_forward?param=value&forward_to=target_be:port
+        
assertTrue(location.contains("12.20.20.20:8030/api/$databaseName/$tableName/_stream_load_forward?")
 && (location.contains("forward_to=${be1Ip}:${be1HttpPort}") || 
location.contains("forward_to=${be2Ip}:${be2HttpPort}") || 
location.contains("forward_to=${be3Ip}:${be3HttpPort}")))
+
+        location = getRedirectLocation(feIp, fePort, "private", "sync_mode")
+        log.info("private location: ${location}")
+        
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load_forward?")
 && (location.contains("forward_to=${be1Ip}:${be1HttpPort}") || 
location.contains("forward_to=${be2Ip}:${be2HttpPort}") || 
location.contains("forward_to=${be3Ip}:${be3HttpPort}")))
+
+        location = getRedirectLocation(feIp, fePort, "", "async_mode")
+        log.info("default location: ${location}")
+        
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load_forward?")
 && (location.contains("forward_to=${be1Ip}:${be1HttpPort}") || 
location.contains("forward_to=${be2Ip}:${be2HttpPort}") || 
location.contains("forward_to=${be3Ip}:${be3HttpPort}")))
+
+        location = getRedirectLocation(feIp, fePort, "public", "off_mode")
+        log.info("public location: ${location}")
+        
assertTrue(location.contains("12.20.20.20:8030/api/$databaseName/$tableName/_stream_load"))
+
+        location = getRedirectLocation(feIp, fePort, "private", "off_mode")
+        log.info("public location: ${location}")
+        
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load"))
+
+        location = getRedirectLocation(feIp, fePort, "", "off_mode")
+        log.info("public location: ${location}")
+        
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load"))
+
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to