This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 24145a1c5af [fix](merge cloud) Fix cloud be set be tag map (#32864)
24145a1c5af is described below

commit 24145a1c5af59615a0005f2b00e3262b03e9ab60
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Wed Mar 27 14:54:16 2024 +0800

    [fix](merge cloud) Fix cloud be set be tag map (#32864)
---
 .../doris/cloud/catalog/CloudClusterChecker.java   | 35 +++++++++++-----------
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |  6 ++--
 .../apache/doris/cloud/catalog/CloudReplica.java   |  2 +-
 .../apache/doris/cloud/load/CloudLoadManager.java  |  5 ++--
 .../doris/cloud/system/CloudSystemInfoService.java | 19 ++++++------
 .../java/org/apache/doris/qe/StmtExecutor.java     |  3 +-
 .../regression/action/StreamLoadAction.groovy      | 31 +++++++++++++++++--
 7 files changed, 65 insertions(+), 36 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index cc4a3c09d9c..801f3166861 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -94,19 +94,13 @@ public class CloudClusterChecker extends MasterDaemon {
                     LOG.debug("begin to add clusterId: {}", addId);
                 }
                 // Attach tag to BEs
-                Map<String, String> newTagMap = 
Tag.DEFAULT_BACKEND_TAG.toMap();
                 String clusterName = 
remoteClusterIdToPB.get(addId).getClusterName();
                 String clusterId = 
remoteClusterIdToPB.get(addId).getClusterId();
                 String publicEndpoint = 
remoteClusterIdToPB.get(addId).getPublicEndpoint();
                 String privateEndpoint = 
remoteClusterIdToPB.get(addId).getPrivateEndpoint();
-                newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
-                newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
-                newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
publicEndpoint);
-                newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, 
privateEndpoint);
                 // For old versions that do no have status field set
                 ClusterStatus clusterStatus = 
remoteClusterIdToPB.get(addId).hasClusterStatus()
                         ? remoteClusterIdToPB.get(addId).getClusterStatus() : 
ClusterStatus.NORMAL;
-                newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, 
String.valueOf(clusterStatus));
                 MetricRepo.registerCloudMetrics(clusterId, clusterName);
                 //toAdd.forEach(i -> i.setTagMap(newTagMap));
                 List<Backend> toAdd = new ArrayList<>();
@@ -117,6 +111,12 @@ public class CloudClusterChecker extends MasterDaemon {
                         continue;
                     }
                     Backend b = new Backend(Env.getCurrentEnv().getNextId(), 
addr, node.getHeartbeatPort());
+                    Map<String, String> newTagMap = 
Tag.DEFAULT_BACKEND_TAG.toMap();
+                    newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, 
String.valueOf(clusterStatus));
+                    newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+                    newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+                    newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
publicEndpoint);
+                    newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, 
privateEndpoint);
                     newTagMap.put(Tag.CLOUD_UNIQUE_ID, 
node.getCloudUniqueId());
                     b.setTagMap(newTagMap);
                     toAdd.add(b);
@@ -250,13 +250,6 @@ public class CloudClusterChecker extends MasterDaemon {
 
             updateStatus(currentBes, expectedBes);
 
-            // Attach tag to BEs
-            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
-            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, 
remoteClusterIdToPB.get(cid).getClusterName());
-            newTagMap.put(Tag.CLOUD_CLUSTER_ID, 
remoteClusterIdToPB.get(cid).getClusterId());
-            newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
remoteClusterIdToPB.get(cid).getPublicEndpoint());
-            newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, 
remoteClusterIdToPB.get(cid).getPrivateEndpoint());
-
             diffNodes(toAdd, toDel, () -> {
                 Map<String, Backend> currentMap = new HashMap<>();
                 for (Backend be : currentBes) {
@@ -280,6 +273,14 @@ public class CloudClusterChecker extends MasterDaemon {
                     if (node.hasIsSmoothUpgrade()) {
                         b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
                     }
+
+                    // Attach tag to BEs
+                    Map<String, String> newTagMap = 
Tag.DEFAULT_BACKEND_TAG.toMap();
+                    newTagMap.put(Tag.CLOUD_CLUSTER_NAME, 
remoteClusterIdToPB.get(cid).getClusterName());
+                    newTagMap.put(Tag.CLOUD_CLUSTER_ID, 
remoteClusterIdToPB.get(cid).getClusterId());
+                    newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, 
remoteClusterIdToPB.get(cid).getPublicEndpoint());
+                    newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
+                            remoteClusterIdToPB.get(cid).getPrivateEndpoint());
                     newTagMap.put(Tag.CLOUD_UNIQUE_ID, 
node.getCloudUniqueId());
                     b.setTagMap(newTagMap);
                     nodeMap.put(endpoint, b);
@@ -350,8 +351,8 @@ public class CloudClusterChecker extends MasterDaemon {
     }
 
     private void getCloudObserverFes() {
-        Cloud.GetClusterResponse response = CloudSystemInfoService
-                .getCloudCluster(Config.cloud_sql_server_cluster_name, 
Config.cloud_sql_server_cluster_id, "");
+        Cloud.GetClusterResponse response = 
cloudSystemInfoService.getCloudCluster(
+                Config.cloud_sql_server_cluster_name, 
Config.cloud_sql_server_cluster_id, "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
                 || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) 
{
             LOG.warn("failed to get cloud cluster due to incomplete response, "
@@ -416,7 +417,7 @@ public class CloudClusterChecker extends MasterDaemon {
             return;
         }
         try {
-            CloudSystemInfoService.updateFrontends(toAdd, toDel);
+            cloudSystemInfoService.updateFrontends(toAdd, toDel);
         } catch (DdlException e) {
             LOG.warn("update cloud frontends exception e: {}, msg: {}", e, 
e.getMessage());
         }
@@ -426,7 +427,7 @@ public class CloudClusterChecker extends MasterDaemon {
         Map<String, List<Backend>> clusterIdToBackend = 
cloudSystemInfoService.getCloudClusterIdToBackend();
         //rpc to ms, to get mysql user can use cluster_id
         // NOTE: rpc args all empty, use cluster_unique_id to get a instance's 
all cluster info.
-        Cloud.GetClusterResponse response = 
CloudSystemInfoService.getCloudCluster("", "", "");
+        Cloud.GetClusterResponse response = 
cloudSystemInfoService.getCloudCluster("", "", "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
                 || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
                 && response.getStatus().getCode() != 
MetaServiceCode.CLUSTER_NOT_FOUND)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 613fef3be68..7c37f1dbcff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -83,8 +83,8 @@ public class CloudEnv extends Env {
 
     private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
         // get helperNodes from ms
-        Cloud.GetClusterResponse response = 
CloudSystemInfoService.getCloudCluster(
-                Config.cloud_sql_server_cluster_name, 
Config.cloud_sql_server_cluster_id, "");
+        Cloud.GetClusterResponse response = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
+                .getCloudCluster(Config.cloud_sql_server_cluster_name, 
Config.cloud_sql_server_cluster_id, "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
                 || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) 
{
             LOG.warn("failed to get cloud cluster due to incomplete response, "
@@ -392,7 +392,7 @@ public class CloudEnv extends Env {
 
     public void changeCloudCluster(String clusterName, ConnectContext ctx) 
throws DdlException {
         checkCloudClusterPriv(clusterName);
-        CloudSystemInfoService.waitForAutoStart(clusterName);
+        ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).waitForAutoStart(clusterName);
         try {
             ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
         } catch (UserException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index a51e332a784..aebc66128ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -163,7 +163,7 @@ public class CloudReplica extends Replica {
 
         // if cluster is SUSPENDED, wait
         try {
-            CloudSystemInfoService.waitForAutoStart(cluster);
+            ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
         } catch (DdlException e) {
             // this function cant throw exception. so just log it
             LOG.warn("cant resume cluster {}", cluster);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index 1d0bfc23f6c..5caa2108c59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.load;
 
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -33,14 +34,14 @@ public class CloudLoadManager extends LoadManager {
 
     @Override
     public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, 
UserException {
-        CloudSystemInfoService.waitForAutoStartCurrentCluster();
+        ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
 
         return super.createLoadJobFromStmt(stmt);
     }
 
     @Override
     public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException {
-        CloudSystemInfoService.waitForAutoStartCurrentCluster();
+        ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
 
         return super.createLoadJobFromStmt(stmt);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 7171fbafd16..59ba7340dfc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -89,7 +89,7 @@ public class CloudSystemInfoService extends SystemInfoService {
      * @param clusterId   cluster id
      * @return
      */
-    public static Cloud.GetClusterResponse getCloudCluster(String clusterName, 
String clusterId, String userName) {
+    public Cloud.GetClusterResponse getCloudCluster(String clusterName, String 
clusterId, String userName) {
         Cloud.GetClusterRequest.Builder builder = 
Cloud.GetClusterRequest.newBuilder();
         builder.setCloudUniqueId(Config.cloud_unique_id)
             
.setClusterName(clusterName).setClusterId(clusterId).setMysqlUserName(userName);
@@ -261,8 +261,8 @@ public class CloudSystemInfoService extends SystemInfoService {
     }
 
 
-    public static synchronized void updateFrontends(List<Frontend> toAdd,
-                                                    List<Frontend> toDel) 
throws DdlException {
+    public synchronized void updateFrontends(List<Frontend> toAdd, 
List<Frontend> toDel)
+            throws DdlException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
         }
@@ -570,7 +570,7 @@ public class CloudSystemInfoService extends SystemInfoService {
         this.instanceStatus = instanceStatus;
     }
 
-    public static void waitForAutoStartCurrentCluster() throws DdlException {
+    public void waitForAutoStartCurrentCluster() throws DdlException {
         ConnectContext context = ConnectContext.get();
         if (context != null) {
             String cloudCluster = context.getCloudCluster();
@@ -580,7 +580,7 @@ public class CloudSystemInfoService extends SystemInfoService {
         }
     }
 
-    public static String getClusterNameAutoStart(final String clusterName) {
+    public String getClusterNameAutoStart(final String clusterName) {
         if (!Strings.isNullOrEmpty(clusterName)) {
             return clusterName;
         }
@@ -607,7 +607,7 @@ public class CloudSystemInfoService extends SystemInfoService {
         return cloudClusterTypeAndName.clusterName;
     }
 
-    public static void waitForAutoStart(String clusterName) throws 
DdlException {
+    public void waitForAutoStart(String clusterName) throws DdlException {
         if (Config.isNotCloudMode()) {
             return;
         }
@@ -616,7 +616,7 @@ public class CloudSystemInfoService extends SystemInfoService {
             LOG.warn("auto start in cloud mode, but clusterName empty {}", 
clusterName);
             return;
         }
-        String clusterStatus = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+        String clusterStatus = getCloudStatusByName(clusterName);
         if (Strings.isNullOrEmpty(clusterStatus)) {
             // for cluster rename or cluster dropped
             LOG.warn("cant find clusterStatus in fe, clusterName {}", 
clusterName);
@@ -631,8 +631,7 @@ public class CloudSystemInfoService extends SystemInfoService {
             
builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
 
             Cloud.ClusterPB.Builder clusterBuilder = 
Cloud.ClusterPB.newBuilder();
-            clusterBuilder.setClusterId(((CloudSystemInfoService)
-                    
Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName));
+            clusterBuilder.setClusterId(getCloudClusterIdByName(clusterName));
             clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
             builder.setCluster(clusterBuilder);
 
@@ -671,7 +670,7 @@ public class CloudSystemInfoService extends SystemInfoService {
             } catch (InterruptedException e) {
                 LOG.info("change cluster sleep wait InterruptedException: ", 
e);
             }
-            clusterStatus = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+            clusterStatus = getCloudStatusByName(clusterName);
         }
         if (retryTime >= retryTimes) {
             // auto start timeout
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 40af03d9237..783323b03fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -820,7 +820,8 @@ public class StmtExecutor {
                                         deadCloudClusterStatus);
                                 if 
(Strings.isNullOrEmpty(deadCloudClusterStatus)
                                         || 
ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) {
-                                    
CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName);
+                                    ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
+                                            
.waitForAutoStart(deadCloudClusterClusterName);
                                 }
                             }
                         }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 56c80e88a40..606b9bc4ac8 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -54,6 +54,7 @@ class StreamLoadAction implements SuiteAction {
     Map<String, String> headers
     SuiteContext context
     boolean directToBe = false
+    boolean twoPhaseCommit = false
 
     StreamLoadAction(SuiteContext context) {
         this.address = context.getFeHttpAddress()
@@ -137,6 +138,22 @@ class StreamLoadAction implements SuiteAction {
         this.time = time.call()
     }
 
+    void twoPhaseCommit(boolean twoPhaseCommit) {
+        this.twoPhaseCommit = twoPhaseCommit;
+    }
+
+    void twoPhaseCommit(Closure<Boolean> twoPhaseCommit) {
+        this.twoPhaseCommit = twoPhaseCommit.call();
+    }
+
+    // compatible with selectdb case
+    void isCloud(boolean isCloud) {
+    }
+
+    // compatible with selectdb case
+    void isCloud(Closure<Boolean> isCloud) {
+    }
+
     void check(@ClosureParams(value = FromString, options = 
["String,Throwable,Long,Long"]) Closure check) {
         this.check = check
     }
@@ -156,8 +173,14 @@ class StreamLoadAction implements SuiteAction {
         long startTime = System.currentTimeMillis()
         def isHttpStream = headers.containsKey("version")
         try {
-            def uri = isHttpStream ? 
"http://${address.hostString}:${address.port}/api/_http_stream";
-                    : 
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load";
+            def uri = ""
+            if (isHttpStream) {
+                uri = 
"http://${address.hostString}:${address.port}/api/_http_stream";
+            } else if (twoPhaseCommit) {
+                uri = 
"http://${address.hostString}:${address.port}/api/${db}/_stream_load_2pc";
+            } else {
+                uri = 
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load";
+            }
             HttpClients.createDefault().withCloseable { client ->
                 RequestBuilder requestBuilder = 
prepareRequestHeader(RequestBuilder.put(uri))
                 HttpEntity httpEntity = prepareHttpEntity(client)
@@ -362,6 +385,10 @@ class StreamLoadAction implements SuiteAction {
             def jsonSlurper = new JsonSlurper()
             def parsed = jsonSlurper.parseText(responseText)
             String status = parsed.Status
+            if (twoPhaseCommit) {
+                status = parsed.status
+                return status;
+            }
             long txnId = parsed.TxnId
             if (!status.equalsIgnoreCase("Publish Timeout")) {
                 return status;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to