This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 02dff85679115b8da747b39e203858308870a249
Author: deardeng <565620...@qq.com>
AuthorDate: Fri Jul 19 11:29:58 2024 +0800

    [improvement](cloud) Accelerate cloud rebalance by batch editlog (#37787)
    
    1. use `JournalBatch` to batch editlogs
    2. different tablets in the same partition share one editlog
    
    env:
    docker cloud mode, 3 FEs and 3 BEs.
    expanding from 3 BEs to 4 BEs triggers a cloud rebalance.
    one table with 1860 partitions and 48 buckets; every rebalance loop has
    min balance 12, and pre cache is disabled
    
    result:
    ```
    before improvement
    2024-07-16 16:51:01,371 INFO (cloud tablet rebalancer|77) 
[CloudTabletRebalancer.runAfterCatalogReady():228]
    finished to rebalancer. cost: 58471 ms
    
    
    after improvement
    2024-07-16 17:10:20,699 INFO (cloud tablet rebalancer|77) 
[CloudTabletRebalancer.runAfterCatalogReady():235]
    finished to rebalancer. cost: 28687 ms
    ```
---
 .../analysis/ShowReplicaDistributionStmt.java      |   6 +-
 .../apache/doris/cloud/catalog/CloudReplica.java   |   6 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java | 158 +++++++++++++++++---
 .../cloud/datasource/CloudInternalCatalog.java     |   7 +-
 .../cloud/persist/UpdateCloudReplicaInfo.java      |  53 ++-----
 .../org/apache/doris/journal/JournalBatch.java     |   6 +
 .../java/org/apache/doris/persist/EditLog.java     |  24 ++-
 .../org/apache/doris/regression/suite/Suite.groovy |  11 ++
 .../cloud_p0/multi_cluster/test_rebalance.groovy   | 165 +++++++++++++++++++++
 9 files changed, 372 insertions(+), 64 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
index 6d598be727a..58d2ac1052f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
@@ -82,6 +82,10 @@ public class ShowReplicaDistributionStmt extends ShowStmt {
 
     @Override
     public RedirectStatus getRedirectStatus() {
-        return RedirectStatus.FORWARD_NO_SYNC;
+        if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
+            return RedirectStatus.FORWARD_NO_SYNC;
+        } else {
+            return RedirectStatus.NO_FORWARD;
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index d2e173dfd53..be0c510559e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -25,6 +25,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
 
@@ -221,7 +222,10 @@ public class CloudReplica extends Replica {
                 return backendId;
             }
         }
-
+        if 
(DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
+            LOG.info("Debug Point enable 
CloudReplica.getBackendIdImpl.clusterToBackends");
+            return -1;
+        }
         return hashReplicaToBe(clusterId, false);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 30149de0d56..73ddbe4c455 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -45,6 +45,7 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
 import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
 
+import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -57,6 +58,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 public class CloudTabletRebalancer extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(CloudTabletRebalancer.class);
@@ -181,8 +183,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
         // 2 complete route info
         replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
         completeRouteInfo();
-        for (UpdateCloudReplicaInfo info : replicaInfos) {
-            Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+        LOG.info("collect to editlog route {} infos", replicaInfos.size());
+        try {
+            
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(replicaInfos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         // 3 check whether the inflight preheating task has been completed
@@ -238,9 +245,20 @@ public class CloudTabletRebalancer extends MasterDaemon {
                      entry.getKey(), entry.getValue().size());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         // balance in partitions/index
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
-            balanceInPartition(entry.getValue(), entry.getKey());
+            balanceInPartition(entry.getValue(), entry.getKey(), infos);
+        }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog partitions before size={} after size={} 
infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
@@ -263,9 +281,20 @@ public class CloudTabletRebalancer extends MasterDaemon {
                     entry.getKey(), entry.getValue().size());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         // balance in partitions/index
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
-            balanceInTable(entry.getValue(), entry.getKey());
+            balanceInTable(entry.getValue(), entry.getKey(), infos);
+        }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog table before size={} after size={} 
infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
@@ -288,8 +317,19 @@ public class CloudTabletRebalancer extends MasterDaemon {
                     entry.getKey(), entry.getValue().size());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
-            balanceImpl(entry.getValue(), entry.getKey(), 
futureBeToTabletsGlobal, BalanceType.GLOBAL);
+            balanceImpl(entry.getValue(), entry.getKey(), 
futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
+        }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog global before size={} after size={} 
infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
@@ -310,6 +350,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
             LOG.info("before pre cache check dest be {} inflight task num {}", 
entry.getKey(), entry.getValue().size());
             Backend destBackend = 
cloudSystemInfoService.getBackend(entry.getKey());
@@ -335,11 +376,21 @@ public class CloudTabletRebalancer extends MasterDaemon {
                     if (!result.getValue()) {
                         LOG.info("{} pre cache timeout, forced to change the 
mapping", result.getKey());
                     }
-                    updateClusterToBeMap(task.pickedTablet, task.destBe, 
task.clusterId);
+                    updateClusterToBeMap(task.pickedTablet, task.destBe, 
task.clusterId, infos);
                     tabletToInfightTask.remove(result.getKey());
                 }
             }
         }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog warmup before size={} after size={} 
infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
+        }
 
         // recalculate inflight beToTablets, just for print the log
         beToTabletIds = new HashMap<Long, List<Long>>();
@@ -550,22 +601,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
         }
     }
 
-    public void balanceInPartition(List<Long> bes, String clusterId) {
+    public void balanceInPartition(List<Long> bes, String clusterId, 
List<UpdateCloudReplicaInfo> infos) {
         // balance all partition
         for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>> 
partitionEntry : futurePartitionToTablets.entrySet()) {
             Map<Long, Map<Long, List<Tablet>>> indexToTablets = 
partitionEntry.getValue();
             // balance all index of a partition
             for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : 
indexToTablets.entrySet()) {
                 // balance a index
-                balanceImpl(bes, clusterId, entry.getValue(), 
BalanceType.PARTITION);
+                balanceImpl(bes, clusterId, entry.getValue(), 
BalanceType.PARTITION, infos);
             }
         }
     }
 
-    public void balanceInTable(List<Long> bes, String clusterId) {
+    public void balanceInTable(List<Long> bes, String clusterId, 
List<UpdateCloudReplicaInfo> infos) {
         // balance all tables
         for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : 
futureBeToTabletsInTable.entrySet()) {
-            balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE);
+            balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, 
infos);
         }
     }
 
@@ -641,7 +692,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
                         partToTablets);
     }
 
-    private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String 
clusterId) {
+    private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String 
clusterId,
+                                      List<UpdateCloudReplicaInfo> infos) {
         CloudReplica cloudReplica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
         cloudReplica.updateClusterToBe(clusterId, destBe);
         Database db = 
Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
@@ -663,7 +715,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             UpdateCloudReplicaInfo info = new 
UpdateCloudReplicaInfo(cloudReplica.getDbId(),
                     cloudReplica.getTableId(), cloudReplica.getPartitionId(), 
cloudReplica.getIndexId(),
                     pickedTablet.getId(), cloudReplica.getId(), clusterId, 
destBe);
-            Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+            infos.add(info);
         } finally {
             table.readUnlock();
         }
@@ -765,7 +817,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     private void balanceImpl(List<Long> bes, String clusterId, Map<Long, 
List<Tablet>> beToTablets,
-            BalanceType balanceType) {
+            BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
         if (bes == null || bes.isEmpty() || beToTablets == null || 
beToTablets.isEmpty()) {
             return;
         }
@@ -852,7 +904,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                         beToTabletsInTable, partitionToTablets);
                 updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
                         futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
-                updateClusterToBeMap(pickedTablet, destBe, clusterId);
+                updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
             }
         }
     }
@@ -869,17 +921,16 @@ public class CloudTabletRebalancer extends MasterDaemon {
         List<Tablet> tablets = new ArrayList<>();
         if (!beToTabletsGlobal.containsKey(srcBe)) {
             LOG.info("smooth upgrade srcBe={} does not have any tablets, set 
inactive", srcBe);
-            // TODO(merge-cloud): wait add cloud upgrade mgr
-            // 
Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
+            ((CloudEnv) 
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
             return;
         }
         tablets = beToTabletsGlobal.get(srcBe);
         if (tablets.isEmpty()) {
             LOG.info("smooth upgrade srcBe={} does not have any tablets, set 
inactive", srcBe);
-            // TODO(merge-cloud): wait add cloud upgrade mgr
-            // 
Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
+            ((CloudEnv) 
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
             return;
         }
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         for (Tablet tablet : tablets) {
             // get replica
             CloudReplica cloudReplica = (CloudReplica) 
tablet.getReplicas().get(0);
@@ -915,11 +966,21 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 UpdateCloudReplicaInfo info = new 
UpdateCloudReplicaInfo(cloudReplica.getDbId(),
                         cloudReplica.getTableId(), 
cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
                         tablet.getId(), cloudReplica.getId(), clusterId, 
dstBe);
-                Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+                infos.add(info);
             } finally {
                 table.readUnlock();
             }
         }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog migrate before size={} after size={} 
infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("update cloud replicas failed", e);
+            // edit log failed, try next time
+            throw new RuntimeException(e);
+        }
 
         try {
             ((CloudEnv) 
Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
@@ -928,5 +989,64 @@ public class CloudTabletRebalancer extends MasterDaemon {
             throw new RuntimeException(e);
         }
     }
+
+    private List<UpdateCloudReplicaInfo> 
batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
+        long start = System.currentTimeMillis();
+        List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
+        // clusterId, infos
+        Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = 
infos.stream()
+                
.collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
+        for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : 
clusterIdToInfos.entrySet()) {
+            // same cluster
+            String clusterId = entry.getKey();
+            List<UpdateCloudReplicaInfo> infoList = entry.getValue();
+            Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos = 
infoList.stream()
+                    .collect(Collectors.groupingBy(
+                            info -> info.getDbId()
+                            + info.getTableId() + info.getPartitionId() + 
info.getIndexId()));
+            sameLocationInfos.forEach((location, locationInfos) -> {
+                UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
+                long dbId = -1;
+                long tableId = -1;
+                long partitionId = -1;
+                long indexId = -1;
+                for (UpdateCloudReplicaInfo info : locationInfos) {
+                    
Preconditions.checkState(clusterId.equals(info.getClusterId()),
+                            "impossible, cluster id not eq outer=" + clusterId 
+ ", inner=" + info.getClusterId());
+
+                    dbId = info.getDbId();
+                    tableId = info.getTableId();
+                    partitionId = info.getPartitionId();
+                    indexId = info.getIndexId();
+
+                    StringBuilder sb = new StringBuilder("impossible, some 
locations do not match location");
+                    sb.append(", location=").append(location).append(", 
dbId=").append(dbId)
+                        .append(", tableId=").append(tableId).append(", 
partitionId=").append(partitionId)
+                        .append(", indexId=").append(indexId);
+                    Preconditions.checkState(location == dbId + tableId + 
partitionId + indexId, sb.toString());
+
+                    long tabletId = info.getTabletId();
+                    long replicaId = info.getReplicaId();
+                    long beId = info.getBeId();
+                    newInfo.getTabletIds().add(tabletId);
+                    newInfo.getReplicaIds().add(replicaId);
+                    newInfo.getBeIds().add(beId);
+                }
+                newInfo.setDbId(dbId);
+                newInfo.setTableId(tableId);
+                newInfo.setPartitionId(partitionId);
+                newInfo.setIndexId(indexId);
+                newInfo.setClusterId(clusterId);
+                // APPR: in unprotectUpdateCloudReplica, use batch must set 
tabletId = -1
+                newInfo.setTabletId(-1);
+                rets.add(newInfo);
+            });
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur 
size {} cost {} ms",
+                    infos.size(), rets.size(), System.currentTimeMillis() - 
start);
+        }
+        return rets;
+    }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 721ec7899f4..20b1e17f529 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -939,7 +939,12 @@ public class CloudInternalCatalog extends InternalCatalog {
                 List<Long> tabletIds = info.getTabletIds();
                 for (int i = 0; i < tabletIds.size(); ++i) {
                     Tablet tablet = 
materializedIndex.getTablet(tabletIds.get(i));
-                    Replica replica = tablet.getReplicas().get(0);
+                    Replica replica;
+                    if (info.getReplicaIds().isEmpty()) {
+                        replica = tablet.getReplicas().get(0);
+                    } else {
+                        replica = 
tablet.getReplicaById(info.getReplicaIds().get(i));
+                    }
                     Preconditions.checkNotNull(replica, info);
 
                     String clusterId = info.getClusterId();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
index 1ff2912a397..c5d6fc0cd64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
@@ -17,11 +17,14 @@
 
 package org.apache.doris.cloud.persist;
 
+
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -29,6 +32,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+@Getter
+@Setter
 public class UpdateCloudReplicaInfo implements Writable {
     @SerializedName(value = "dbId")
     private long dbId;
@@ -54,6 +59,9 @@ public class UpdateCloudReplicaInfo implements Writable {
     @SerializedName(value = "beIds")
     private List<Long> beIds = new ArrayList<Long>();
 
+    @SerializedName(value = "rids")
+    private List<Long> replicaIds = new ArrayList<>();
+
     public UpdateCloudReplicaInfo() {
     }
 
@@ -97,46 +105,6 @@ public class UpdateCloudReplicaInfo implements Writable {
         return GsonUtils.GSON.fromJson(json, UpdateCloudReplicaInfo.class);
     }
 
-    public long getDbId() {
-        return dbId;
-    }
-
-    public long getTableId() {
-        return tableId;
-    }
-
-    public long getPartitionId() {
-        return partitionId;
-    }
-
-    public long getIndexId() {
-        return indexId;
-    }
-
-    public long getTabletId() {
-        return tabletId;
-    }
-
-    public long getReplicaId() {
-        return replicaId;
-    }
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public long getBeId() {
-        return beId;
-    }
-
-    public List<Long> getBeIds() {
-        return beIds;
-    }
-
-    public List<Long> getTabletIds() {
-        return tabletIds;
-    }
-
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("database id: ").append(dbId);
@@ -159,6 +127,11 @@ public class UpdateCloudReplicaInfo implements Writable {
                 for (long id : tabletIds) {
                     sb.append(" ").append(id);
                 }
+
+                sb.append(" replica id list: ");
+                for (long id : replicaIds) {
+                    sb.append(" ").append(id);
+                }
             }
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
index 12d62b68717..e56bf34dfe5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -21,6 +21,8 @@ import org.apache.doris.common.io.DataOutputBuffer;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.OperationType;
 
+import lombok.Getter;
+
 import java.io.IOException;
 import java.util.ArrayList;
 
@@ -29,6 +31,9 @@ public class JournalBatch {
 
     private ArrayList<Entity> entities;
 
+    @Getter
+    private long size = 0;
+
     public JournalBatch() {
         entities = new ArrayList<>();
     }
@@ -56,6 +61,7 @@ public class JournalBatch {
 
         DataOutputBuffer buffer = new 
DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
         entity.write(buffer);
+        size += buffer.size();
 
         entities.add(new Entity(op, buffer));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index eb26bbc04f0..c3a326f6d48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -59,6 +59,7 @@ import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.insertoverwrite.InsertOverwriteLog;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.BDBJEJournal;
@@ -1264,6 +1265,21 @@ public class EditLog {
         journal.rollJournal();
     }
 
+    private synchronized <T extends Writable> void logEdit(short op, List<T> 
entries) throws IOException {
+        JournalBatch batch = new JournalBatch(35);
+        for (T entry : entries) {
+            // the number of batch entities to less than 32 and the batch data 
size to less than 640KB
+            if (batch.getJournalEntities().size() >= 32 || batch.getSize() >= 
640 * 1024) {
+                journal.write(batch);
+                batch = new JournalBatch(35);
+            }
+            batch.addJournal(op, entry);
+        }
+        if (!batch.getJournalEntities().isEmpty()) {
+            journal.write(batch);
+        }
+    }
+
     /**
      * Write an operation to the edit log. Do not sync to persistent store yet.
      */
@@ -1576,8 +1592,12 @@ public class EditLog {
         logEdit(OperationType.OP_EXPORT_CREATE, job);
     }
 
-    public void logUpdateCloudReplica(UpdateCloudReplicaInfo info) {
-        logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, info);
+    public void logUpdateCloudReplicas(List<UpdateCloudReplicaInfo> infos) 
throws IOException {
+        long start = System.currentTimeMillis();
+        logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, infos);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("log update {} cloud replicas. cost: {} ms", 
infos.size(), (System.currentTimeMillis() - start));
+        }
     }
 
     public void logExportUpdateState(long jobId, ExportJobState newState) {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 8397a638c36..a964996c236 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -17,6 +17,8 @@
 
 package org.apache.doris.regression.suite
 
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
 import groovy.json.JsonOutput
 import com.google.common.collect.Maps
 import com.google.common.util.concurrent.Futures
@@ -260,6 +262,15 @@ class Suite implements GroovyInterceptable {
         return context.connect(user, password, url, actionSupplier)
     }
 
+    public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1, 
Closure actionSupplier) {
+        def connInfo = context.threadLocalConn.get()
+        Awaitility.await().atMost(atMostSeconds, 
SECONDS).pollInterval(intervalSecond, SECONDS).until(
+            {
+                connect(connInfo.username, connInfo.password, 
connInfo.conn.getMetaData().getURL(), actionSupplier)
+            }
+        )
+    }
+
     public void docker(ClusterOptions options = new ClusterOptions(), Closure 
actionSupplier) throws Exception {
         if (context.config.excludeDockerTest) {
             return
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
new file mode 100644
index 00000000000..e1735a4acd4
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite('test_rebalance_in_cloud', 'multi_cluster') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'enable_cloud_warm_up_for_rebalance=false',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'cloud_balance_tablet_percent_per_run=0.5',
+        'cloud_pre_heating_time_limit_sec=1',
+        'sys_log_verbose_modules=org',
+    ]
+    options.setFeNum(3)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.connectToFollower = true
+    options.enableDebugPoints()
+
+    docker(options) {
+        sql """
+            CREATE TABLE table100 (
+            class INT,
+            id INT,
+            score INT SUM
+            )
+            AGGREGATE KEY(class, id)
+            DISTRIBUTED BY HASH(class) BUCKETS 48
+        """
+
+        sql """
+            CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT 
NULL, k3 int sum NOT NULL )
+            AGGREGATE KEY(k1, k2)
+            PARTITION BY RANGE(k1) (
+            PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+            PARTITION p1993 VALUES [("19930101"), ("19940101")),
+            PARTITION p1994 VALUES [("19940101"), ("19950101")),
+            PARTITION p1995 VALUES [("19950101"), ("19960101")),
+            PARTITION p1996 VALUES [("19960101"), ("19970101")),
+            PARTITION p1997 VALUES [("19970101"), ("19980101")),
+            PARTITION p1998 VALUES [("19980101"), ("19990101")))
+            DISTRIBUTED BY HASH(k1) BUCKETS 3
+        """
+        
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.clusterToBackends");
+        sql """set global forward_to_master=false"""
+        
+        // add a be
+        cluster.addBackend(1, null)
+        
+        dockerAwaitUntil(30) {
+            def bes = sql """show backends"""
+            log.info("bes: {}", bes)
+            bes.size() == 2
+        }
+
+        dockerAwaitUntil(5) {
+            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
+            log.info("replica distribution table100: {}", ret)
+            ret.size() == 2
+        }
+
+        def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION 
FROM table100; """
+        assertEquals(2, result.size())
+        int replicaNum = 0
+
+        for (def row : result) {
+            log.info("replica distribution: ${row} ".toString())
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            if (replicaNum == 0) {
+                // due to debug point, observer not hash replica
+            } else {
+                assertTrue(replicaNum <= 25 && replicaNum >= 23)
+            }
+        }
+
+        dockerAwaitUntil(5) {
+            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 
PARTITION(p1992)"""
+            log.info("replica distribution table_p2: {}", ret)
+            ret.size() == 2
+        }
+
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1992) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1993) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1994) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1995) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1996) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1997) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to