This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4379c4ed217 branch-3.1: [feat](cloud) Support cloud group commit
stream load BE forward mode #55326 (#55527)
4379c4ed217 is described below
commit 4379c4ed21759165edc46b1680057a83356bfb23
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 4 10:30:26 2025 +0800
branch-3.1: [feat](cloud) Support cloud group commit stream load BE forward
mode #55326 (#55527)
Cherry-picked from #55326
Co-authored-by: Xin Liao <[email protected]>
---
be/src/http/action/stream_load.cpp | 3 +-
be/src/http/action/stream_load_forward_handler.cpp | 397 +++++++++++++++++++++
be/src/http/action/stream_load_forward_handler.h | 136 +++++++
be/src/http/http_request.cpp | 8 +
be/src/http/http_request.h | 2 +
be/src/service/http_service.cpp | 6 +
.../main/java/org/apache/doris/common/Config.java | 8 +
.../org/apache/doris/httpv2/rest/LoadAction.java | 183 +++++++++-
.../stream_load/test_group_commit_redirect.out | Bin 0 -> 149 bytes
.../stream_load/test_group_commit_redirect.groovy | 185 ++++++++++
10 files changed, 918 insertions(+), 10 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index cde4876388d..56412863b38 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -215,7 +215,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
LOG(INFO) << "new income streaming load request." << ctx->brief() << ",
db=" << ctx->db
- << ", tbl=" << ctx->table << ", group_commit=" <<
ctx->group_commit;
+ << ", tbl=" << ctx->table << ", group_commit=" <<
ctx->group_commit
+ << ", HTTP headers=" << req->get_all_headers();
ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
if (st.ok()) {
diff --git a/be/src/http/action/stream_load_forward_handler.cpp
b/be/src/http/action/stream_load_forward_handler.cpp
new file mode 100644
index 00000000000..3f1719312fb
--- /dev/null
+++ b/be/src/http/action/stream_load_forward_handler.cpp
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_forward_handler.h"
+
+#include <event2/buffer.h>
+#include <event2/http.h>
+#include <event2/http_struct.h>
+#include <event2/keyvalq_struct.h>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "util/byte_buffer.h"
+
+namespace doris {
+
+// Header-phase hook of the stream load forward endpoint.
+//
+// Logs the incoming request, attaches a fresh StreamLoadForwardContext to it, validates the
+// mandatory `forward_to=host:port` query parameter and prepares the outgoing connection and
+// request towards that target. On every failure an error reply is sent to the client here.
+//
+// Returns HttpStatus::OK on success, HttpStatus::BAD_REQUEST when `forward_to` is missing or
+// malformed, and HttpStatus::INTERNAL_SERVER_ERROR when the forward request cannot be created.
+// NOTE(review): confirm how the HTTP server interprets a positive non-OK return value from
+// on_header(); if it is not treated as "reply already sent", handle() may still run for a
+// request that was rejected here.
+int StreamLoadForwardHandler::on_header(HttpRequest* req) {
+    std::ostringstream headers_info;
+    for (const auto& header : req->headers()) {
+        // The Authorization header carries the caller's credentials. Log only its presence so
+        // that passwords never end up in the log file.
+        if (strcasecmp(header.first.c_str(), "Authorization") == 0) {
+            headers_info << header.first << ":*** ";
+        } else {
+            headers_info << header.first << ":" << header.second << " ";
+        }
+    }
+
+    std::ostringstream params_info;
+    const auto* params = req->params();
+    for (const auto& param : *params) {
+        params_info << param.first << "=" << param.second << " ";
+    }
+
+    LOG(INFO) << "StreamLoadForward request started - "
+              << "path: " << req->raw_path() << ", remote: " << req->remote_host()
+              << ", headers: [" << headers_info.str() << "]"
+              << ", params: [" << params_info.str() << "]";
+
+    // The context is attached before validation so that later callbacks always find one.
+    std::shared_ptr<StreamLoadForwardContext> ctx(new StreamLoadForwardContext());
+    req->set_handler_ctx(ctx);
+
+    auto it = params->find("forward_to");
+    if (it == params->end()) {
+        LOG(WARNING) << "StreamLoadForward failed - missing forward_to parameter, path: "
+                     << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
+                                "Missing required parameter 'forward_to'. "
+                                "Usage: ?forward_to=host:port");
+        return HttpStatus::BAD_REQUEST;
+    }
+
+    std::string target_host;
+    int target_port = 0;
+    Status st = parse_forward_target(it->second, target_host, target_port);
+    if (!st.ok()) {
+        LOG(WARNING) << "StreamLoadForward failed - invalid forward target: " << st.to_string()
+                     << ", path: " << req->raw_path();
+        HttpChannel::send_reply(
+                req, HttpStatus::BAD_REQUEST,
+                "Invalid forward_to parameter: " + st.to_string() + ". Expected format: host:port");
+        return HttpStatus::BAD_REQUEST;
+    }
+
+    ctx->target_host = target_host;
+    ctx->target_port = target_port;
+    ctx->original_req = req;
+
+    Status init_st = init_forward_request(req, target_host, target_port, ctx);
+    if (!init_st.ok()) {
+        LOG(WARNING) << "StreamLoadForward failed - failed to initialize forward request: "
+                     << init_st.to_string() << ", target: " << target_host << ":" << target_port
+                     << ", path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Failed to initialize forward request: " + init_st.to_string());
+        return HttpStatus::INTERNAL_SERVER_ERROR;
+    }
+
+    return HttpStatus::OK;
+}
+
+// Sends the buffered request to the target BE.
+//
+// Copies the client's headers onto the forward request, moves every chunk collected by
+// on_chunk_data() into the forward request's output buffer and submits it as a PUT on the
+// connection created in on_header(). The reply to the client is produced later by
+// forward_request_done(); this function replies itself only when forwarding cannot start.
+void StreamLoadForwardHandler::handle(HttpRequest* req) {
+    auto ctx = std::static_pointer_cast<StreamLoadForwardContext>(req->handler_ctx());
+    if (!ctx) {
+        LOG(WARNING) << "StreamLoadForward failed - context not found, path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: context not found");
+        return;
+    }
+
+    auto* forward_req = ctx->get_forward_request();
+    if (!forward_req) {
+        LOG(WARNING) << "Forward request not ready, path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: forward request not initialized");
+        return;
+    }
+
+    setup_forward_headers(req, forward_req, ctx->target_host, ctx->target_port);
+
+    if (!ctx->request_data_chunks.empty()) {
+        evbuffer* output = evhttp_request_get_output_buffer(forward_req);
+        while (!ctx->request_data_chunks.empty()) {
+            const auto& bb = ctx->request_data_chunks.front();
+            if (evbuffer_add(output, bb->ptr, bb->limit) != 0) {
+                LOG(WARNING) << "Failed to add buffered data to output buffer, chunk size: "
+                             << bb->limit << ", total size: " << ctx->total_request_size
+                             << ", path: " << req->raw_path();
+                // The request has not been handed to the connection yet, so nothing else
+                // will release it. Free it here and forget the pointer to avoid a leak.
+                ctx->set_forward_request(nullptr);
+                evhttp_request_free(forward_req);
+                HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                        "Failed to prepare forward data");
+                return;
+            }
+            ctx->request_data_chunks.pop_front();
+        }
+    }
+
+    if (evhttp_make_request(ctx->conn, forward_req, EVHTTP_REQ_PUT,
+                            build_forward_url(req).c_str()) != 0) {
+        // libevent documents that the request object is no longer valid after a failed
+        // evhttp_make_request(), so do not keep a pointer to it.
+        ctx->set_forward_request(nullptr);
+        LOG(WARNING) << "Failed to make forward request to " << ctx->target_host << ":"
+                     << ctx->target_port << ", path: " << req->raw_path();
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Failed to forward request to target server: " +
+                                        ctx->target_host + ":" +
+                                        std::to_string(ctx->target_port));
+        return;
+    }
+
+    LOG(INFO) << "StreamLoadForward request sent - data size: " << ctx->total_request_size
+              << ", target: " << ctx->target_host << ":" << ctx->target_port
+              << ", path: " << req->raw_path();
+}
+
+// Drains the bytes currently available on the client request into the context.
+//
+// Each pass allocates a ByteBuffer sized to what is left in the input evbuffer, moves the
+// bytes into it and appends it to ctx->request_data_chunks, keeping ctx->total_request_size
+// in step. The chunks are sent to the target later by handle().
+void StreamLoadForwardHandler::on_chunk_data(HttpRequest* req) {
+    auto ctx = std::static_pointer_cast<StreamLoadForwardContext>(req->handler_ctx());
+    if (!ctx) {
+        LOG(WARNING) << "No context found for chunk data";
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: context not found");
+        return;
+    }
+
+    evbuffer* input = evhttp_request_get_input_buffer(req->get_evhttp_request());
+    while (evbuffer_get_length(input) > 0) {
+        size_t remaining_length = evbuffer_get_length(input);
+        ByteBufferPtr bb;
+        Status st = ByteBuffer::allocate(remaining_length, &bb);
+        if (!st.ok()) {
+            LOG(WARNING) << "Failed to allocate ByteBuffer: " << st.to_string()
+                         << ", path: " << req->raw_path();
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                    "Failed to allocate memory for request data");
+            return;
+        }
+
+        // evbuffer_remove() returns the number of bytes read, or -1 on failure. A negative
+        // value must never reach the buffer position or the size counter.
+        const int removed_bytes = evbuffer_remove(input, bb->ptr, bb->capacity);
+        if (removed_bytes < 0) {
+            LOG(WARNING) << "Failed to read request data from input buffer, path: "
+                         << req->raw_path();
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                    "Failed to read request data");
+            return;
+        }
+        if (removed_bytes == 0) {
+            // Nothing was consumed although data was reported; stop instead of spinning.
+            break;
+        }
+
+        bb->pos = static_cast<size_t>(removed_bytes);
+        bb->flip();
+
+        ctx->request_data_chunks.emplace_back(bb);
+        ctx->total_request_size += static_cast<size_t>(removed_bytes);
+    }
+}
+
+// Creates the outgoing connection and request used to forward `req` to the target.
+//
+// The connection is created on the event base that serves the incoming request, with no DNS
+// base. `ctx` receives the connection and the forward request, and its raw pointer is
+// registered as the argument of the close and completion callbacks.
+// Returns Status::InternalError when either libevent object cannot be created.
+Status StreamLoadForwardHandler::init_forward_request(
+        HttpRequest* req, const std::string& target_host, int target_port,
+        std::shared_ptr<StreamLoadForwardContext>& ctx) {
+    ctx->original_req = req;
+
+    auto* client_conn = evhttp_request_get_connection(req->get_evhttp_request());
+    auto* loop = evhttp_connection_get_base(client_conn);
+
+    auto* upstream_conn = evhttp_connection_base_new(loop, /*dnsbase=*/nullptr,
+                                                     target_host.c_str(), target_port);
+    if (upstream_conn == nullptr) {
+        return Status::InternalError("Failed to create connection to target server");
+    }
+
+    evhttp_connection_set_closecb(upstream_conn, forward_connection_close_cb, ctx.get());
+    ctx->conn = upstream_conn;
+
+    auto* upstream_req = evhttp_request_new(forward_request_done, ctx.get());
+    if (upstream_req == nullptr) {
+        evhttp_connection_free(upstream_conn);
+        return Status::InternalError("Failed to create forward request");
+    }
+
+    // Response bytes are collected piece by piece as they arrive.
+    evhttp_request_set_chunked_cb(upstream_req, forward_request_chunked_cb);
+
+    ctx->set_forward_request(upstream_req);
+    return Status::OK();
+}
+
+// Completion callback of the forward request; `arg` is the StreamLoadForwardContext.
+//
+// A null `req` means no response was obtained, and the client gets a 503. Otherwise the
+// outcome is logged and the collected response is relayed through send_complete_response().
+void StreamLoadForwardHandler::forward_request_done(struct evhttp_request* req, void* arg) {
+    auto* ctx = static_cast<StreamLoadForwardContext*>(arg);
+
+    if (req == nullptr) {
+        LOG(ERROR) << "Forward request failed - no response";
+        evhttp_send_error(ctx->original_req->get_evhttp_request(), 503,
+                          "Backend server unavailable");
+        return;
+    }
+
+    const int status_code = evhttp_request_get_response_code(req);
+    const char* status_line = evhttp_request_get_response_code_line(req);
+    const char* reason_text = status_line != nullptr ? status_line : "Unknown";
+
+    LOG(INFO) << "StreamLoadForward completed - "
+              << "status: " << status_code << ", reason: " << reason_text
+              << ", response_size: " << ctx->response_data.size() << " bytes"
+              << ", path: " << ctx->original_req->raw_path();
+
+    send_complete_response(req, ctx, status_code);
+}
+
+// Data callback of the forward request; `arg` is the StreamLoadForwardContext.
+//
+// Appends everything currently held in the response's input buffer to ctx->response_data
+// and then drains those bytes from the buffer.
+void StreamLoadForwardHandler::forward_request_chunked_cb(struct evhttp_request* req, void* arg) {
+    auto* ctx = static_cast<StreamLoadForwardContext*>(arg);
+
+    struct evbuffer* input_buffer = evhttp_request_get_input_buffer(req);
+    if (input_buffer == nullptr) {
+        return;
+    }
+
+    const size_t available = evbuffer_get_length(input_buffer);
+    if (available == 0) {
+        return;
+    }
+
+    // Make the pending bytes contiguous so they can be copied in one append.
+    auto* bytes = reinterpret_cast<char*>(evbuffer_pullup(input_buffer, available));
+    if (bytes == nullptr) {
+        LOG(WARNING) << "Failed to pullup " << available << " bytes from input buffer";
+        return;
+    }
+
+    ctx->response_data.append(bytes, available);
+    // The bytes are copied; release them from the libevent buffer.
+    evbuffer_drain(input_buffer, available);
+}
+
+// Relays the target's response to the original client.
+//
+// The body is ctx->response_data. Content-Length is set from the bytes actually placed in
+// the reply buffer, the remaining headers come from copy_response_headers(), and the status
+// code and reason line are those of the forward response `req`.
+void StreamLoadForwardHandler::send_complete_response(struct evhttp_request* req,
+                                                      StreamLoadForwardContext* ctx,
+                                                      int response_code) {
+    struct evbuffer* reply_body = evbuffer_new();
+    if (reply_body == nullptr) {
+        LOG(ERROR) << "Failed to create response buffer";
+        HttpChannel::send_reply(ctx->original_req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                "Internal error: failed to create response buffer");
+        return;
+    }
+
+    const std::string& collected = ctx->response_data;
+    if (!collected.empty()) {
+        evbuffer_add(reply_body, collected.c_str(), collected.size());
+    }
+
+    struct evhttp_request* client_req = ctx->original_req->get_evhttp_request();
+    struct evkeyvalq* upstream_headers = evhttp_request_get_input_headers(req);
+    struct evkeyvalq* client_headers = evhttp_request_get_output_headers(client_req);
+
+    // Content-Length reflects what is really in the buffer, not what was intended.
+    const std::string content_length = std::to_string(evbuffer_get_length(reply_body));
+    evhttp_add_header(client_headers, "Content-Length", content_length.c_str());
+
+    copy_response_headers(upstream_headers, client_headers);
+
+    evhttp_send_reply(client_req, response_code, evhttp_request_get_response_code_line(req),
+                      reply_body);
+
+    evbuffer_free(reply_body);
+}
+
+// Copies the target's response headers into the reply for the client.
+//
+// Transfer-Encoding, Content-Length, Date, Server and Content-Type are skipped (compared
+// case-insensitively). A header with a null value is copied with an empty value.
+void StreamLoadForwardHandler::copy_response_headers(struct evkeyvalq* input_headers,
+                                                     struct evkeyvalq* output_headers) {
+    // Headers that are not taken over from the upstream response.
+    static const char* const kSkippedHeaders[] = {"Transfer-Encoding", "Content-Length", "Date",
+                                                  "Server", "Content-Type"};
+
+    struct evkeyval* entry = input_headers->tqh_first;
+    while (entry != nullptr) {
+        bool skipped = false;
+        for (const char* name : kSkippedHeaders) {
+            if (strcasecmp(entry->key, name) == 0) {
+                skipped = true;
+                break;
+            }
+        }
+        if (!skipped) {
+            evhttp_add_header(output_headers, entry->key,
+                              entry->value != nullptr ? entry->value : "");
+        }
+        entry = entry->next.tqe_next;
+    }
+}
+
+// Close callback of the forward connection; `arg` is the StreamLoadForwardContext.
+// Tells the context that the connection is gone; a null context is only logged.
+void StreamLoadForwardHandler::forward_connection_close_cb(struct evhttp_connection* conn,
+                                                           void* arg) {
+    auto* ctx = static_cast<StreamLoadForwardContext*>(arg);
+    if (ctx != nullptr) {
+        ctx->handle_connection_close();
+        return;
+    }
+
+    LOG(WARNING) << "Context is null in connection close callback";
+}
+
+// Splits a `forward_to` value of the form "host:port" at its first ':'.
+//
+// @param forward_to  raw parameter value
+// @param host        receives the non-empty text before the first ':'
+// @param port        receives the port, which must be 1..65535
+// @return Status::OK on success; Status::InvalidArgument when the separator is missing, the
+//         host is empty, or the port is not a plain decimal number within range.
+Status StreamLoadForwardHandler::parse_forward_target(const std::string& forward_to,
+                                                      std::string& host, int& port) {
+    const size_t pos = forward_to.find(':');
+    if (pos == std::string::npos) {
+        return Status::InvalidArgument("Invalid forward_to format, should be host:port, got: {}",
+                                       forward_to);
+    }
+    if (pos == 0) {
+        return Status::InvalidArgument("Invalid forward_to format, host is empty, got: {}",
+                                       forward_to);
+    }
+
+    host = forward_to.substr(0, pos);
+    const std::string port_str = forward_to.substr(pos + 1);
+
+    // std::stoi skips leading whitespace, accepts a sign and stops at the first non-digit,
+    // so values such as "80abc" or " +80" would otherwise be accepted. Require digits only.
+    if (port_str.empty() || port_str.find_first_not_of("0123456789") != std::string::npos) {
+        return Status::InvalidArgument("Invalid port number in forward_to: {}", port_str);
+    }
+
+    try {
+        port = std::stoi(port_str);
+    } catch (const std::exception& e) {
+        // Still reachable for digit strings too large for an int.
+        LOG(WARNING) << "Exception while parsing port: " << port_str << ", what(): " << e.what();
+        return Status::InvalidArgument("Invalid port number in forward_to: {}, exception: {}",
+                                       port_str, e.what());
+    }
+
+    if (port <= 0 || port > 65535) {
+        return Status::InvalidArgument("Port number must be between 1 and 65535, got: {}", port);
+    }
+
+    return Status::OK();
+}
+
+// Builds the request target sent to the forward destination.
+//
+// Everything from "/_stream_load_forward" onwards in the raw path is replaced by
+// "/_stream_load"; a path without that marker is used unchanged. All query parameters except
+// `forward_to` are appended as "key=value" pairs joined by '&'.
+// NOTE(review): keys and values are appended as stored in req->params(). If those are held
+// URL-decoded, values containing '&', '=' or spaces need re-encoding here — confirm.
+std::string StreamLoadForwardHandler::build_forward_url(HttpRequest* req) {
+    const std::string& raw_path = req->raw_path();
+
+    const size_t marker = raw_path.find("/_stream_load_forward");
+    std::string url =
+            marker == std::string::npos ? raw_path : raw_path.substr(0, marker) + "/_stream_load";
+
+    // The first kept parameter is introduced by '?', every later one by '&'.
+    char separator = '?';
+    for (const auto& param : *req->params()) {
+        if (param.first == "forward_to") {
+            continue;
+        }
+        url += separator;
+        url += param.first;
+        url += '=';
+        url += param.second;
+        separator = '&';
+    }
+
+    return url;
+}
+
+// Fills the headers of the forward request.
+//
+// Every header of the original request is copied except Host, Transfer-Encoding and
+// Content-Length. Host is then set to "target_host:target_port" and the X-Forwarded-For,
+// X-Forwarded-Proto and X-Forwarded-Host headers are added.
+void StreamLoadForwardHandler::setup_forward_headers(HttpRequest* req,
+                                                     struct evhttp_request* forward_req,
+                                                     const std::string& target_host,
+                                                     int target_port) {
+    struct evkeyvalq* input_headers = evhttp_request_get_input_headers(req->get_evhttp_request());
+    struct evkeyvalq* output_headers = evhttp_request_get_output_headers(forward_req);
+
+    // Copy all headers from original request, except Host, Transfer-Encoding, and Content-Length
+    for (struct evkeyval* header = input_headers->tqh_first; header != nullptr;
+         header = header->next.tqe_next) {
+        // Skip headers that conflict with libevent's automatic handling
+        if (strcasecmp(header->key, "Host") == 0 ||
+            strcasecmp(header->key, "Transfer-Encoding") == 0 ||
+            strcasecmp(header->key, "Content-Length") == 0) {
+            continue;
+        }
+        // A null value must not reach evhttp_add_header(); forward it as an empty value.
+        evhttp_add_header(output_headers, header->key,
+                          header->value != nullptr ? header->value : "");
+    }
+
+    // Set new Host header
+    evhttp_add_header(output_headers, "Host",
+                      fmt::format("{}:{}", target_host, target_port).c_str());
+
+    // Add forwarding related headers. Both sources may be unavailable, so each header is
+    // added only when there is a value to send.
+    const char* remote_host = req->remote_host();
+    if (remote_host != nullptr) {
+        evhttp_add_header(output_headers, "X-Forwarded-For", remote_host);
+    }
+    evhttp_add_header(output_headers, "X-Forwarded-Proto", "http");
+    // evhttp_request_get_host() returns NULL when the client sent neither an absolute URI
+    // nor a Host header.
+    const char* original_host = evhttp_request_get_host(req->get_evhttp_request());
+    if (original_host != nullptr) {
+        evhttp_add_header(output_headers, "X-Forwarded-Host", original_host);
+    }
+}
+
+} // namespace doris
diff --git a/be/src/http/action/stream_load_forward_handler.h
b/be/src/http/action/stream_load_forward_handler.h
new file mode 100644
index 00000000000..c5c7c674ddc
--- /dev/null
+++ b/be/src/http/action/stream_load_forward_handler.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <event2/http.h>
+
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "common/status.h"
+#include "http/http_handler.h"
+#include "http/http_request.h"
+#include "util/byte_buffer.h"
+
+namespace doris {
+
+// State shared between the callbacks of one forwarded stream load request: the libevent
+// connection and request towards the target, the buffered request body and the collected
+// response. The forward request pointer and the closed flag are guarded by an internal mutex.
+class StreamLoadForwardContext {
+public:
+    StreamLoadForwardContext() = default;
+
+    ~StreamLoadForwardContext() {
+        struct evhttp_connection* pending_conn = nullptr;
+        {
+            std::lock_guard<std::mutex> guard(_mutex);
+            // libevent releases the forward request on its own; only drop the pointer.
+            forward_req = nullptr;
+            // The connection is still ours to free only if libevent has not reported it closed.
+            if (conn != nullptr && !connection_closed) {
+                pending_conn = conn;
+                conn = nullptr;
+            }
+        }
+        // Freed after the lock is released to avoid deadlock with the connection close
+        // callback, which takes the same mutex.
+        if (pending_conn != nullptr) {
+            evhttp_connection_free(pending_conn);
+        }
+    }
+
+    // Stores the request that will carry the forwarded data.
+    void set_forward_request(evhttp_request* req) {
+        std::lock_guard<std::mutex> guard(_mutex);
+        forward_req = req;
+    }
+
+    // Returns the stored forward request, or nullptr when none is set.
+    evhttp_request* get_forward_request() {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return forward_req;
+    }
+
+    // Records that libevent has already released the connection and clears the pointer so
+    // that the destructor does not free it a second time.
+    void handle_connection_close() {
+        std::lock_guard<std::mutex> guard(_mutex);
+        conn = nullptr;
+        connection_closed = true;
+    }
+
+    bool is_connection_closed() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return connection_closed;
+    }
+
+    // Connection to the forward target.
+    struct evhttp_connection* conn {nullptr};
+    // Original request reference, lifecycle managed by HTTP framework
+    HttpRequest* original_req {nullptr};
+
+    std::string target_host;
+    int target_port {0};
+
+    // Buffer for collecting response data
+    std::string response_data;
+
+    // Request body chunks in arrival order, and their total size in bytes.
+    std::deque<ByteBufferPtr> request_data_chunks;
+    size_t total_request_size = 0;
+
+private:
+    mutable std::mutex _mutex;
+    struct evhttp_request* forward_req {nullptr};
+    bool connection_closed {false};
+};
+
+// HTTP handler that forwards a Stream Load request to another BE node.
+//
+// The request path keeps the Stream Load layout, /api/{db}/{table}/_stream_load_forward, and
+// the target is named by the `forward_to=host:port` query parameter. The body is read
+// progressively through on_chunk_data() and sent to the target from handle().
+class StreamLoadForwardHandler : public HttpHandler {
+public:
+    StreamLoadForwardHandler() = default;
+    ~StreamLoadForwardHandler() override = default;
+
+    // HttpHandler interface.
+    bool request_will_be_read_progressively() override { return true; }
+    int on_header(HttpRequest* req) override;
+    void on_chunk_data(HttpRequest* req) override;
+    void handle(HttpRequest* req) override;
+
+private:
+    // Request preparation helpers.
+    Status parse_forward_target(const std::string& forward_to, std::string& host, int& port);
+    Status init_forward_request(HttpRequest* req, const std::string& target_host, int target_port,
+                                std::shared_ptr<StreamLoadForwardContext>& ctx);
+    std::string build_forward_url(HttpRequest* req);
+    void setup_forward_headers(HttpRequest* req, struct evhttp_request* forward_req,
+                               const std::string& target_host, int target_port);
+
+    // libevent callbacks; `arg` is the StreamLoadForwardContext of the request.
+    static void forward_request_done(struct evhttp_request* req, void* arg);
+    static void forward_request_chunked_cb(struct evhttp_request* req, void* arg);
+    static void forward_connection_close_cb(struct evhttp_connection* conn, void* arg);
+
+    // Response helper functions
+    static void send_complete_response(struct evhttp_request* req, StreamLoadForwardContext* ctx,
+                                       int response_code);
+    static void copy_response_headers(struct evkeyvalq* input_headers,
+                                      struct evkeyvalq* output_headers);
+};
+
+} // namespace doris
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index 14bde591b4c..f7432978016 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -110,6 +110,14 @@ const std::string& HttpRequest::param(const std::string&
key) const {
return iter->second;
}
+// Renders the request headers as "key:value, key:value, " for diagnostics.
+// The value of the Authorization header is replaced by "***" because this string is
+// written to the log and must not expose the caller's credentials.
+std::string HttpRequest::get_all_headers() const {
+    // NOTE(review): relies on _headers looking keys up case-insensitively so that any
+    // spelling of the header name is found — confirm against StringCaseUnorderedMap.
+    const auto credential_it = _headers.find("Authorization");
+
+    std::stringstream headers;
+    for (auto it = _headers.begin(); it != _headers.end(); ++it) {
+        headers << it->first << ":";
+        if (it == credential_it) {
+            headers << "***";
+        } else {
+            headers << it->second;
+        }
+        headers << ", ";
+    }
+    return headers.str();
+}
+
void HttpRequest::add_output_header(const char* key, const char* value) {
evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value);
}
diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h
index a9286410aff..41d8cf98baa 100644
--- a/be/src/http/http_request.h
+++ b/be/src/http/http_request.h
@@ -55,6 +55,8 @@ public:
// return params
const StringCaseUnorderedMap<std::string>& headers() { return _headers; }
+ std::string get_all_headers() const;
+
// return params
std::map<std::string, std::string>* params() { return &_params; }
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 32ea86b09b1..6c604dd09d0 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -65,6 +65,7 @@
#include "http/action/snapshot_action.h"
#include "http/action/stream_load.h"
#include "http/action/stream_load_2pc.h"
+#include "http/action/stream_load_forward_handler.h"
#include "http/action/tablet_migration_action.h"
#include "http/action/tablets_distribution_action.h"
#include "http/action/tablets_info_action.h"
@@ -133,6 +134,11 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::PUT,
"/api/{db}/{table}/_stream_load_2pc",
streamload_2pc_action);
+ // register stream load forward handler
+ auto* forward_handler = _pool.add(new StreamLoadForwardHandler());
+ _ev_http_server->register_handler(HttpMethod::PUT,
"/api/{db}/{table}/_stream_load_forward",
+ forward_handler);
+
// register http_stream
HttpStreamAction* http_stream_action = _pool.add(new
HttpStreamAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/_http_stream",
http_stream_action);
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 26fe184f6ff..95b93afb8f5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3260,6 +3260,14 @@ public class Config extends ConfigBase {
+ "public-private/public/private/direct/random-be and empty
string" })
public static String streamload_redirect_policy = "";
+ @ConfField(mutable = true, description = {
+ "存算分离模式下是否启用group commit的streamload BE转发功能。"
+ + "解决LB随机转发导致group commit攒批失效的问题,通过BE二次转发确保同表请求到达同一BE节点。",
+ "Whether to enable group commit streamload BE forward feature in
cloud mode. "
+ + "Solves the issue where LB random forwarding breaks
group commit batching "
+ + "by implementing BE-level forwarding to ensure
same-table requests reach the same BE node." })
+ public static boolean enable_group_commit_streamload_be_forward = false;
+
@ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
"create table in cloud mode, check recycler key remained, default
true"})
public static boolean check_create_table_recycle_key_remained = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 065c93bcc76..b79b67fedcd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -324,7 +325,8 @@ public class LoadAction extends RestBaseController {
tableId = ((OlapTable) olapTable.get()).getId();
}
- redirectAddr = selectRedirectBackend(request, groupCommit,
tableId);
+ // Handle stream load with potential group commit forwarding
+ redirectAddr = handleStreamLoadRedirect(request, groupCommit,
tableId, dbName, tableName, label);
}
if (LOG.isDebugEnabled()) {
@@ -334,6 +336,9 @@ public class LoadAction extends RestBaseController {
RedirectView redirectView = redirectTo(request, redirectAddr);
return redirectView;
+ } catch (StreamLoadForwardException e) {
+ // Special handling for stream load forwarding
+ return e.getRedirectView();
} catch (Exception e) {
LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {},
err: {}",
isStreamLoad, db, table, label, e.getMessage());
@@ -398,6 +403,11 @@ public class LoadAction extends RestBaseController {
private TNetworkAddress selectRedirectBackend(HttpServletRequest request,
boolean groupCommit, long tableId)
throws LoadException {
+ return selectRedirectBackend(request, groupCommit, tableId, null);
+ }
+
+ private TNetworkAddress selectRedirectBackend(HttpServletRequest request,
boolean groupCommit, long tableId,
+ Backend preSelectedBackend) throws LoadException {
long debugBackendId =
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
-1L);
if (debugBackendId != -1L) {
Backend backend =
Env.getCurrentSystemInfo().getBackend(debugBackendId);
@@ -408,17 +418,17 @@ public class LoadAction extends RestBaseController {
if (Strings.isNullOrEmpty(cloudClusterName)) {
throw new LoadException("No cloud cluster name selected.");
}
- return selectCloudRedirectBackend(cloudClusterName, request,
groupCommit, tableId);
+ return selectCloudRedirectBackend(cloudClusterName, request,
groupCommit, tableId, preSelectedBackend);
} else {
if (groupCommit && tableId == -1) {
throw new LoadException("Group commit table id wrong.");
}
- return selectLocalRedirectBackend(groupCommit, request, tableId);
+ return selectLocalRedirectBackend(groupCommit, request, tableId,
preSelectedBackend);
}
}
- private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit,
HttpServletRequest request, long tableId)
- throws LoadException {
+ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit,
HttpServletRequest request, long tableId,
+ Backend preSelectedBackend) throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -438,7 +448,9 @@ public class LoadAction extends RestBaseController {
throw new
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " +
policy);
}
if (groupCommit) {
- backend = selectBackendForGroupCommit("", request, tableId);
+ // Use pre-selected backend if provided to avoid duplicate calls
+ backend = preSelectedBackend != null ? preSelectedBackend
+ : selectBackendForGroupCommit("", request, tableId);
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
}
@@ -449,11 +461,12 @@ public class LoadAction extends RestBaseController {
}
private TNetworkAddress selectCloudRedirectBackend(String clusterName,
HttpServletRequest req, boolean groupCommit,
- long tableId)
- throws LoadException {
+ long tableId, Backend preSelectedBackend) throws LoadException {
Backend backend = null;
if (groupCommit) {
- backend = selectBackendForGroupCommit(clusterName, req, tableId);
+ // Use pre-selected backend if provided to avoid duplicate calls
+ backend = preSelectedBackend != null ? preSelectedBackend
+ : selectBackendForGroupCommit(clusterName, req, tableId);
} else {
backend = StreamLoadHandler.selectBackend(clusterName);
}
@@ -917,4 +930,156 @@ public class LoadAction extends RestBaseController {
}
+ /**
+ * Create the redirect URL for stream load forward mode.
+ *
+ * This method constructs the special redirect URL used in the group commit forwarding mechanism.
+ *
+ * Key modifications to the standard redirect:
+ * 1. Path modification: Changes "/_stream_load" to "/_stream_load_forward"
+ * - This tells the receiving BE that it needs to perform additional forwarding
+ * - The "_stream_load_forward" endpoint is specifically designed to handle forwarding logic
+ *
+ * 2. Forward target parameter: Adds "forward_to=host:port" to the query string
+ * - Specifies the actual target BE node that should process this request
+ * - Ensures all requests for the same table reach the same BE for optimal batching
+ *
+ * 3. Authentication preservation: Puts "user:password" into the URL when an Authorization header is present
+ * - Ensures the forwarded request has proper authentication context
+ *
+ * Example transformation:
+ * Original: http://endpoint:port/api/db/table/_stream_load?param=value
+ * Forward: http://endpoint:port/api/db/table/_stream_load_forward?param=value&forward_to=target_be:port
+ *
+ * @param request the original HTTP request
+ * @param addr the endpoint address to redirect to (public/private endpoint)
+ * @param forwardTarget the target BE node in "host:port" format for final processing
+ * @return RedirectView (307 TEMPORARY_REDIRECT) for stream load forwarding; URI failures surface as RuntimeException
+ */
+ private RedirectView redirectToStreamLoadForward(HttpServletRequest
request, TNetworkAddress addr,
+ String forwardTarget) {
+ URI urlObj = null;
+ URI resultUriObj = null;
+ String urlStr = request.getRequestURI();
+ String userInfo = null;
+ String modifiedPath = null;
+
+ if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) { // carry the caller's credentials as URI user-info
+ ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
+ userInfo =
ClusterNamespace.getNameFromFullName(authInfo.fullUserName)
+ + ":" + authInfo.password;
+ }
+ try {
+ urlObj = new URI(urlStr);
+ // Replace "/_stream_load" with "/_stream_load_forward" in the path (String.replace rewrites every occurrence)
+ modifiedPath = urlObj.getPath().replace("/_stream_load",
"/_stream_load_forward");
+ resultUriObj = new URI("http", userInfo, addr.getHostname(),
+ addr.getPort(), modifiedPath, "", null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ String redirectUrl = resultUriObj.toASCIIString();
+
+ // Add forward_to parameter (note: toASCIIString() already includes
'?' due to empty query)
+ String queryString = request.getQueryString();
+ if (!Strings.isNullOrEmpty(queryString)) {
+ redirectUrl += queryString + "&forward_to=" + forwardTarget;
+ } else {
+ redirectUrl += "forward_to=" + forwardTarget;
+ }
+
+ LOG.info("Redirect stream load forward url: {}, forward_to: {}", // logged URL is rebuilt without the user-info part
+ "http://" + addr.getHostname() + ":" + addr.getPort() +
modifiedPath, forwardTarget);
+ RedirectView redirectView = new RedirectView(redirectUrl);
+ redirectView.setContentType("text/html;charset=utf-8");
+
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+ return redirectView;
+ }
+
+ /**
+ * Handle stream load redirect with optional group commit forwarding.
+ *
+ * Group Commit Stream Load Forward Mode in Cloud Environment:
+ *
+ * Problem:
+ * Group commit requires that requests for the same table be sent to the same BE node
+ * to achieve better batching efficiency. However, in cloud mode with a Load Balancer (LB),
+ * the LB randomly selects a BE node for forwarding, which breaks the group commit strategy
+ * and reduces batching effectiveness.
+ *
+ * Solution:
+ * Implement a two-stage forwarding mechanism:
+ * 1. FE redirects to public/private endpoint (LB) as usual
+ * 2. BE performs a second forwarding to the actual target BE node that handles the specific table
+ *
+ * This ensures that all requests for the same table ultimately reach the same BE node,
+ * preserving the group commit batching strategy while still utilizing the LB infrastructure.
+ *
+ * @param request the HTTP request
+ * @param groupCommit whether group commit is enabled
+ * @param tableId the table ID for group commit
+ * @param dbName database name for logging
+ * @param tableName table name for logging
+ * @param label label for logging
+ * @return redirect address when no forwarding is applied (forward mode not active, or redirect equals the target BE)
+ * @throws StreamLoadForwardException (unchecked) carrying the forward RedirectView when redirect and target BE differ
+ * @throws LoadException if no cloud cluster name or target backend is available, or redirect selection fails
+ */
+ private TNetworkAddress handleStreamLoadRedirect(HttpServletRequest
request, boolean groupCommit,
+ long tableId, String dbName, String tableName, String label)
throws LoadException {
+
+ // Forwarding needs cloud mode + group commit + the FE config switch; otherwise use the normal redirect
+ if (!Config.isCloudMode() || !groupCommit ||
!Config.enable_group_commit_streamload_be_forward) {
+ return selectRedirectBackend(request, groupCommit, tableId);
+ }
+
+ String cloudClusterName = getCloudClusterName(request);
+ if (Strings.isNullOrEmpty(cloudClusterName)) {
+ throw new LoadException("No cloud cluster name selected for group
commit forwarding.");
+ }
+
+ // Get target backend for group commit
+ Backend targetBackend = selectBackendForGroupCommit(cloudClusterName,
request, tableId);
+ if (targetBackend == null) {
+ throw new LoadException("Failed to select target backend for group
commit forwarding.");
+ }
+
+ // Get redirect address, handing over the already-selected backend so it is not selected a second time
+ TNetworkAddress redirectAddr =
selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId,
+ targetBackend);
+ TNetworkAddress targetAddr = new
TNetworkAddress(targetBackend.getHost(), targetBackend.getHttpPort());
+
+ // Apply forwarding if addresses differ (compare hostname and port
directly)
+ if (!redirectAddr.getHostname().equals(targetAddr.getHostname())
+ || redirectAddr.getPort() != targetAddr.getPort()) {
+ // Apply stream load forwarding by throwing
StreamLoadForwardException with RedirectView
+ String forwardTarget = targetAddr.getHostname() + ":" +
targetAddr.getPort();
+ RedirectView forwardRedirectView =
redirectToStreamLoadForward(request, redirectAddr, forwardTarget);
+
+ LOG.info("Using stream load forward mode for cloud group commit - "
+ + "db: {}, tbl: {}, label: {}, endpoint: {}, forward_to:
{}, reason: redirect_differs_from_target",
+ dbName, tableName, label, redirectAddr.toString(),
forwardTarget);
+
+ throw new StreamLoadForwardException(forwardRedirectView);
+ } else {
+ LOG.debug("Skip stream load forward - redirect address matches
target backend: {}",
+ redirectAddr.toString());
+ return redirectAddr;
+ }
+ }
+
+ /**
+ * Unchecked exception that carries the RedirectView for stream load forwarding (thrown by handleStreamLoadRedirect).
+ */
+ private static class StreamLoadForwardException extends RuntimeException {
+ private final RedirectView redirectView;
+
+ public StreamLoadForwardException(RedirectView redirectView) {
+ this.redirectView = redirectView;
+ }
+
+ public RedirectView getRedirectView() {
+ return redirectView;
+ }
+ }
}
diff --git
a/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out
b/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out
new file mode 100644
index 00000000000..abb03a16633
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out differ
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy
new file mode 100644
index 00000000000..c7b7b6c5a94
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_group_commit_redirect', 'docker') {
+ def databaseName = context.config.getDbNameByFile(context.file)
+ def tableName = "tbl"
+ def getRedirectLocation = { feIp, fePort, redirectPolicy, mode -> // runs one curl stream load against the FE with --max-redirs 0 and returns the Location header it answered with
+ def command = """ curl -v --max-redirs 0 --location-trusted -u root:
+ -H redirect-policy:$redirectPolicy
+ -H group_commit:$mode
+ -T
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+
http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
+ log.info("redirect command: ${command}")
+
+ def code = -1
+ def location = ""
+ try {
+ def process = command.execute()
+ code = process.waitFor() // exit status is stored but not checked by this closure
+ // Parse Location from stderr since curl -v outputs headers to
stderr
+ def errorOutput = process.err.text
+ def locationLine = errorOutput.readLines().find {
it.trim().startsWith('< Location: ') }
+ if (locationLine) {
+ location = locationLine.trim().substring('< Location:
'.length())
+ }
+ log.info("curl output: ${process.text}")
+ log.info("curl error: ${errorOutput}")
+ } catch (Exception e) { // failures are only logged; the caller then sees the empty default
+ log.info("exception: ${e}".toString())
+ }
+ return location // still "" when no Location header was found or an exception was caught
+ }
+
+ def groupCommitRedicetSteamLoad = { beIp, bePort, targetBe, targetBePort
-> // sends one async_mode group commit load to beIp:bePort/_stream_load_forward with forward_to=targetBe:targetBePort and asserts the JSON result; NOTE(review): the name looks like a typo of "RedirectStreamLoad" but the calls in this suite use it as spelled
+ def command = """ curl -v --location-trusted -u root:
+ -H group_commit:async_mode
+ -H column_separator:,
+ -H columns:id,name
+ -T
${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+
http://${beIp}:${bePort}/api/${databaseName}/${tableName}/_stream_load_forward?forward_to=${targetBe}:${targetBePort}
"""
+ log.info("redirect command: ${command}")
+
+ def process = command.execute()
+ def code = process.waitFor()
+ def err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())));
+ def out = process.getText()
+ logger.info("code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0) // curl itself must exit with status 0
+ def json = parseJson(out) // stdout is parsed as the stream load JSON response
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(2, json.NumberTotalRows) // each load is asserted to carry exactly 2 rows, all of them loaded
+ assertEquals(2, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+
+ def getRowCount = { expectedRowCount -> // polls count(*) on the table until it reaches expectedRowCount; gives up silently after 30 tries
+ def retry = 0
+ while (retry < 30) {
+ sleep(2000) // 2 s between polls, so at most about 60 s in total
+ def rowCount = sql "select count(*) from ${tableName}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ } // no assertion here: a timeout is only noticed by whatever check the caller runs next
+
+ // cloud mode: 1 FE + 3 BEs, with the FE switch for group commit stream load BE forwarding turned on
+ def options = new ClusterOptions()
+ options.feNum = 1
+ options.beNum = 3
+ options.cloudMode = true
+ options.beConfigs.add('enable_java_support=false')
+ options.feConfigs.add('enable_group_commit_streamload_be_forward=true')
+ docker(options) {
+ // get fe and be info
+ def feIp = cluster.getMasterFe().getHttpAddress()[0]
+ def fePort = cluster.getMasterFe().getHttpAddress()[1]
+
+ def be1Ip = cluster.getBackends().get(0).getHttpAddress()[0]
+ def be1HttpPort = cluster.getBackends().get(0).getHttpAddress()[1]
+ def be1HeartbeatPort = cluster.getBackends().get(0).getHeartbeatPort()
+
+ def be2Ip = cluster.getBackends().get(1).getHttpAddress()[0]
+ def be2HttpPort = cluster.getBackends().get(1).getHttpAddress()[1]
+ def be2HeartbeatPort = cluster.getBackends().get(1).getHeartbeatPort()
+
+ def be3Ip = cluster.getBackends().get(2).getHttpAddress()[0]
+ def be3HttpPort = cluster.getBackends().get(2).getHttpAddress()[1]
+ def be3HeartbeatPort = cluster.getBackends().get(2).getHeartbeatPort()
+
+ def msIp = cluster.getMetaservices().get(0).getHttpAddress()[0]
+ def msPort = cluster.getMetaservices().get(0).getHttpAddress()[1]
+
+ sql """ CREATE DATABASE IF NOT EXISTS ${databaseName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200"
+ );
+ """
+ // test BE forward stream load: once with BE1 as both receiver and target, then twice via BE2 with forward_to=BE1
+ groupCommitRedicetSteamLoad(be1Ip, be1HttpPort, be1Ip, be1HttpPort)
+ groupCommitRedicetSteamLoad(be2Ip, be2HttpPort, be1Ip, be1HttpPort)
+ groupCommitRedicetSteamLoad(be2Ip, be2HttpPort, be1Ip, be1HttpPort)
+
+ getRowCount(6) // 3 loads x 2 rows each
+ qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+
+ // test fe redirect policy: re-register the BEs with custom public/private endpoints, then inspect the FE's Location header
+ log.info("Initial cluster setup - 3 BEs")
+ log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2:
${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+ sql """show backends"""
+
+ log.info("Dropping all BE") // NOTE(review): "DROPP" (double P) below appears to be Doris's force-drop keyword rather than a typo - confirm before "fixing" it
+ sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+ sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+ sql """ALTER SYSTEM DROPP BACKEND '${be3Ip}:${be3HeartbeatPort}'"""
+ log.info("after dropping all BE: ${sql """show backends""" }")
+
+ log.info("Adding BE1 BE2 BE3 back with different custom endpoints")
+ sql """ALTER SYSTEM ADD BACKEND
'${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}'
properties('tag.public_endpoint' = '12.20.20.20:8030', 'tag.private_endpoint'
= '11.20.20.19:8040', "tag.compute_group_name" = "compute_cluster")"""
+
+ sleep(30000) // NOTE(review): presumably gives the re-added BEs time to come back alive - confirm whether a readiness poll could replace it
+
+ log.info("Final backends configuration: ${sql """show backends""" }")
+
+ // Test redirect locations - should use one of the available BEs
+ def location = getRedirectLocation(feIp, fePort, "public",
"async_mode")
+ log.info("public location: ${location}")
+ // redirect url:
http://endpoint:port/api/db/table/_stream_load_forward?param=value&forward_to=target_be:port
+
assertTrue(location.contains("12.20.20.20:8030/api/$databaseName/$tableName/_stream_load_forward?")
&& (location.contains("forward_to=${be1Ip}:${be1HttpPort}") ||
location.contains("forward_to=${be2Ip}:${be2HttpPort}") ||
location.contains("forward_to=${be3Ip}:${be3HttpPort}")))
+
+ location = getRedirectLocation(feIp, fePort, "private", "sync_mode")
+ log.info("private location: ${location}")
+
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load_forward?")
&& (location.contains("forward_to=${be1Ip}:${be1HttpPort}") ||
location.contains("forward_to=${be2Ip}:${be2HttpPort}") ||
location.contains("forward_to=${be3Ip}:${be3HttpPort}")))
+
+ location = getRedirectLocation(feIp, fePort, "", "async_mode") // empty policy: the assertion below expects the private endpoint
+ log.info("default location: ${location}")
+
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load_forward?")
&& (location.contains("forward_to=${be1Ip}:${be1HttpPort}") ||
location.contains("forward_to=${be2Ip}:${be2HttpPort}") ||
location.contains("forward_to=${be3Ip}:${be3HttpPort}")))
+
+ location = getRedirectLocation(feIp, fePort, "public", "off_mode") // off_mode: expect the plain endpoint; the extra negative check is needed because ".../_stream_load" is a prefix of ".../_stream_load_forward"
+ log.info("public location: ${location}")
+
assertTrue(location.contains("12.20.20.20:8030/api/$databaseName/$tableName/_stream_load") && !location.contains("_stream_load_forward"))
+
+ location = getRedirectLocation(feIp, fePort, "private", "off_mode")
+ log.info("private location: ${location}")
+
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load") && !location.contains("_stream_load_forward"))
+
+ location = getRedirectLocation(feIp, fePort, "", "off_mode")
+ log.info("default location: ${location}")
+
assertTrue(location.contains("11.20.20.19:8040/api/$databaseName/$tableName/_stream_load") && !location.contains("_stream_load_forward"))
+
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]