This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 01a54a0e838 branch-3.1: [feature](stream-load) support stream load 
endpoint redirect policy #53104 (#53208)
01a54a0e838 is described below

commit 01a54a0e838ec5b7522f22406562522d8e62417e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 15 11:49:49 2025 +0800

    branch-3.1: [feature](stream-load) support stream load endpoint redirect 
policy #53104 (#53208)
    
    Cherry-picked from #53104
    
    Co-authored-by: Xin Liao <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |   6 +-
 .../doris/cloud/catalog/CloudClusterChecker.java   |  45 +++--
 .../doris/cloud/system/CloudSystemInfoService.java |  72 +++++++-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  66 +++++--
 .../main/java/org/apache/doris/resource/Tag.java   |  14 +-
 .../main/java/org/apache/doris/system/Backend.java |  45 +++--
 gensrc/proto/cloud.proto                           |   2 +
 .../doris/regression/suite/SuiteCluster.groovy     |   4 +
 .../stream_load/test_stream_load_endpoint.groovy   | 198 +++++++++++++++++++++
 9 files changed, 399 insertions(+), 53 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5b0fe49de3d..2d3f8fefd85 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3276,8 +3276,10 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int audit_event_log_queue_size = 250000;
 
-    @ConfField(description = {"存算分离模式下streamload导入使用的转发策略, 
可选值为public-private或者空",
-            "streamload route policy in cloud mode, availale options are 
public-private and empty string"})
+    @ConfField(mutable = true, description = {
+            "streamload导入使用的转发策略, 
可选值为public-private/public/private/direct/random-be/空",
+            "streamload route policy, available options are "
+            + "public-private/public/private/direct/random-be and empty 
string" })
     public static String streamload_redirect_policy = "";
 
     @ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index ca81b165cb9..4ac8dc0dc03 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -101,8 +101,8 @@ public class CloudClusterChecker extends MasterDaemon {
                 // Attach tag to BEs
                 String clusterName = 
remoteClusterIdToPB.get(addId).getClusterName();
                 String clusterId = 
remoteClusterIdToPB.get(addId).getClusterId();
-                String publicEndpoint = 
remoteClusterIdToPB.get(addId).getPublicEndpoint();
-                String privateEndpoint = 
remoteClusterIdToPB.get(addId).getPrivateEndpoint();
+                String clusterPublicEndpoint = 
remoteClusterIdToPB.get(addId).getPublicEndpoint();
+                String clusterPrivateEndpoint = 
remoteClusterIdToPB.get(addId).getPrivateEndpoint();
                 // For old versions that do no have status field set
                 ClusterStatus clusterStatus = 
remoteClusterIdToPB.get(addId).hasClusterStatus()
                         ? remoteClusterIdToPB.get(addId).getClusterStatus() : 
ClusterStatus.NORMAL;
@@ -118,8 +118,14 @@ public class CloudClusterChecker extends MasterDaemon {
                     newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, 
String.valueOf(clusterStatus));
                     newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
                     newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
-                    newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
publicEndpoint);
-                    newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, 
privateEndpoint);
+                    String nodePublicEndpoint = node.hasPublicEndpoint() && 
!node.getPublicEndpoint().isEmpty()
+                            ? node.getPublicEndpoint()
+                            : clusterPublicEndpoint;
+                    String nodePrivateEndpoint = node.hasPrivateEndpoint() && 
!node.getPrivateEndpoint().isEmpty()
+                            ? node.getPrivateEndpoint()
+                            : clusterPrivateEndpoint;
+                    newTagMap.put(Tag.PUBLIC_ENDPOINT, nodePublicEndpoint);
+                    newTagMap.put(Tag.PRIVATE_ENDPOINT, nodePrivateEndpoint);
                     newTagMap.put(Tag.CLOUD_UNIQUE_ID, 
node.getCloudUniqueId());
                     b.setTagMap(newTagMap);
                     toAdd.add(b);
@@ -200,28 +206,32 @@ public class CloudClusterChecker extends MasterDaemon {
                 // edit log
                 Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
             }
-            updateIfComputeNodeEndpointChanged(remoteClusterPb, be);
+            updateIfComputeNodeEndpointChanged(remoteClusterPb, node, be);
         }
     }
 
-    private void updateIfComputeNodeEndpointChanged(ClusterPB remoteClusterPb, 
Backend be) {
+    private void updateIfComputeNodeEndpointChanged(ClusterPB remoteClusterPb, 
Cloud.NodeInfoPB node, Backend be) {
         // check PublicEndpoint、PrivateEndpoint is changed?
         boolean netChanged = false;
-        String remotePublicEndpoint = remoteClusterPb.getPublicEndpoint();
-        String localPublicEndpoint = 
be.getTagMap().get(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT);
+        String remotePublicEndpoint = node.hasPublicEndpoint() && 
!node.getPublicEndpoint().isEmpty()
+                ? node.getPublicEndpoint()
+                : remoteClusterPb.getPublicEndpoint();
+        String localPublicEndpoint = be.getTagMap().get(Tag.PUBLIC_ENDPOINT);
         if (!localPublicEndpoint.equals(remotePublicEndpoint)) {
             LOG.info("be {} has changed public_endpoint from {} to {}",
                     be, localPublicEndpoint, remotePublicEndpoint);
-            be.getTagMap().put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
remotePublicEndpoint);
+            be.getTagMap().put(Tag.PUBLIC_ENDPOINT, remotePublicEndpoint);
             netChanged = true;
         }
 
-        String remotePrivateEndpoint = remoteClusterPb.getPrivateEndpoint();
-        String localPrivateEndpoint = 
be.getTagMap().get(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT);
+        String remotePrivateEndpoint = node.hasPrivateEndpoint() && 
!node.getPrivateEndpoint().isEmpty()
+                ? node.getPrivateEndpoint()
+                : remoteClusterPb.getPrivateEndpoint();
+        String localPrivateEndpoint = be.getTagMap().get(Tag.PRIVATE_ENDPOINT);
         if (!localPrivateEndpoint.equals(remotePrivateEndpoint)) {
             LOG.info("be {} has changed private_endpoint from {} to {}",
                     be, localPrivateEndpoint, remotePrivateEndpoint);
-            be.getTagMap().put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, 
remotePrivateEndpoint);
+            be.getTagMap().put(Tag.PRIVATE_ENDPOINT, remotePrivateEndpoint);
             netChanged = true;
         }
         if (netChanged) {
@@ -322,6 +332,12 @@ public class CloudClusterChecker extends MasterDaemon {
                         LOG.warn("cant get valid add from ms {}", node);
                         continue;
                     }
+                    String publicEndpoint = node.hasPublicEndpoint() && 
!node.getPublicEndpoint().isEmpty()
+                            ? node.getPublicEndpoint()
+                            : remoteClusterIdToPB.get(cid).getPublicEndpoint();
+                    String privateEndpoint = node.hasPrivateEndpoint() && 
!node.getPrivateEndpoint().isEmpty()
+                            ? node.getPrivateEndpoint()
+                            : 
remoteClusterIdToPB.get(cid).getPrivateEndpoint();
                     String endpoint = host + ":" + node.getHeartbeatPort();
                     Backend b = new Backend(Env.getCurrentEnv().getNextId(), 
host, node.getHeartbeatPort());
                     if (node.hasIsSmoothUpgrade()) {
@@ -332,9 +348,8 @@ public class CloudClusterChecker extends MasterDaemon {
                     Map<String, String> newTagMap = 
Tag.DEFAULT_BACKEND_TAG.toMap();
                     newTagMap.put(Tag.CLOUD_CLUSTER_NAME, 
remoteClusterIdToPB.get(cid).getClusterName());
                     newTagMap.put(Tag.CLOUD_CLUSTER_ID, 
remoteClusterIdToPB.get(cid).getClusterId());
-                    newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
remoteClusterIdToPB.get(cid).getPublicEndpoint());
-                    newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
-                            remoteClusterIdToPB.get(cid).getPrivateEndpoint());
+                    newTagMap.put(Tag.PUBLIC_ENDPOINT, publicEndpoint);
+                    newTagMap.put(Tag.PRIVATE_ENDPOINT, privateEndpoint);
                     newTagMap.put(Tag.CLOUD_UNIQUE_ID, 
node.getCloudUniqueId());
                     b.setTagMap(newTagMap);
                     nodeMap.put(endpoint, b);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 61dbcb4d053..81765e18029 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -578,7 +578,12 @@ public class CloudSystemInfoService extends 
SystemInfoService {
     @Override
     public void addBackends(List<HostInfo> hostInfos, Map<String, String> 
tagMap) throws UserException {
         // issue rpc to meta to add this node, then fe master would add this 
node to its backends
-
+        if (Strings.isNullOrEmpty(((CloudEnv) 
Env.getCurrentEnv()).getCloudInstanceId())) {
+            throw new DdlException("unable to add backends due to empty 
cloud_instance_id");
+        }
+        if (hostInfos.isEmpty()) {
+            return;
+        }
         String clusterName = tagMap.getOrDefault(Tag.COMPUTE_GROUP_NAME, 
Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME);
         if (clusterName.isEmpty()) {
             throw new UserException("ComputeGroup'name can not be empty");
@@ -590,7 +595,48 @@ public class CloudSystemInfoService extends 
SystemInfoService {
                 : String.valueOf(Config.cluster_id);
 
         String cloudUniqueId = "1:" + instanceId + ":" + 
RandomIdentifierGenerator.generateRandomIdentifier(8);
-        alterBackendCluster(hostInfos, computeGroupId, cloudUniqueId, 
Cloud.AlterClusterRequest.Operation.ADD_NODE);
+
+        String publicEndpoint = tagMap.getOrDefault(Tag.PUBLIC_ENDPOINT, "");
+        String privateEndpoint = tagMap.getOrDefault(Tag.PRIVATE_ENDPOINT, "");
+
+        Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
+                .setClusterId(computeGroupId)
+                .setType(Cloud.ClusterPB.Type.COMPUTE)
+                .build();
+
+        for (HostInfo hostInfo : hostInfos) {
+            Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder()
+                    .setCloudUniqueId(cloudUniqueId)
+                    .setIp(hostInfo.getHost())
+                    .setHost(hostInfo.getHost())
+                    .setHeartbeatPort(hostInfo.getPort())
+                    .setCtime(System.currentTimeMillis() / 1000)
+                    .setPublicEndpoint(publicEndpoint)
+                    .setPrivateEndpoint(privateEndpoint)
+                    .build();
+            clusterPB = clusterPB.toBuilder().addNodes(nodeInfoPB).build();
+            LOG.info("adding backend node: host={}, port={}, 
publicEndpoint={}, privateEndpoint={}",
+                    hostInfo.getHost(), hostInfo.getPort(), publicEndpoint, 
privateEndpoint);
+        }
+
+        Cloud.AlterClusterRequest request = 
Cloud.AlterClusterRequest.newBuilder()
+                .setInstanceId(((CloudEnv) 
Env.getCurrentEnv()).getCloudInstanceId())
+                .setOp(Cloud.AlterClusterRequest.Operation.ADD_NODE)
+                .setCluster(clusterPB)
+                .build();
+
+        Cloud.AlterClusterResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().alterCluster(request);
+            LOG.info("add backends, request: {}, response: {}", request, 
response);
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                LOG.warn("add backends not ok, response: {}", response);
+                throw new DdlException("failed to add backends errorCode: " + 
response.getStatus().getCode()
+                        + " msg: " + response.getStatus().getMsg());
+            }
+        } catch (RpcException e) {
+            throw new DdlException("failed to add backends", e);
+        }
     }
 
     // final entry of dropping backend
@@ -1040,8 +1086,8 @@ public class CloudSystemInfoService extends 
SystemInfoService {
         String clusterNameMeta = cpb.getClusterName();
         Cloud.ClusterStatus clusterStatus = cpb.hasClusterStatus()
                 ? cpb.getClusterStatus() : Cloud.ClusterStatus.NORMAL;
-        String publicEndpoint = cpb.getPublicEndpoint();
-        String privateEndpoint = cpb.getPrivateEndpoint();
+        String clusterPublicEndpoint = cpb.getPublicEndpoint();
+        String clusterPrivateEndpoint = cpb.getPrivateEndpoint();
         // Prepare backends
         List<Backend> backends = new ArrayList<>();
         for (Cloud.NodeInfoPB node : cpb.getNodesList()) {
@@ -1050,14 +1096,24 @@ public class CloudSystemInfoService extends 
SystemInfoService {
             newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
             newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
             newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, 
String.valueOf(clusterStatus));
-            newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
-            newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
+            // Prioritize node-level endpoint configuration, use cluster-level 
endpoint if
+            // node endpoint is empty
+            String nodePublicEndpoint = node.hasPublicEndpoint() && 
!node.getPublicEndpoint().isEmpty()
+                    ? node.getPublicEndpoint()
+                    : clusterPublicEndpoint;
+            String nodePrivateEndpoint = node.hasPrivateEndpoint() && 
!node.getPrivateEndpoint().isEmpty()
+                    ? node.getPrivateEndpoint()
+                    : clusterPrivateEndpoint;
+
+            newTagMap.put(Tag.PUBLIC_ENDPOINT, nodePublicEndpoint);
+            newTagMap.put(Tag.PRIVATE_ENDPOINT, nodePrivateEndpoint);
             newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
             Backend b = new Backend(Env.getCurrentEnv().getNextId(), 
node.getIp(), node.getHeartbeatPort());
             b.setTagMap(newTagMap);
             backends.add(b);
-            LOG.info("new backend to add, clusterName={} clusterId={} 
backend={}",
-                    clusterNameMeta, clusterId, b.toString());
+            LOG.info(
+                    "new backend to add, clusterName={} clusterId={} 
backend={}, publicEndpoint={}, privateEndpoint={}",
+                    clusterNameMeta, clusterId, b.toString(), 
nodePublicEndpoint, nodePrivateEndpoint);
         }
 
         updateCloudBackends(backends, new ArrayList<>());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 907522946d9..3511d01d210 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -79,6 +79,9 @@ public class LoadAction extends RestBaseController {
 
     public static final String REDIRECT_POLICY_PUBLIC_PRIVATE = 
"public-private";
     public static final String REDIRECT_POLICY_RANDOM_BE = "random-be";
+    public static final String REDIRECT_POLICY_DIRECT = "direct";
+    public static final String REDIRECT_POLICY_PUBLIC = "public";
+    public static final String REDIRECT_POLICY_PRIVATE = "private";
 
     private ExecuteEnv execEnv = ExecuteEnv.getInstance();
 
@@ -435,7 +438,7 @@ public class LoadAction extends RestBaseController {
         if (backend == null) {
             throw new 
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
-        return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+        return selectEndpointByRedirectPolicy(request, backend);
     }
 
     private TNetworkAddress selectCloudRedirectBackend(String clusterName, 
HttpServletRequest req, boolean groupCommit,
@@ -447,33 +450,50 @@ public class LoadAction extends RestBaseController {
         } else {
             backend = StreamLoadHandler.selectBackend(clusterName);
         }
+        return selectEndpointByRedirectPolicy(req, backend);
+    }
 
-        String redirectPolicy = 
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
-        // User specified redirect policy
-        if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
-            return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
-        }
-        redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
-                ? Config.streamload_redirect_policy : redirectPolicy;
-
+    /**
+     * Selects the endpoint address based on the redirect policy specified in 
the request header.
+     * The available redirect policies are:
+     * - DIRECT / RANDOM_BE: Redirects to the backend's host.
+     * - PUBLIC: Redirects to the public endpoint of the backend.
+     * - PRIVATE: Redirects to the private endpoint of the backend.
+     * - PUBLIC_PRIVATE: Redirects based on the host IP or domain. If the request host is a site-local
+     *     address, redirects to the private endpoint. Otherwise, redirects to 
the public endpoint.
+     * - DEFAULT: If the request host equals the backend's public endpoint host, redirects to the public
+     *     endpoint. Otherwise, if the backend's private endpoint is set, redirects to the private endpoint;
+     *     if neither applies, redirects to the backend's host.
+     *
+     * @param req The HTTP request object.
+     * @param backend The backend to redirect to.
+     * @return The selected endpoint address.
+     * @throws LoadException If there is an error in the redirect policy or 
endpoint selection.
+     */
+    private TNetworkAddress selectEndpointByRedirectPolicy(HttpServletRequest 
req, Backend backend)
+            throws LoadException {
         Pair<String, Integer> publicHostPort = null;
         Pair<String, Integer> privateHostPort = null;
         try {
-            if (!Strings.isNullOrEmpty(backend.getCloudPublicEndpoint())) {
-                publicHostPort = 
splitHostAndPort(backend.getCloudPublicEndpoint());
+            if (!Strings.isNullOrEmpty(backend.getPublicEndpoint())) {
+                publicHostPort = splitHostAndPort(backend.getPublicEndpoint());
             }
         } catch (AnalysisException e) {
             throw new LoadException(e.getMessage());
         }
 
         try {
-            if (!Strings.isNullOrEmpty(backend.getCloudPrivateEndpoint())) {
-                privateHostPort = 
splitHostAndPort(backend.getCloudPrivateEndpoint());
+            if (!Strings.isNullOrEmpty(backend.getPrivateEndpoint())) {
+                privateHostPort = 
splitHostAndPort(backend.getPrivateEndpoint());
             }
         } catch (AnalysisException e) {
             throw new LoadException(e.getMessage());
         }
 
+        String redirectPolicy = 
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
+        redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
+                ? Config.streamload_redirect_policy : redirectPolicy;
+
         String reqHostStr = req.getHeader(HttpHeaderNames.HOST.toString());
         reqHostStr = reqHostStr.replaceAll("\\s+", "");
         if (reqHostStr.isEmpty()) {
@@ -492,7 +512,21 @@ public class LoadAction extends RestBaseController {
             throw new LoadException("Invalid header host: " + reqHost);
         }
 
-        if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
+        // User specified redirect policy
+        if (redirectPolicy != null && 
(redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_DIRECT)
+                || 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE))) {
+            return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
+        } else if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC)) {
+            if (publicHostPort != null) {
+                return new TNetworkAddress(publicHostPort.first, 
publicHostPort.second);
+            }
+            throw new LoadException("public endpoint is null, please check be 
public endpoint config");
+        } else if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PRIVATE)) {
+            if (privateHostPort != null) {
+                return new TNetworkAddress(privateHostPort.first, 
privateHostPort.second);
+            }
+            throw new LoadException("private endpoint is null, please check be 
private endpoint config");
+        } else if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
             // redirect with ip
             if (InetAddressValidator.getInstance().isValid(reqHost)) {
                 InetAddress addr;
@@ -525,10 +559,10 @@ public class LoadAction extends RestBaseController {
             }
         } else {
             if (InetAddressValidator.getInstance().isValid(reqHost)
-                    && publicHostPort != null && reqHost == 
publicHostPort.first) {
+                    && publicHostPort != null && 
reqHost.equalsIgnoreCase(publicHostPort.first)) {
                 return new TNetworkAddress(publicHostPort.first, 
publicHostPort.second);
             } else if (privateHostPort != null) {
-                return new TNetworkAddress(reqHost, privateHostPort.second);
+                return new TNetworkAddress(privateHostPort.first, 
privateHostPort.second);
             } else {
                 return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java 
b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index 7d6a18829cf..17b7db694ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -69,9 +69,12 @@ public class Tag implements Writable {
     public static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
     public static final String CLOUD_CLUSTER_ID = "cloud_cluster_id";
     public static final String CLOUD_UNIQUE_ID = "cloud_unique_id";
+    public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
     public static final String CLOUD_CLUSTER_PUBLIC_ENDPOINT = 
"cloud_cluster_public_endpoint";
     public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = 
"cloud_cluster_private_endpoint";
-    public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
+
+    public static final String PUBLIC_ENDPOINT = "public_endpoint";
+    public static final String PRIVATE_ENDPOINT = "private_endpoint";
 
     public static final String COMPUTE_GROUP_NAME = "compute_group_name";
 
@@ -86,6 +89,7 @@ public class Tag implements Writable {
             VALUE_MIX, VALUE_DEFAULT_CLUSTER);
     private static final String TAG_TYPE_REGEX = "^[a-z][a-z0-9_]{0,32}$";
     private static final String TAG_VALUE_REGEX = 
"^[a-zA-Z][a-zA-Z0-9_]{0,32}$";
+    private static final String ENDPOINT_REGEX = "^[a-zA-Z0-9.-]+(:[0-9]+)?$";
 
 
     public static final Tag DEFAULT_BACKEND_TAG;
@@ -112,7 +116,13 @@ public class Tag implements Writable {
         if (!type.matches(TAG_TYPE_REGEX)) {
             throw new AnalysisException("Invalid tag type format: " + type);
         }
-        if (!value.matches(TAG_VALUE_REGEX)) {
+
+        // if type is an endpoint type, value must be a valid endpoint
+        if (type.equalsIgnoreCase(PUBLIC_ENDPOINT) || 
type.equalsIgnoreCase(PRIVATE_ENDPOINT)) {
+            if (!value.matches(ENDPOINT_REGEX)) {
+                throw new AnalysisException("Invalid " + type + " value 
format: " + value);
+            }
+        } else if (!value.matches(TAG_VALUE_REGEX)) {
             throw new AnalysisException("Invalid tag value format: " + value);
         }
         return new Tag(type, value);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 4ef7fa58f6c..f0a7990c9bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -222,12 +222,23 @@ public class Backend implements Writable {
         return tagMap.getOrDefault(Tag.CLOUD_UNIQUE_ID, "");
     }
 
-    public String getCloudPublicEndpoint() {
-        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, "");
+    // The endpoint tags were renamed from
+    // CLOUD_CLUSTER_PUBLIC_ENDPOINT/CLOUD_CLUSTER_PRIVATE_ENDPOINT to
+    // PUBLIC_ENDPOINT/PRIVATE_ENDPOINT, but backend information has already
+    // been persisted in the edit log under the old names. For upgrade
+    // compatibility, a backend may not yet carry the
+    // public_endpoint/private_endpoint tags right after an upgrade, so we
+    // read the new tag first and fall back to
+    // CLOUD_CLUSTER_PUBLIC_ENDPOINT/CLOUD_CLUSTER_PRIVATE_ENDPOINT until the
+    // new tags have been synchronized from the meta service.
+    // The CLOUD_CLUSTER_PUBLIC_ENDPOINT/CLOUD_CLUSTER_PRIVATE_ENDPOINT
+    // fallback can be removed in future versions.
+    public String getPublicEndpoint() {
+        return tagMap.getOrDefault(Tag.PUBLIC_ENDPOINT, 
tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, ""));
     }
 
-    public String getCloudPrivateEndpoint() {
-        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, "");
+    public String getPrivateEndpoint() {
+        return tagMap.getOrDefault(Tag.PRIVATE_ENDPOINT, 
tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, ""));
     }
 
     public long getId() {
@@ -1028,12 +1039,9 @@ public class Backend implements Writable {
         Map<String, String> displayTagMap = Maps.newHashMap();
         displayTagMap.putAll(tagMap);
 
-        if (displayTagMap.containsKey("cloud_cluster_public_endpoint")) {
-            displayTagMap.put("public_endpoint", 
displayTagMap.remove("cloud_cluster_public_endpoint"));
-        }
-        if (displayTagMap.containsKey("cloud_cluster_private_endpoint")) {
-            displayTagMap.put("private_endpoint", 
displayTagMap.remove("cloud_cluster_private_endpoint"));
-        }
+        // Migrate old cloud endpoint tags to new endpoint tags for backward 
compatibility
+        migrateEndpointTag(displayTagMap, "cloud_cluster_public_endpoint", 
"public_endpoint");
+        migrateEndpointTag(displayTagMap, "cloud_cluster_private_endpoint", 
"private_endpoint");
         if (displayTagMap.containsKey("cloud_cluster_status")) {
             displayTagMap.put("compute_group_status", 
displayTagMap.remove("cloud_cluster_status"));
         }
@@ -1087,4 +1095,21 @@ public class Backend implements Writable {
         return !Collections.disjoint(wgTagSet, beTagSet);
     }
 
+    /**
+     * Migrate endpoint tag from old name to new name for backward 
compatibility.
+     * If new tag already exists, just remove the old tag.
+     * If new tag doesn't exist, rename old tag to new tag.
+     */
+    private void migrateEndpointTag(Map<String, String> tagMap, String 
oldTagName, String newTagName) {
+        if (tagMap.containsKey(oldTagName)) {
+            if (tagMap.containsKey(newTagName)) {
+                // New tag exists, just remove the old one
+                tagMap.remove(oldTagName);
+            } else {
+                // New tag doesn't exist, migrate old tag to new tag
+                tagMap.put(newTagName, tagMap.remove(oldTagName));
+            }
+        }
+    }
+
 }
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 0669064c52e..2700f097687 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -178,6 +178,8 @@ message NodeInfoPB {
     optional bool is_smooth_upgrade = 12;
     // fqdn
     optional string host = 13;
+    optional string public_endpoint = 14;
+    optional string private_endpoint = 15;
 }
 
 enum NodeStatusPB {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 3abbed2c399..952cba30a88 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -230,6 +230,10 @@ class Backend extends ServerNode {
         return path + '/conf/be.conf'
     }
 
+    String getHeartbeatPort() {
+        return heartbeatPort;
+    }
+
 }
 
 class MetaService extends ServerNode {
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_endpoint.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load_endpoint.groovy
new file mode 100644
index 00000000000..2675b3e5570
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_endpoint.groovy
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_stream_load_endpoint', 'docker') {
+    /*
+     * Sends a stream load request to the FE with curl, refusing to follow redirects,
+     * and returns the value of the "Location" response header.
+     *
+     * feIp / fePort  - HTTP address of the FE that receives the request
+     * redirectPolicy - value sent in the "redirect-policy" request header (may be empty)
+     *
+     * Returns the redirect target, or "" when no Location header was seen or curl could not be run.
+     */
+    def getRedirectLocation = { feIp, fePort, redirectPolicy ->
+        def command = """ curl -v --max-redirs 0 --location-trusted -u root:  
+                -H redirect-policy:$redirectPolicy  
+                 -T ${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv 
+                http://${feIp}:${fePort}/api/test/tbl/_stream_load """
+        log.info("redirect command: ${command}")
+
+        def location = ""
+        try {
+            def process = command.execute()
+            def stdout = new StringBuilder()
+            def stderr = new StringBuilder()
+            // Drain both pipes while waiting. Calling waitFor() before reading can block
+            // forever once the child fills the OS pipe buffer.
+            process.waitForProcessOutput(stdout, stderr)
+            def exitCode = process.exitValue()
+
+            // curl -v prints the response headers on stderr, each prefixed with "< ".
+            def errorOutput = stderr.toString()
+            def headerPrefix = '< Location: '
+            def locationLine = errorOutput.readLines().find { it.trim().startsWith(headerPrefix) }
+            if (locationLine) {
+                location = locationLine.trim().substring(headerPrefix.length())
+            }
+            log.info("curl exit code: ${exitCode}")
+            log.info("curl output: ${stdout}")
+            log.info("curl error: ${errorOutput}")
+        } catch (Exception e) {
+            // Best effort: a failure to run curl is logged and reported as an empty location.
+            log.info("exception: ${e}")
+        }
+        return location
+    }
+    // local mode: one FE and three BEs in a non-cloud cluster.
+    // Verifies which address the FE puts in the stream load redirect for each redirect policy.
+    def options = new ClusterOptions()
+    options.feNum = 1
+    options.beNum = 3
+    options.cloudMode = false
+    options.beConfigs.add('enable_java_support=false')
+    options.feConfigs.add('enable_print_request_before_execution=true')
+    docker(options) {
+        // get fe and be info
+        def feHttpAddress = cluster.getMasterFe().getHttpAddress()
+        def feIp = feHttpAddress[0]
+        def fePort = feHttpAddress[1]
+
+        def backends = cluster.getBackends()
+        def be1 = backends.get(0)
+        def be1Ip = be1.getHttpAddress()[0]
+        def be1HttpPort = be1.getHttpAddress()[1]
+        def be1HeartbeatPort = be1.getHeartbeatPort()
+
+        def be2 = backends.get(1)
+        def be2Ip = be2.getHttpAddress()[0]
+        def be2HttpPort = be2.getHttpAddress()[1]
+        def be2HeartbeatPort = be2.getHeartbeatPort()
+
+        def be3 = backends.get(2)
+        def be3Ip = be3.getHttpAddress()[0]
+        def be3HttpPort = be3.getHttpAddress()[1]
+        def be3HeartbeatPort = be3.getHeartbeatPort()
+
+        // True when the redirect location mentions at least one of the candidate endpoints.
+        def pointsToAny = { String redirect, List candidates ->
+            candidates.any { redirect.contains(it.toString()) }
+        }
+        def publicEndpoints = ["11.10.10.10:8010", "12.20.20.20:8030", "13.30.30.30:8050"]
+        def privateEndpoints = ["10.10.10.9:8020", "11.20.20.19:8040", "12.30.30.29:8060"]
+        def directEndpoints = ["${be1Ip}:${be1HttpPort}", "${be2Ip}:${be2HttpPort}", "${be3Ip}:${be3HttpPort}"]
+
+        log.info("Initial cluster setup - 3 BEs")
+        log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2: ${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+        sql """show backends"""
+
+        // Drop BE1 and BE2
+        log.info("Dropping BE1 and BE2")
+        sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+        sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+        log.info("Backends after dropping BE1 and BE2: ${sql """show backends""" }")
+
+        // Add BE1 back with custom endpoints
+        log.info("Adding BE1 back with custom public and private endpoints")
+        sql """ALTER SYSTEM ADD BACKEND '${be1Ip}:${be1HeartbeatPort}' properties('tag.public_endpoint' = '11.10.10.10:8010', 'tag.private_endpoint' = '10.10.10.9:8020')"""
+
+        // Add BE2 back with different custom endpoints
+        log.info("Adding BE2 back with different custom endpoints")
+        sql """ALTER SYSTEM ADD BACKEND '${be2Ip}:${be2HeartbeatPort}' properties('tag.public_endpoint' = '12.20.20.20:8030', 'tag.private_endpoint' = '11.20.20.19:8040')"""
+
+        // Modify BE3's endpoints
+        log.info("Modifying BE3's endpoints")
+        sql """ALTER SYSTEM MODIFY BACKEND '${be3Ip}:${be3HeartbeatPort}' set ('tag.public_endpoint' = '13.30.30.30:8050', 'tag.private_endpoint' = '12.30.30.29:8060', 'tag.location' = 'default')"""
+
+        log.info("Final backends configuration: ${sql """show backends""" }")
+
+        // Test redirect locations - should use one of the available BEs
+        def location = getRedirectLocation(feIp, fePort, "public")
+        log.info("public location: ${location}")
+        assertTrue(pointsToAny(location, publicEndpoints))
+
+        location = getRedirectLocation(feIp, fePort, "private")
+        log.info("private location: ${location}")
+        assertTrue(pointsToAny(location, privateEndpoints))
+
+        location = getRedirectLocation(feIp, fePort, "direct")
+        log.info("direct location: ${location}")
+        assertTrue(pointsToAny(location, directEndpoints))
+
+        // An empty policy is expected to behave like "private" while private endpoints are set.
+        location = getRedirectLocation(feIp, fePort, "")
+        log.info("default location: ${location}")
+        assertTrue(pointsToAny(location, privateEndpoints))
+
+        // Test specific BE endpoint modifications
+        log.info("Testing endpoint configuration for specific BEs")
+
+        // Re-tag every BE with only 'tag.location'. The test then expects the default policy
+        // to redirect to a BE's own HTTP address instead of a custom endpoint.
+        sql """ALTER SYSTEM MODIFY BACKEND '${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}' set ('tag.location' = 'default')"""
+
+        location = getRedirectLocation(feIp, fePort, "")
+        log.info("fallback direct location test: ${location}")
+        assertTrue(pointsToAny(location, directEndpoints))
+
+        // Give all BEs the same private endpoint; the default policy must now return it.
+        sql """ALTER SYSTEM MODIFY BACKEND '${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}' set ('tag.location' = 'default',  'tag.private_endpoint' = '12.30.30.29:8060')"""
+        location = getRedirectLocation(feIp, fePort, "")
+        log.info("final private location test: ${location}")
+        assertTrue(location.contains("12.30.30.29:8060"))
+    }
+
+    // cloud mode: same redirect-policy checks against a cloud-mode cluster.
+    // Backends are re-added with a compute group tag in addition to the endpoint tags.
+    def options2 = new ClusterOptions()
+    options2.feNum = 1
+    options2.beNum = 3
+    options2.cloudMode = true
+    options2.beConfigs.add('enable_java_support=false')
+    options2.feConfigs.add('enable_print_request_before_execution=true')
+    docker(options2) {
+        // get fe and be info
+        def feHttpAddress = cluster.getMasterFe().getHttpAddress()
+        def feIp = feHttpAddress[0]
+        def fePort = feHttpAddress[1]
+
+        def backends = cluster.getBackends()
+        def be1 = backends.get(0)
+        def be1Ip = be1.getHttpAddress()[0]
+        def be1HttpPort = be1.getHttpAddress()[1]
+        def be1HeartbeatPort = be1.getHeartbeatPort()
+
+        def be2 = backends.get(1)
+        def be2Ip = be2.getHttpAddress()[0]
+        def be2HttpPort = be2.getHttpAddress()[1]
+        def be2HeartbeatPort = be2.getHeartbeatPort()
+
+        def be3 = backends.get(2)
+        def be3Ip = be3.getHttpAddress()[0]
+        def be3HttpPort = be3.getHttpAddress()[1]
+        def be3HeartbeatPort = be3.getHeartbeatPort()
+
+        // True when the redirect location mentions at least one of the candidate endpoints.
+        def pointsToAny = { String redirect, List candidates ->
+            candidates.any { redirect.contains(it.toString()) }
+        }
+        def publicEndpoints = ["11.10.10.10:8010", "12.20.20.20:8030"]
+        def privateEndpoints = ["10.10.10.9:8020", "11.20.20.19:8040"]
+        def directEndpoints = ["${be1Ip}:${be1HttpPort}", "${be2Ip}:${be2HttpPort}", "${be3Ip}:${be3HttpPort}"]
+
+        log.info("Initial cluster setup - 3 BEs")
+        log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2: ${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+        sql """show backends"""
+
+        // Drop BE1 and BE2
+        log.info("Dropping BE1 and BE2")
+        sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+        sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+        log.info("Backends after dropping BE1 and BE2: ${sql """show backends""" }")
+
+        // Add BE1 back with custom endpoints
+        log.info("Adding BE1 back with custom public and private endpoints")
+        sql """ALTER SYSTEM ADD BACKEND '${be1Ip}:${be1HeartbeatPort}' properties('tag.public_endpoint' = '11.10.10.10:8010', 'tag.private_endpoint' = '10.10.10.9:8020',"tag.compute_group_name" = "default_compute_group")"""
+
+        // Drop BE3
+        sql """ALTER SYSTEM DROPP BACKEND '${be3Ip}:${be3HeartbeatPort}'"""
+
+        // Add BE2 BE3 back with different custom endpoints (both share the same pair)
+        log.info("Adding BE2 BE3 back with different custom endpoints")
+        sql """ALTER SYSTEM ADD BACKEND '${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}' properties('tag.public_endpoint' = '12.20.20.20:8030', 'tag.private_endpoint' = '11.20.20.19:8040', "tag.compute_group_name" = "default_compute_group")"""
+
+        // NOTE(review): fixed 30s wait, presumably so the FE can pick up the re-added
+        // backends before the redirect checks - confirm; polling would be less timing-sensitive.
+        sleep(30000)
+
+        log.info("Final backends configuration: ${sql """show backends""" }")
+
+        // Test redirect locations - should use one of the available BEs
+        def location = getRedirectLocation(feIp, fePort, "public")
+        log.info("public location: ${location}")
+        assertTrue(pointsToAny(location, publicEndpoints))
+
+        location = getRedirectLocation(feIp, fePort, "private")
+        log.info("private location: ${location}")
+        assertTrue(pointsToAny(location, privateEndpoints))
+
+        location = getRedirectLocation(feIp, fePort, "direct")
+        log.info("direct location: ${location}")
+        assertTrue(pointsToAny(location, directEndpoints))
+
+        // An empty policy is expected to behave like "private" while private endpoints are set.
+        location = getRedirectLocation(feIp, fePort, "")
+        log.info("default location: ${location}")
+        assertTrue(pointsToAny(location, privateEndpoints))
+
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to