This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new abdeaec91dd [feature](alter colocate group) Support alter colocate group replica allocation #23320 (#25387)
abdeaec91dd is described below

commit abdeaec91ddc1250014198b7691a64bff4965ecb
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Fri Oct 13 16:22:43 2023 +0800

    [feature](alter colocate group) Support alter colocate group replica allocation #23320 (#25387)
---
 .../Alter/ALTER-COLOCATE-GROUP.md                  |  84 +++++++++
 docs/sidebars.json                                 |   1 +
 .../Alter/ALTER-COLOCATE-GROUP.md                  |  88 ++++++++++
 .../Alter/ALTER-WORKLOAD-GROUP.md                  |   4 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |  17 ++
 .../doris/analysis/AlterColocateGroupStmt.java     |  81 +++++++++
 .../apache/doris/analysis/ColocateGroupName.java   |  70 ++++++++
 .../apache/doris/catalog/ColocateGroupSchema.java  |   4 +
 .../apache/doris/catalog/ColocateTableIndex.java   | 191 ++++++++++++++++++++-
 .../org/apache/doris/catalog/PartitionInfo.java    |   4 +
 .../clone/ColocateTableCheckerAndBalancer.java     | 133 +++++++++++++-
 .../org/apache/doris/clone/TabletScheduler.java    |  18 +-
 .../doris/common/proc/TabletHealthProcDir.java     |   5 +
 .../doris/httpv2/meta/ColocateMetaService.java     |   2 +-
 .../org/apache/doris/journal/JournalEntity.java    |   1 +
 .../org/apache/doris/master/ReportHandler.java     |   5 +
 .../apache/doris/persist/ColocatePersistInfo.java  |  27 ++-
 .../java/org/apache/doris/persist/EditLog.java     |   9 +
 .../org/apache/doris/persist/OperationType.java    |   4 +
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 .../java/org/apache/doris/alter/AlterTest.java     | 181 +++++++++++++++++--
 .../org/apache/doris/utframe/UtFrameUtils.java     |   7 +-
 .../alter_p2/test_alter_colocate_group.groovy      | 170 ++++++++++++++++++
 24 files changed, 1076 insertions(+), 34 deletions(-)
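
The new DDL added by this commit (see the ALTER-COLOCATE-GROUP.md files below) has the
general form sketched here; the group name and replica count are illustrative placeholders only:

    # illustrative only; see the added documentation for the full syntax
    ALTER COLOCATE GROUP [database.]group
    SET (
        "replication_num" = "3"
    );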

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
new file mode 100644
index 00000000000..54c87c05e67
--- /dev/null
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
@@ -0,0 +1,84 @@
+---
+{
+"title": "ALTER-COLOCATE-GROUP",
+"language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## ALTER-COLOCATE-GROUP
+
+### Name
+
+ALTER COLOCATE GROUP
+
+<version since="dev"></version>
+
+### Description
+
+This statement is used to modify the colocation group.
+
+Syntax:
+
+```sql
+ALTER COLOCATE GROUP [database.]group
+SET (
+    property_list
+);
+```
+
+NOTE:
+
+1. If the colocate group is global, that is, its name starts with `__global__`, it does not belong to any database;
+
+2. `property_list` specifies the colocation group properties to change; currently only `replication_num` and `replication_allocation` can be modified. After either property is modified, the table properties `default.replication_allocation` and `dynamic.replication_allocation` of the tables in the group, as well as the `replication_allocation` of existing partitions, are changed to match it (see example 3 below).
+
+### Example
+
+1. Modify the replication number of a global group
+
+     ```sql
+     # Set "colocate_with" = "__global__foo" when creating the table
+     
+     ALTER COLOCATE GROUP __global__foo
+     SET (
+         "replication_num"="1"
+     );   
+     ```
+
+2. Modify the replication number of a non-global group
+
+     ```sql
+     # Set "colocate_with" = "bar" when creating the table, and the Database is "example_db"
+     
+     ALTER COLOCATE GROUP example_db.bar
+     SET (
+         "replication_num"="1"
+     );
+     ```
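+
+3. Modify the replica allocation of a group. This is a sketch only: it assumes the backends carry the default resource tag, so `tag.location.default` is used in the property value.
+
+     ```sql
+     # assumes backends use the default resource tag "default"
+     ALTER COLOCATE GROUP example_db.bar
+     SET (
+         "replication_allocation" = "tag.location.default: 2"
+     );
+     ```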
+
+### Keywords
+
+```sql
+ALTER, COLOCATE, GROUP
+```
+
+### Best Practice
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 744260a9a31..3b58f867dc7 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -795,6 +795,7 @@
                                         
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION",
                                         
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN",
                                         
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-RESOURCE",
+                                        
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP",
                                         
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP",
                                         
"sql-manual/sql-reference/Data-Definition-Statements/Alter/CANCEL-ALTER-TABLE",
                                         
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COMMENT",
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
new file mode 100644
index 00000000000..2b5ca2cc727
--- /dev/null
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
@@ -0,0 +1,88 @@
+---
+{
+"title": "ALTER-COLOCATE-GROUP",
+"language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## ALTER-COLOCATE-GROUP
+
+### Name
+
+ALTER COLOCATE GROUP 
+
+<version since="dev"></version>
+
+### Description
+
+This statement is used to modify the properties of a colocation group.
+
+Syntax:
+
+```sql
+ALTER COLOCATE GROUP [database.]group
+SET (
+    property_list
+);
+```
+
+NOTE:
+
+1. If the colocate group is global, that is, its name starts with `__global__`, it does not belong to any database;
+
+2. `property_list` specifies the colocation group properties to change; currently only `replication_num` and `replication_allocation` can be modified. After either property of the colocation group is modified, the table property `default.replication_allocation`, the property `dynamic.replication_allocation` of the tables in the group, and the `replication_allocation` of existing partitions are changed to match it (see example 3 below).
+
+### Example
+
+1. Modify the replication number of a global group
+
+    ```sql
+    # Set "colocate_with" = "__global__foo" when creating the table
+    
+    ALTER COLOCATE GROUP __global__foo
+    SET (
+        "replication_num"="1"
+    );
+    ```
+
+2. Modify the replication number of a non-global group
+
+    ```sql
+    # Set "colocate_with" = "bar" when creating the table; the table belongs to Database example_db
+    
+    ALTER COLOCATE GROUP example_db.bar
+    SET (
+        "replication_num"="1"
+    );
+    ```
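+
+3. Modify the replica allocation of a group. This is a sketch only: it assumes the backends carry the default resource tag, so `tag.location.default` is used in the property value.
+
+    ```sql
+    # assumes backends use the default resource tag "default"
+    ALTER COLOCATE GROUP example_db.bar
+    SET (
+        "replication_allocation" = "tag.location.default: 2"
+    );
+    ```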
+
+### Keywords
+
+```sql
+ALTER, COLOCATE, GROUP
+```
+
+### Best Practice
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
index 1bc19780f6c..e3c7c17b660 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
@@ -1,6 +1,6 @@
 ---
 {
-"title": "ALTER-WORKLOAD -GROUP",
+"title": "ALTER-WORKLOAD-GROUP",
 "language": "zh-CN"
 }
 ---
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ALTER-WORKLOAD -GROUP
+## ALTER-WORKLOAD-GROUP
 
 ### Name
 
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 5950548c427..74896554b7e 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -296,6 +296,7 @@ terminal String
     KW_CLUSTERS,
     KW_COLLATE,
     KW_COLLATION,
+    KW_COLOCATE,
     KW_COLUMN,
     KW_COLUMNS,
     KW_COMMENT,
@@ -798,6 +799,7 @@ nonterminal Qualifier opt_set_qualifier;
 nonterminal Operation set_op;
 nonterminal ArrayList<String> opt_common_hints;
 nonterminal String optional_on_ident;
+nonterminal ColocateGroupName colocate_group_name;
 
 nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
 
@@ -1327,6 +1329,10 @@ alter_stmt ::=
     {:
         RESULT = new AlterResourceStmt(resourceName, properties);
     :}
+    | KW_ALTER KW_COLOCATE KW_GROUP colocate_group_name:colocateGroupName 
KW_SET LPAREN key_value_map:properties RPAREN
+    {:
+        RESULT = new AlterColocateGroupStmt(colocateGroupName, properties);
+    :}
     | KW_ALTER KW_WORKLOAD KW_GROUP ident_or_text:workloadGroupName 
opt_properties:properties
     {:
         RESULT = new AlterWorkloadGroupStmt(workloadGroupName, properties);
@@ -5521,6 +5527,17 @@ table_name ::=
     {: RESULT = new TableName(ctl, db, tbl); :}
     ;
 
+colocate_group_name ::=
+    ident:group
+    {:
+        RESULT = new ColocateGroupName(null, group);
+    :}
+    | ident:db DOT ident:group
+    {:
+        RESULT = new ColocateGroupName(db, group);
+    :}
+    ;
+
 encryptkey_name ::=
     ident:name
     {:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
new file mode 100644
index 00000000000..e268322dcc8
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+
+import java.util.Map;
+
+public class AlterColocateGroupStmt extends DdlStmt {
+    private final ColocateGroupName colocateGroupName;
+    private final Map<String, String> properties;
+
+    public AlterColocateGroupStmt(ColocateGroupName colocateGroupName, 
Map<String, String> properties) {
+        this.colocateGroupName = colocateGroupName;
+        this.properties = properties;
+    }
+
+    public ColocateGroupName getColocateGroupName() {
+        return colocateGroupName;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        colocateGroupName.analyze(analyzer);
+
+        String dbName = colocateGroupName.getDb();
+        if (Strings.isNullOrEmpty(dbName)) {
+            if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(
+                        ConnectContext.get(), PrivPredicate.ADMIN)) {
+                
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+            }
+        } else {
+            if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(
+                        ConnectContext.get(), dbName, PrivPredicate.ADMIN)) {
+                
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR,
+                        ConnectContext.get().getQualifiedUser(), dbName);
+            }
+        }
+
+        if (properties == null || properties.isEmpty()) {
+            throw new AnalysisException("Colocate group properties can't be 
null");
+        }
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ALTER COLOCATE GROUP 
").append(colocateGroupName.toSql()).append(" ");
+        sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", 
true, false)).append(")");
+        return sb.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
new file mode 100644
index 00000000000..b7f0c0afd34
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+
+import com.google.common.base.Strings;
+
+public class ColocateGroupName {
+    private String db;
+    private String group;
+
+    public ColocateGroupName(String db, String group) {
+        this.db = db;
+        this.group = group;
+    }
+
+    public String getDb() {
+        return db;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        if (GroupId.isGlobalGroupName(group)) {
+            if (!Strings.isNullOrEmpty(db)) {
+                throw new AnalysisException("group that name starts with `" + 
GroupId.GLOBAL_COLOCATE_PREFIX + "`"
+                        + " is a global group, it doesn't belong to any 
specific database");
+            }
+        } else {
+            if (Strings.isNullOrEmpty(db)) {
+                if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                    
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+                }
+                db = analyzer.getDefaultDb();
+            }
+            db = ClusterNamespace.getFullName(analyzer.getClusterName(), db);
+        }
+    }
+
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (!Strings.isNullOrEmpty(db)) {
+            sb.append("`").append(db).append("`.");
+        }
+        sb.append("`").append(group).append("`");
+        return sb.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
index b5004973c37..57d512b9789 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
@@ -66,6 +66,10 @@ public class ColocateGroupSchema implements Writable {
         return replicaAlloc;
     }
 
+    public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
+        this.replicaAlloc = replicaAlloc;
+    }
+
     public List<Type> getDistributionColTypes() {
         return distributionColTypes;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 23703278fd8..fcefcff132a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -17,10 +17,16 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.analysis.AlterColocateGroupStmt;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DynamicPartitionUtil;
+import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -249,10 +255,34 @@ public class ColocateTableIndex implements Writable {
         }
     }
 
-    public void addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, 
List<List<Long>> backendsPerBucketSeq) {
+    public void setBackendsPerBucketSeq(GroupId groupId, Map<Tag, 
List<List<Long>>> backendsPerBucketSeq) {
         writeLock();
         try {
+            Map<Tag, List<List<Long>>> backendsPerBucketSeqMap = 
group2BackendsPerBucketSeq.row(groupId);
+            if (backendsPerBucketSeqMap != null) {
+                backendsPerBucketSeqMap.clear();
+            }
+            for (Map.Entry<Tag, List<List<Long>>> entry : 
backendsPerBucketSeq.entrySet()) {
+                group2BackendsPerBucketSeq.put(groupId, entry.getKey(), 
entry.getValue());
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public boolean addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, 
List<List<Long>> backendsPerBucketSeq,
+            ReplicaAllocation originReplicaAlloc) {
+        writeLock();
+        try {
+            ColocateGroupSchema groupSchema = group2Schema.get(groupId);
+            // the replica allocation is outdated
+            if (groupSchema != null && 
!originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
+                LOG.info("replica allocation has outdate for group {}, old 
replica alloc {}, new replica alloc {}",
+                        groupId, originReplicaAlloc.getAllocMap(), 
groupSchema.getReplicaAlloc());
+                return false;
+            }
             group2BackendsPerBucketSeq.put(groupId, tag, backendsPerBucketSeq);
+            return true;
         } finally {
             writeUnlock();
         }
@@ -277,12 +307,20 @@ public class ColocateTableIndex implements Writable {
         }
     }
 
-    public void markGroupStable(GroupId groupId, boolean needEditLog) {
+    public void markGroupStable(GroupId groupId, boolean needEditLog, 
ReplicaAllocation originReplicaAlloc) {
         writeLock();
         try {
             if (!group2Tables.containsKey(groupId)) {
                 return;
             }
+            // the replica allocation is outdated
+            ColocateGroupSchema groupSchema = group2Schema.get(groupId);
+            if (groupSchema != null && originReplicaAlloc != null
+                    && 
!originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
+                LOG.warn("mark group {} failed, replica alloc has outdate, old 
replica alloc {}, new replica alloc {}",
+                        groupId, originReplicaAlloc.getAllocMap(), 
groupSchema.getReplicaAlloc());
+                return;
+            }
             if (unstableGroups.remove(groupId)) {
                 group2ErrMsgs.put(groupId, "");
                 if (needEditLog) {
@@ -604,13 +642,23 @@ public class ColocateTableIndex implements Writable {
     }
 
     public void replayMarkGroupStable(ColocatePersistInfo info) {
-        markGroupStable(info.getGroupId(), false);
+        markGroupStable(info.getGroupId(), false, null);
     }
 
     public void replayRemoveTable(ColocatePersistInfo info) {
         removeTable(info.getTableId());
     }
 
+    public void replayModifyReplicaAlloc(ColocatePersistInfo info) throws 
UserException {
+        writeLock();
+        try {
+            modifyColocateGroupReplicaAllocation(info.getGroupId(), 
info.getReplicaAlloc(),
+                    info.getBackendsPerBucketSeq(), false);
+        } finally {
+            writeUnlock();
+        }
+    }
+
     // only for test
     public void clear() {
         writeLock();
@@ -633,7 +681,22 @@ public class ColocateTableIndex implements Writable {
                 List<String> info = Lists.newArrayList();
                 GroupId groupId = entry.getValue();
                 info.add(groupId.toString());
-                info.add(entry.getKey());
+                String dbName = "";
+                if (groupId.dbId != 0) {
+                    Database db = 
Env.getCurrentInternalCatalog().getDbNullable(groupId.dbId);
+                    if (db != null) {
+                        dbName = db.getFullName();
+                        int index = dbName.indexOf(":");
+                        if (index > 0) {
+                            dbName = dbName.substring(index + 1); //use short 
db name
+                        }
+                    }
+                }
+                String groupName = entry.getKey();
+                if (!GroupId.isGlobalGroupName(groupName)) {
+                    groupName = dbName + "." + 
groupName.substring(groupName.indexOf("_") + 1);
+                }
+                info.add(groupName);
                 info.add(Joiner.on(", ").join(group2Tables.get(groupId)));
                 ColocateGroupSchema groupSchema = group2Schema.get(groupId);
                 info.add(String.valueOf(groupSchema.getBucketsNum()));
@@ -756,4 +819,124 @@ public class ColocateTableIndex implements Writable {
     public Map<Long, GroupId> getTable2Group() {
         return table2Group;
     }
+
+    public void alterColocateGroup(AlterColocateGroupStmt stmt) throws 
UserException {
+        writeLock();
+        try {
+            Map<String, String> properties = stmt.getProperties();
+            String dbName = stmt.getColocateGroupName().getDb();
+            String groupName = stmt.getColocateGroupName().getGroup();
+            long dbId = 0;
+            if (!GroupId.isGlobalGroupName(groupName)) {
+                Database db = (Database) 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
+                dbId = db.getId();
+            }
+            String fullGroupName = GroupId.getFullGroupName(dbId, groupName);
+            ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName);
+            if (groupSchema == null) {
+                throw new DdlException("Not found colocate group " + 
stmt.getColocateGroupName().toSql());
+            }
+
+            GroupId groupId = groupSchema.getGroupId();
+
+            if (properties.size() > 1) {
+                throw new DdlException("Can only set one colocate group 
property at a time");
+            }
+
+            if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)
+                    || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
+                ReplicaAllocation replicaAlloc = 
PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
+                Preconditions.checkState(!replicaAlloc.isNotSet());
+                
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+                Map<Tag, List<List<Long>>> backendsPerBucketSeq = 
getBackendsPerBucketSeq(groupId);
+                Map<Tag, List<List<Long>>> newBackendsPerBucketSeq = 
Maps.newHashMap();
+                for (Map.Entry<Tag, List<List<Long>>> entry : 
backendsPerBucketSeq.entrySet()) {
+                    List<List<Long>> newList = Lists.newArrayList();
+                    for (List<Long> backends : entry.getValue()) {
+                        newList.add(Lists.newArrayList(backends));
+                    }
+                    newBackendsPerBucketSeq.put(entry.getKey(), newList);
+                }
+                try {
+                    
ColocateTableCheckerAndBalancer.modifyGroupReplicaAllocation(replicaAlloc,
+                            newBackendsPerBucketSeq, 
groupSchema.getBucketsNum());
+                } catch (Exception e) {
+                    LOG.warn("modify group [{}, {}] to replication allocation 
{} failed, bucket seq {}",
+                            fullGroupName, groupId, replicaAlloc, 
backendsPerBucketSeq, e);
+                    throw new DdlException(e.getMessage());
+                }
+                backendsPerBucketSeq = newBackendsPerBucketSeq;
+                Preconditions.checkState(backendsPerBucketSeq.size() == 
replicaAlloc.getAllocMap().size());
+                modifyColocateGroupReplicaAllocation(groupSchema.getGroupId(), 
replicaAlloc,
+                        backendsPerBucketSeq, true);
+            } else {
+                throw new DdlException("Unknown colocate group property: " + 
properties.keySet());
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void modifyColocateGroupReplicaAllocation(GroupId groupId, 
ReplicaAllocation replicaAlloc,
+            Map<Tag, List<List<Long>>> backendsPerBucketSeq, boolean 
needEditLog) throws UserException {
+        ColocateGroupSchema groupSchema = getGroupSchema(groupId);
+        if (groupSchema == null) {
+            LOG.warn("not found group {}", groupId);
+            return;
+        }
+
+        List<Long> tableIds = getAllTableIds(groupId);
+        for (Long tableId : tableIds) {
+            long dbId = groupId.dbId;
+            if (dbId == 0) {
+                dbId = groupId.getDbIdByTblId(tableId);
+            }
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            if (table == null || !isColocateTable(table.getId())) {
+                continue;
+            }
+            table.writeLock();
+            try {
+                Map<String, String> tblProperties = Maps.newHashMap();
+                tblProperties.put("default." + 
PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
+                        replicaAlloc.toCreateStmt());
+                table.setReplicaAllocation(tblProperties);
+                if (table.dynamicPartitionExists()) {
+                    TableProperty tableProperty = table.getTableProperty();
+                    // Merge the new properties with origin properties, and 
then analyze them
+                    Map<String, String> origDynamicProperties = 
tableProperty.getOriginDynamicPartitionProperty();
+                    
origDynamicProperties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION,
+                            replicaAlloc.toCreateStmt());
+                    Map<String, String> analyzedDynamicPartition = 
DynamicPartitionUtil.analyzeDynamicPartition(
+                            origDynamicProperties, table, db);
+                    
tableProperty.modifyTableProperties(analyzedDynamicPartition);
+                    tableProperty.buildDynamicProperty();
+                }
+                for (ReplicaAllocation alloc : 
table.getPartitionInfo().getPartitionReplicaAllocations().values()) {
+                    Map<Tag, Short> allocMap = alloc.getAllocMap();
+                    allocMap.clear();
+                    allocMap.putAll(replicaAlloc.getAllocMap());
+                }
+            } finally {
+                table.writeUnlock();
+            }
+        }
+
+        if 
(!backendsPerBucketSeq.equals(group2BackendsPerBucketSeq.row(groupId))) {
+            markGroupUnstable(groupId, "change replica allocation", false);
+        }
+        groupSchema.setReplicaAlloc(replicaAlloc);
+        setBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+
+        if (needEditLog) {
+            ColocatePersistInfo info = 
ColocatePersistInfo.createForModifyReplicaAlloc(groupId,
+                    replicaAlloc, backendsPerBucketSeq);
+            
Env.getCurrentEnv().getEditLog().logColocateModifyRepliaAlloc(info);
+        }
+        LOG.info("modify group {} replication allocation to {}, is replay {}", 
groupId, replicaAlloc, !needEditLog);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 6bd4604471a..235a8cc8db5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -240,6 +240,10 @@ public class PartitionInfo implements Writable {
         idToStoragePolicy.put(partitionId, storagePolicy);
     }
 
+    public Map<Long, ReplicaAllocation> getPartitionReplicaAllocations() {
+        return idToReplicaAllocation;
+    }
+
     public ReplicaAllocation getReplicaAllocation(long partitionId) {
         if (!idToReplicaAllocation.containsKey(partitionId)) {
             LOG.debug("failed to get replica allocation for partition: {}", 
partitionId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 5c18c2bd468..4ec8993be0d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -35,6 +35,7 @@ import org.apache.doris.clone.TabletScheduler.AddResult;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.resource.Tag;
@@ -183,7 +184,12 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 List<List<Long>> balancedBackendsPerBucketSeq = 
Lists.newArrayList();
                 if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, 
availableBeIds, colocateIndex,
                         infoService, statistic, balancedBackendsPerBucketSeq)) 
{
-                    colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, 
balancedBackendsPerBucketSeq);
+                    if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId, 
tag, balancedBackendsPerBucketSeq,
+                                replicaAlloc)) {
+                        LOG.warn("relocate group {} succ, but replica 
allocation has change, old replica alloc {}",
+                                groupId, replicaAlloc);
+                        continue;
+                    }
                     Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap 
= Maps.newHashMap();
                     balancedBackendsPerBucketSeqMap.put(tag, 
balancedBackendsPerBucketSeq);
                     ColocatePersistInfo info = ColocatePersistInfo
@@ -219,6 +225,8 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 continue;
             }
 
+            ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
             String unstableReason = null;
             OUT:
             for (Long tableId : tableIds) {
@@ -237,8 +245,6 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 olapTable.readLock();
                 try {
                     for (Partition partition : olapTable.getPartitions()) {
-                        ReplicaAllocation replicaAlloc
-                                = 
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
                         short replicationNum = 
replicaAlloc.getTotalReplicaNum();
                         long visibleVersion = partition.getVisibleVersion();
                         // Here we only get VISIBLE indexes. All other indexes 
are not queryable.
@@ -269,8 +275,7 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                                     TabletSchedCtx tabletCtx = new 
TabletSchedCtx(
                                             TabletSchedCtx.Type.REPAIR,
                                             db.getId(), tableId, 
partition.getId(), index.getId(), tablet.getId(),
-                                            
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()),
-                                            System.currentTimeMillis());
+                                            replicaAlloc, 
System.currentTimeMillis());
                                     // the tablet status will be set again 
when being scheduled
                                     tabletCtx.setTabletStatus(st);
                                     tabletCtx.setPriority(Priority.NORMAL);
@@ -299,7 +304,7 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
 
             // mark group as stable or unstable
             if (Strings.isNullOrEmpty(unstableReason)) {
-                colocateIndex.markGroupStable(groupId, true);
+                colocateIndex.markGroupStable(groupId, true, replicaAlloc);
             } else {
                 colocateIndex.markGroupUnstable(groupId, unstableReason, true);
             }
@@ -521,6 +526,122 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
         return hostsPerBucketSeq;
     }
 
+    public static void modifyGroupReplicaAllocation(ReplicaAllocation 
replicaAlloc,
+            Map<Tag, List<List<Long>>> backendBucketsSeq, int bucketNum) 
throws Exception {
+        Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
+        List<Tag> deleteTags = Lists.newArrayList();
+        for (Tag tag : backendBucketsSeq.keySet()) {
+            if (!allocMap.containsKey(tag)) {
+                deleteTags.add(tag);
+            }
+            Preconditions.checkState(bucketNum == 
backendBucketsSeq.get(tag).size(),
+                    bucketNum + " vs " + backendBucketsSeq.get(tag).size());
+        }
+        deleteTags.forEach(tag -> backendBucketsSeq.remove(tag));
+
+        for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
+            if (!backendBucketsSeq.containsKey(tag)) {
+                List<List<Long>> tagBackendBucketsSeq = Lists.newArrayList();
+                for (int i = 0; i < bucketNum; i++) {
+                    tagBackendBucketsSeq.add(Lists.newArrayList());
+                }
+                backendBucketsSeq.put(tag, tagBackendBucketsSeq);
+            }
+        }
+
+        Map<Long, Integer> backendToBucketNum = Maps.newHashMap();
+        backendBucketsSeq.values().forEach(tagBackendIds ->
+                tagBackendIds.forEach(backendIds ->
+                        backendIds.forEach(backendId -> backendToBucketNum.put(
+                                backendId, 
backendToBucketNum.getOrDefault(backendId, 0) + 1))));
+
+        for (Tag tag : backendBucketsSeq.keySet()) {
+            List<List<Long>> tagBackendBucketsSeq = backendBucketsSeq.get(tag);
+            int oldReplicaNum = tagBackendBucketsSeq.get(0).size();
+            for (List<Long> backendIdsOneBucket : tagBackendBucketsSeq) {
+                Preconditions.checkState(backendIdsOneBucket.size() == 
oldReplicaNum,
+                        backendIdsOneBucket.size() + " vs " + oldReplicaNum);
+            }
+
+            int newReplicaNum = allocMap.get(tag);
+            if (newReplicaNum == oldReplicaNum) {
+                continue;
+            }
+
+            List<Backend> backends = 
Env.getCurrentSystemInfo().getBackendsByTag(tag);
+            Set<Long> availableBeIds = backends.stream().filter(be -> 
be.isScheduleAvailable())
+                    .map(be -> be.getId()).collect(Collectors.toSet());
+
+            for (Long backendId : availableBeIds) {
+                if (!backendToBucketNum.containsKey(backendId)) {
+                    backendToBucketNum.put(backendId, 0);
+                }
+            }
+
+            for (int i = 0; i < tagBackendBucketsSeq.size(); i++) {
+                modifyGroupBucketReplicas(tag, newReplicaNum, 
tagBackendBucketsSeq.get(i),
+                        availableBeIds, backendToBucketNum);
+            }
+        }
+    }
+
+    private static void modifyGroupBucketReplicas(Tag tag, int newReplicaNum, 
List<Long> backendIds,
+            Set<Long> availableBeIds, Map<Long, Integer> backendToBucketNum) 
throws Exception {
+        final boolean smallIdFirst = Math.random() < 0.5;
+        if (backendIds.size() > newReplicaNum) {
+            backendIds.sort((id1, id2) -> {
+                boolean alive1 = availableBeIds.contains(id1);
+                boolean alive2 = availableBeIds.contains(id2);
+                if (alive1 != alive2) {
+                    return alive1 ? -1 : 1;
+                }
+                int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
+                int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
+                if (bucketNum1 != bucketNum2) {
+                    return Integer.compare(bucketNum1, bucketNum2);
+                }
+
+                return smallIdFirst ? Long.compare(id1, id2) : 
Long.compare(id2, id1);
+            });
+
+            for (int i = backendIds.size() - 1; i >= newReplicaNum; i--) {
+                long backendId = backendIds.get(i);
+                backendIds.remove(i);
+                backendToBucketNum.put(backendId, 
backendToBucketNum.getOrDefault(backendId, 0) - 1);
+            }
+        }
+
+        if (backendIds.size() < newReplicaNum) {
+            Set<Long> candBackendSet = Sets.newHashSet();
+            candBackendSet.addAll(availableBeIds);
+            candBackendSet.removeAll(backendIds);
+            if (backendIds.size() + candBackendSet.size() < newReplicaNum) {
+                throw new UserException("Can not add backend for tag: " + tag);
+            }
+
+            List<Long> candBackendList = Lists.newArrayList(candBackendSet);
+            candBackendList.sort((id1, id2) -> {
+                int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
+                int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
+                if (bucketNum1 != bucketNum2) {
+                    return Integer.compare(bucketNum1, bucketNum2);
+                }
+
+                return smallIdFirst ? Long.compare(id1, id2) : 
Long.compare(id2, id1);
+            });
+
+            int addNum = newReplicaNum - backendIds.size();
+            for (int i = 0; i < addNum; i++) {
+                long backendId = candBackendList.get(i);
+                backendIds.add(backendId);
+                backendToBucketNum.put(backendId, 
backendToBucketNum.getOrDefault(backendId, 0) + 1);
+            }
+        }
+
+        Preconditions.checkState(newReplicaNum == backendIds.size(),
+                newReplicaNum + " vs " + backendIds.size());
+    }
+
     private List<Map.Entry<Long, Long>> 
getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
             Set<Long> unavailBackendIds, LoadStatisticForTag statistic, 
List<Long> flatBackendsPerBucketSeq) {
         // backend id -> replica num, and sorted by replica num, descending.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index d6a8e1efa0e..ee9da3ac100 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
 
 import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
 import org.apache.doris.analysis.AdminRebalanceDiskStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Database;
@@ -490,15 +491,20 @@ public class TabletScheduler extends MasterDaemon {
                 throw new SchedException(Status.UNRECOVERABLE, "index does not 
exist");
             }
 
+            ReplicaAllocation replicaAlloc = null;
             Tablet tablet = idx.getTablet(tabletId);
             Preconditions.checkNotNull(tablet);
-            ReplicaAllocation replicaAlloc = 
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
-
             if (isColocateTable) {
                 GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
                 if (groupId == null) {
                     throw new SchedException(Status.UNRECOVERABLE, "colocate 
group does not exist");
                 }
+                ColocateGroupSchema groupSchema = 
colocateTableIndex.getGroupSchema(groupId);
+                if (groupSchema == null) {
+                    throw new SchedException(Status.UNRECOVERABLE,
+                            "colocate group schema " + groupId + " does not 
exist");
+                }
+                replicaAlloc = groupSchema.getReplicaAlloc();
 
                 int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
                 if (tabletOrderIdx == -1) {
@@ -512,6 +518,7 @@ public class TabletScheduler extends MasterDaemon {
                 statusPair = Pair.of(st, Priority.HIGH);
                 tabletCtx.setColocateGroupBackendIds(backendsSet);
             } else {
+                replicaAlloc = 
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
                 List<Long> aliveBeIds = infoService.getAllBackendIds(true);
                 statusPair = tablet.getHealthStatusWithPriority(
                         infoService, partition.getVisibleVersion(), 
replicaAlloc, aliveBeIds);
@@ -1484,14 +1491,18 @@ public class TabletScheduler extends MasterDaemon {
                 return;
             }
 
-            replicaAlloc = 
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
             boolean isColocateTable = 
colocateTableIndex.isColocateTable(tbl.getId());
             if (isColocateTable) {
                 GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
                 if (groupId == null) {
                     return;
                 }
+                ColocateGroupSchema groupSchema = 
colocateTableIndex.getGroupSchema(groupId);
+                if (groupSchema == null) {
+                    return;
+                }
 
+                replicaAlloc = groupSchema.getReplicaAlloc();
                 int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
                 if (tabletOrderIdx == -1) {
                     tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
@@ -1504,6 +1515,7 @@ public class TabletScheduler extends MasterDaemon {
                 statusPair = Pair.of(st, Priority.HIGH);
                 tabletCtx.setColocateGroupBackendIds(backendsSet);
             } else {
+                replicaAlloc = 
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
                 List<Long> aliveBeIds = infoService.getAllBackendIds(true);
                 statusPair = tablet.getHealthStatusWithPriority(
                         infoService, partition.getVisibleVersion(), 
replicaAlloc, aliveBeIds);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index 93f54483cbf..3ce3ff74c7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.proc;
 
+import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -185,6 +186,10 @@ public class TabletHealthProcDir implements 
ProcDirInterface {
                                 ++tabletNum;
                                 Tablet.TabletStatus res = null;
                                 if (groupId != null) {
+                                    ColocateGroupSchema groupSchema = 
colocateTableIndex.getGroupSchema(groupId);
+                                    if (groupSchema != null) {
+                                        replicaAlloc = 
groupSchema.getReplicaAlloc();
+                                    }
                                     Set<Long> backendsSet = 
colocateTableIndex.getTabletBackendsByGroup(groupId, i);
                                     res = 
tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
                                             backendsSet);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
index 9e51d38de5c..b7c2a615aac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
@@ -114,7 +114,7 @@ public class ColocateMetaService extends RestBaseController 
{
         if ("POST".equalsIgnoreCase(method)) {
             colocateIndex.markGroupUnstable(groupId, "mark unstable via http 
api", true);
         } else if ("DELETE".equalsIgnoreCase(method)) {
-            colocateIndex.markGroupStable(groupId, true);
+            colocateIndex.markGroupStable(groupId, true, null);
         }
         return ResponseEntityBuilder.ok();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 0e0b8ac7df6..d3d0fe18d90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -456,6 +456,7 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC:
             case OperationType.OP_COLOCATE_ADD_TABLE:
             case OperationType.OP_COLOCATE_REMOVE_TABLE:
             case OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index b78cbddb381..bfe1bb0a9e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -19,6 +19,7 @@ package org.apache.doris.master;
 
 
 import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -1172,6 +1173,10 @@ public class ReportHandler extends Daemon {
                 int tabletOrderIdx = 
materializedIndex.getTabletOrderIdx(tabletId);
                 Preconditions.checkState(tabletOrderIdx != -1, "get tablet 
materializedIndex for %s fail", tabletId);
                 Set<Long> backendsSet = 
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+                ColocateGroupSchema groupSchema = 
colocateTableIndex.getGroupSchema(groupId);
+                if (groupSchema != null) {
+                    replicaAlloc = groupSchema.getReplicaAlloc();
+                }
                 TabletStatus status =
                         tablet.getColocateHealthStatus(visibleVersion, 
replicaAlloc, backendsSet);
                 if (status == TabletStatus.HEALTHY) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
index 459be646052..429d4e0e1a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
@@ -18,6 +18,7 @@
 package org.apache.doris.persist;
 
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -45,29 +46,38 @@ public class ColocatePersistInfo implements Writable {
     private long tableId;
     @SerializedName(value = "backendsPerBucketSeq")
     private Map<Tag, List<List<Long>>> backendsPerBucketSeq = 
Maps.newHashMap();
+    @SerializedName(value = "replicaAlloc")
+    private ReplicaAllocation replicaAlloc = new ReplicaAllocation();
 
-    private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag, 
List<List<Long>>> backendsPerBucketSeq) {
+    private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag, 
List<List<Long>>> backendsPerBucketSeq,
+            ReplicaAllocation replicaAlloc) {
         this.groupId = groupId;
         this.tableId = tableId;
         this.backendsPerBucketSeq = backendsPerBucketSeq;
+        this.replicaAlloc = replicaAlloc;
     }
 
     public static ColocatePersistInfo createForAddTable(GroupId groupId, long 
tableId,
             Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
-        return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq);
+        return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq, 
new ReplicaAllocation());
     }
 
     public static ColocatePersistInfo createForBackendsPerBucketSeq(GroupId 
groupId,
             Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
-        return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq);
+        return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, new 
ReplicaAllocation());
     }
 
     public static ColocatePersistInfo createForMarkUnstable(GroupId groupId) {
-        return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
+        return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new 
ReplicaAllocation());
     }
 
     public static ColocatePersistInfo createForMarkStable(GroupId groupId) {
-        return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
+        return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new 
ReplicaAllocation());
+    }
+
+    public static ColocatePersistInfo createForModifyReplicaAlloc(GroupId 
groupId, ReplicaAllocation replicaAlloc,
+            Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
+        return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, 
replicaAlloc);
     }
 
     public static ColocatePersistInfo read(DataInput in) throws IOException {
@@ -87,6 +97,10 @@ public class ColocatePersistInfo implements Writable {
         return backendsPerBucketSeq;
     }
 
+    public ReplicaAllocation getReplicaAlloc() {
+        return replicaAlloc;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -129,7 +143,7 @@ public class ColocatePersistInfo implements Writable {
         ColocatePersistInfo info = (ColocatePersistInfo) obj;
 
         return tableId == info.tableId && groupId.equals(info.groupId) && 
backendsPerBucketSeq.equals(
-                info.backendsPerBucketSeq);
+                info.backendsPerBucketSeq) && 
replicaAlloc.equals(info.replicaAlloc);
     }
 
     @Override
@@ -138,6 +152,7 @@ public class ColocatePersistInfo implements Writable {
         sb.append("table id: ").append(tableId);
         sb.append(" group id: ").append(groupId);
         sb.append(" backendsPerBucketSeq: ").append(backendsPerBucketSeq);
+        sb.append(" replicaAlloc: ").append(replicaAlloc);
         return sb.toString();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 97dd6719e9b..b0540abf9ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -594,6 +594,11 @@ public class EditLog {
                     env.getColocateTableIndex().replayMarkGroupStable(info);
                     break;
                 }
+                case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC: {
+                    final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
+                    env.getColocateTableIndex().replayModifyReplicaAlloc(info);
+                    break;
+                }
                 case OperationType.OP_MODIFY_TABLE_COLOCATE: {
                     final TablePropertyInfo info = (TablePropertyInfo) 
journal.getData();
                     env.replayModifyTableColocate(info);
@@ -1497,6 +1502,10 @@ public class EditLog {
         Env.getCurrentEnv().getBinlogManager().addTruncateTable(info, logId);
     }
 
+    public void logColocateModifyRepliaAlloc(ColocatePersistInfo info) {
+        logEdit(OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC, info);
+    }
+
     public void logColocateAddTable(ColocatePersistInfo info) {
         logEdit(OperationType.OP_COLOCATE_ADD_TABLE, info);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 3cf30df2428..b2f3ffb7e33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -186,6 +186,10 @@ public class OperationType {
     public static final short OP_ADD_GLOBAL_FUNCTION = 132;
     public static final short OP_DROP_GLOBAL_FUNCTION = 133;
 
+    // modify database/table/tablet/replica meta
+    public static final short OP_SET_REPLICA_VERSION = 141;
+    public static final short OP_COLOCATE_MOD_REPLICA_ALLOC = 142;
+
     // routine load 200
     public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;
     public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 29e29a9a7dd..f50d8d1ae02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
 import org.apache.doris.analysis.AlterCatalogCommentStmt;
 import org.apache.doris.analysis.AlterCatalogNameStmt;
 import org.apache.doris.analysis.AlterCatalogPropertyStmt;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
 import org.apache.doris.analysis.AlterColumnStatsStmt;
 import org.apache.doris.analysis.AlterDatabasePropertyStmt;
 import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
@@ -298,6 +299,8 @@ public class DdlExecutor {
             env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterResourceStmt) {
             env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
+        } else if (ddlStmt instanceof AlterColocateGroupStmt) {
+            env.getColocateTableIndex().alterColocateGroup((AlterColocateGroupStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterWorkloadGroupStmt) {
             env.getWorkloadGroupMgr().alterWorkloadGroup((AlterWorkloadGroupStmt) ddlStmt);
         } else if (ddlStmt instanceof CreatePolicyStmt) {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 4f4f3392dc2..c4791093b48 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -146,6 +146,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("clusters", new Integer(SqlParserSymbols.KW_CLUSTERS));
         keywordMap.put("collate", new Integer(SqlParserSymbols.KW_COLLATE));
         keywordMap.put("collation", new Integer(SqlParserSymbols.KW_COLLATION));
+        keywordMap.put("colocate", new Integer(SqlParserSymbols.KW_COLOCATE));
         keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN));
         keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS));
         keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
index 924fcd53b7d..4a78ef4de28 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.analysis.AlterColocateGroupStmt;
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
@@ -26,6 +27,8 @@ import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DateLiteral;
 import org.apache.doris.analysis.DropResourceStmt;
 import org.apache.doris.analysis.ShowCreateMaterializedViewStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
@@ -36,10 +39,13 @@ import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.clone.RebalancerTestUtil;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -47,6 +53,7 @@ import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.qe.ShowExecutor;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
@@ -70,18 +77,36 @@ public class AlterTest {
     private static String runningDir = "fe/mocked/AlterTest/" + UUID.randomUUID().toString() + "/";
 
     private static ConnectContext connectContext;
-    private static Backend be;
+
+    private static Map<Long, Tag> backendTags;
 
     @BeforeClass
     public static void beforeClass() throws Exception {
         FeConstants.runningUnitTest = true;
         FeConstants.default_scheduler_interval_millisecond = 100;
+        FeConstants.tablet_checker_interval_ms = 100;
         Config.dynamic_partition_check_interval_seconds = 1;
         Config.disable_storage_medium_check = true;
         Config.enable_storage_policy = true;
-        UtFrameUtils.createDorisCluster(runningDir);
+        Config.disable_balance = true;
+        Config.schedule_batch_size = 400;
+        Config.schedule_slot_num_per_hdd_path = 100;
+        UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5);
+
+        List<Backend> backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+
+        Map<String, String> tagMap = Maps.newHashMap();
+        tagMap.put(Tag.TYPE_LOCATION, "group_a");
+        backends.get(2).setTagMap(tagMap);
+        backends.get(3).setTagMap(tagMap);
+
+        tagMap = Maps.newHashMap();
+        tagMap.put(Tag.TYPE_LOCATION, "group_b");
+        backends.get(4).setTagMap(tagMap);
 
-        be = Env.getCurrentSystemInfo().getIdToBackend().values().asList().get(0);
+        backendTags = Maps.newHashMap();
+        backends.forEach(be -> backendTags.put(be.getId(), be.getLocationTag()));
 
         // create connect context
         connectContext = UtFrameUtils.createDefaultCtx();
@@ -443,21 +468,16 @@ public class AlterTest {
 
         // set un-partitioned table's real replication num
         // first we need to change be's tag
-        Map<String, String> originTagMap = be.getTagMap();
-        Map<String, String> tagMap = Maps.newHashMap();
-        tagMap.put(Tag.TYPE_LOCATION, "group1");
-        be.setTagMap(tagMap);
         OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("tbl2");
         Partition partition = tbl2.getPartition(tbl2.getName());
         Assert.assertEquals(Short.valueOf("1"),
                 Short.valueOf(tbl2.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()));
-        stmt = "alter table test.tbl2 set ('replication_allocation' = 'tag.location.group1:1');";
+        stmt = "alter table test.tbl2 set ('replication_allocation' = 'tag.location.group_a:1');";
         alterTable(stmt, false);
         Assert.assertEquals((short) 1, (short) tbl2.getPartitionInfo().getReplicaAllocation(partition.getId())
-                .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group1")));
+                .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group_a")));
         Assert.assertEquals((short) 1, (short) tbl2.getTableProperty().getReplicaAllocation()
-                .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group1")));
-        be.setTagMap(originTagMap);
+                .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group_a")));
 
         Thread.sleep(5000); // sleep to wait dynamic partition scheduler run
         // add partition without set replication num, and default num is 3.
@@ -1239,6 +1259,145 @@ public class AlterTest {
         Env.getCurrentEnv().getResourceMgr().dropResource(stmt);
     }
 
+    @Test
+    public void testModifyColocateGroupReplicaAlloc() throws Exception {
+        Config.enable_round_robin_create_tablet = true;
+
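+        // three tables in three separate colocate groups (mod_group_0/1/2), each created with 2 replicas per bucket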
+        createTable("CREATE TABLE test.col_tbl0\n" + "(\n" + "    k1 date,\n" + "    k2 int,\n" + "    v1 int \n"
+                + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n"
+                + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n"
+                + "PROPERTIES('replication_num' = '2', 'colocate_with' = 'mod_group_0');");
+
+        createTable("CREATE TABLE test.col_tbl1\n" + "(\n" + "    k1 date,\n" + "    k2 int,\n" + "    v1 int \n"
+                + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n" + "PARTITION BY RANGE(k1)\n" + "(\n"
+                + "    PARTITION p1 values less than('2020-02-01'),\n"
+                + "    PARTITION p2 values less than('2020-03-01')\n" + ")\n" + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n"
+                + "PROPERTIES('replication_num' = '2', 'colocate_with' = 'mod_group_1');");
+
+        createTable("CREATE TABLE test.col_tbl2 (\n"
+                + "`uuid` varchar(255) NULL,\n"
+                + "`action_datetime` date NULL\n"
+                + ")\n"
+                + "DUPLICATE KEY(uuid)\n"
+                + "PARTITION BY RANGE(action_datetime)()\n"
+                + "DISTRIBUTED BY HASH(uuid) BUCKETS 4\n"
+                + "PROPERTIES\n"
+                + "(\n"
+                + "\"replication_num\" = \"2\",\n"
+                + "\"colocate_with\" = \"mod_group_2\",\n"
+                + "\"dynamic_partition.enable\" = \"true\",\n"
+                + "\"dynamic_partition.time_unit\" = \"DAY\",\n"
+                + "\"dynamic_partition.end\" = \"2\",\n"
+                + "\"dynamic_partition.prefix\" = \"p\",\n"
+                + "\"dynamic_partition.buckets\" = \"4\",\n"
+                + "\"dynamic_partition.replication_num\" = \"2\"\n"
+                + ");\n");
+
+        Env env = Env.getCurrentEnv();
+        Database db = env.getInternalCatalog().getDbOrMetaException("default_cluster:test");
+        OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("col_tbl2");
+        for (int j = 0; true; j++) {
+            Thread.sleep(2000);
+            if (tbl2.getAllPartitions().size() > 0) {
+                break;
+            }
+            if (j >= 5) {
+                Assert.assertTrue("dynamic table not create partition", false);
+            }
+        }
+
+        RebalancerTestUtil.updateReplicaPathHash();
+
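+        // new target allocation: 1 replica on the default tag plus 1 on group_a and 1 on group_b (3 in total)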
+        ReplicaAllocation newReplicaAlloc = new ReplicaAllocation();
+        newReplicaAlloc.put(Tag.DEFAULT_BACKEND_TAG, (short) 1);
+        newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_a"), (short) 1);
+        newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_b"), (short) 1);
+
+        for (int i = 0; i < 3; i++) {
+            String groupName = "mod_group_" + i;
+            String sql = "alter colocate group test." + groupName
+                    + " set ( 'replication_allocation' = '" + newReplicaAlloc.toCreateStmt() + "')";
+            String fullGroupName = GroupId.getFullGroupName(db.getId(), groupName);
+            AlterColocateGroupStmt stmt = (AlterColocateGroupStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+            DdlExecutor.execute(env, stmt);
+
+            ColocateGroupSchema groupSchema = env.getColocateTableIndex().getGroupSchema(fullGroupName);
+            Assert.assertNotNull(groupSchema);
+            Assert.assertEquals(newReplicaAlloc, groupSchema.getReplicaAlloc());
+
+            OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" + i);
+            Assert.assertEquals(newReplicaAlloc, tbl.getDefaultReplicaAllocation());
+            for (Partition partition : tbl.getAllPartitions()) {
+                Assert.assertEquals(newReplicaAlloc,
+                        tbl.getPartitionInfo().getReplicaAllocation(partition.getId()));
+            }
+
+            if (i == 2) {
+                Assert.assertEquals(newReplicaAlloc,
+                        tbl.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation());
+            }
+        }
+
+        Config.enable_round_robin_create_tablet = false;
+
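+        // wait until each colocate group is stable again, then check that every tablet's replicas match the new allocation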
+        for (int k = 0; true; k++) {
+            Thread.sleep(1000); // sleep to wait dynamic partition scheduler run
+            boolean allStable = true;
+            for (int i = 0; i < 3; i++) {
+                String fullGroupName = GroupId.getFullGroupName(db.getId(), "mod_group_" + i);
+                ColocateGroupSchema groupSchema = env.getColocateTableIndex().getGroupSchema(fullGroupName);
+                Assert.assertNotNull(groupSchema);
+
+                if (env.getColocateTableIndex().isGroupUnstable(groupSchema.getGroupId())) {
+                    allStable = false;
+                    if (k >= 120) {
+                        Assert.assertTrue(fullGroupName + " is unstable", false);
+                    }
+                    continue;
+                }
+
+                Map<Long, Integer> backendReplicaNum = Maps.newHashMap();
+                OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" + i);
+                int tabletNum = 0;
+                for (Partition partition : tbl.getAllPartitions()) {
+                    for (MaterializedIndex idx : partition.getMaterializedIndices(
+                                MaterializedIndex.IndexExtState.VISIBLE)) {
+                        for (Tablet tablet : idx.getTablets()) {
+                            Map<Tag, Short> allocMap = Maps.newHashMap();
+                            tabletNum++;
+                            for (Replica replica : tablet.getReplicas()) {
+                                long backendId = replica.getBackendId();
+                                Tag tag = backendTags.get(backendId);
+                                Assert.assertNotNull(tag);
+                                short oldNum = allocMap.getOrDefault(tag, (short) 0);
+                                allocMap.put(tag, (short) (oldNum + 1));
+                                backendReplicaNum.put(backendId, backendReplicaNum.getOrDefault(backendId, 0) + 1);
+                            }
+                            Assert.assertEquals(newReplicaAlloc.getAllocMap(), allocMap);
+                        }
+                    }
+                }
+
+                Assert.assertTrue(tabletNum > 0);
+
+                for (Map.Entry<Long, Integer> entry : backendReplicaNum.entrySet()) {
+                    long backendId = entry.getKey();
+                    int replicaNum = entry.getValue();
+                    Tag tag = backendTags.get(backendId);
+                    int sameTagReplicaNum = tabletNum * newReplicaAlloc.getAllocMap().getOrDefault(tag, (short) 0);
+                    int sameTagBeNum = (int) (backendTags.values().stream().filter(t -> t.equals(tag)).count());
+                    Assert.assertEquals("backend " + backendId + " failed: " + " all backend replica num: "
+                            + backendReplicaNum + ", all backend tag: " + backendTags,
+                            sameTagReplicaNum / sameTagBeNum, replicaNum);
+                }
+            }
+
+            if (allStable) {
+                break;
+            }
+        }
+    }
+
     @Test
     public void testShowMV() throws Exception {
         createMV("CREATE MATERIALIZED VIEW test_mv as select k1 from test.show_test group by k1;", false);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 2e2d53edb7a..701cd114cdb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -253,12 +253,16 @@ public class UtFrameUtils {
         FeConstants.runningUnitTest = true;
         FeConstants.enableInternalSchemaDb = false;
         int feRpcPort = startFEServer(runningDir);
+        List<Backend> bes = Lists.newArrayList();
         for (int i = 0; i < backendNum; i++) {
             String host = "127.0.0." + (i + 1);
             createBackend(host, feRpcPort);
         }
+        System.out.println("after create backend");
+        checkBEHeartbeat(bes);
         // sleep to wait first heartbeat
-        Thread.sleep(6000);
+        // Thread.sleep(6000);
+        System.out.println("after create backend2");
     }
 
     public static Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
@@ -293,6 +297,7 @@ public class UtFrameUtils {
         diskInfo1.setTotalCapacityB(1000000);
         diskInfo1.setAvailableCapacityB(500000);
         diskInfo1.setDataUsedCapacityB(480000);
+        diskInfo1.setPathHash(be.getId());
         disks.put(diskInfo1.getRootPath(), diskInfo1);
         be.setDisks(ImmutableMap.copyOf(disks));
         be.setAlive(true);
diff --git a/regression-test/suites/alter_p2/test_alter_colocate_group.groovy b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
new file mode 100644
index 00000000000..1f5b8496630
--- /dev/null
+++ b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite ("test_alter_colocate_group") {
+    sql "DROP DATABASE IF EXISTS test_alter_colocate_group_db FORCE"
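+    // negative cases: unknown database, non-existent group, and misuse of the __global__ group prefix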
+    test {
+        sql """
+              ALTER COLOCATE GROUP test_alter_colocate_group_db.bad_group_1
+              SET ( "replication_num" = "1" );
+            """
+
+        exception "unknown databases"
+    }
+    test {
+        sql """
+              ALTER COLOCATE GROUP bad_group_2
+              SET ( "replication_num" = "1" );
+        """
+
+        exception "Not found colocate group `default_cluster:regression_test_alter_p2`.`bad_group_2`"
+    }
+    test {
+        sql """
+              ALTER COLOCATE GROUP bad_db.__global__bad_group_3
+              SET ( "replication_num" = "1" );
+        """
+
+        exception "group that name starts with `__global__` is a global group, it doesn't belong to any specific database"
+    }
+    test {
+        sql """
+              ALTER COLOCATE GROUP __global__bad_group_4
+              SET ( "replication_num" = "1" );
+        """
+
+        exception "Not found colocate group `__global__bad_group_4`"
+    }
+
+    sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+
+    sql """
+        CREATE TABLE tbl1
+        (
+            k1 int,
+            k2 int
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 6
+        PROPERTIES
+        (
+            "colocate_with" = "group_1",
+            "replication_num" = "1"
+        );
+    """
+
+    sql """
+        CREATE TABLE tbl2
+        (
+            k1 date,
+            k2 int
+        )
+        PARTITION BY RANGE(k1)
+        (
+            PARTITION p1 values less than('2020-02-01'),
+            PARTITION p2 values less than('2020-03-01')
+        )
+        DISTRIBUTED BY HASH(k2) BUCKETS 5
+        PROPERTIES
+        (
+            "colocate_with" = "group_2",
+            "replication_num" = "1"
+        );
+    """
+
+    sql """
+        CREATE TABLE tbl3
+        (
+            `uuid` varchar(255) NULL,
+            `action_datetime` date NULL
+        )
+        DUPLICATE KEY(uuid)
+        PARTITION BY RANGE(action_datetime)()
+        DISTRIBUTED BY HASH(uuid) BUCKETS 4
+        PROPERTIES
+        (
+            "colocate_with" = "group_3",
+            "replication_num" = "1",
+            "dynamic_partition.enable" = "true",
+            "dynamic_partition.time_unit" = "DAY",
+            "dynamic_partition.end" = "2",
+            "dynamic_partition.prefix" = "p",
+            "dynamic_partition.buckets" = "4",
+            "dynamic_partition.replication_num" = "1"
+         );
+    """
+
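+    // helper: read `show proc "/colocation_group"` and assert the group's replica allocation matches the expected number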
+    def checkGroupsReplicaAlloc = { groupName, replicaNum ->
+        // groupName -> replicaAlloc
+        def allocMap = [:]
+        def groups = sql """ show proc "/colocation_group" """
+        for (def group : groups) {
+            allocMap[group[1]] = group[4]
+        }
+
+        assertEquals("tag.location.default: ${replicaNum}".toString(), allocMap[groupName])
+    }
+
+    def checkTableReplicaAlloc = { tableName, hasDynamicPart, replicaNum ->
+        def result = sql """ show create table ${tableName} """
+        def createTbl = result[0][1].toString()
+        assertTrue(createTbl.indexOf("\"replication_allocation\" = \"tag.location.default: ${replicaNum}\"") > 0)
+        if (hasDynamicPart) {
+            assertTrue(createTbl.indexOf(
+                    "\"dynamic_partition.replication_allocation\" = \"tag.location.default: ${replicaNum}\"") > 0)
+        }
+
+        result = sql """ show partitions from ${tableName} """
+        assertTrue(result.size() > 0)
+        for (int i = 0; i < result.size(); i++) {
+            assertEquals("${replicaNum}".toString(), result[i][9].toString())
+        }
+    }
+
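+    // for each group: verify the initial allocation of 1, expect an error when asking for more replicas than available hosts, then raise it to 3 and re-check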
+    for (int i = 1; i <= 3; i++) {
+        def groupName = "regression_test_alter_p2.group_${i}"
+        checkGroupsReplicaAlloc(groupName, 1)
+
+        def tableName = "tbl${i}"
+        def hasDynamicPart = i == 3
+        checkTableReplicaAlloc(tableName, hasDynamicPart, 1)
+
+        test {
+            sql """
+              ALTER COLOCATE GROUP ${groupName}
+              SET ( "replication_num" = "100" );
+            """
+
+            exception "Failed to find enough host"
+        }
+
+        test {
+            sql """
+              ALTER COLOCATE GROUP ${groupName}
+              SET ( "replication_num" = "3" );
+            """
+        }
+
+        checkGroupsReplicaAlloc(groupName, 3)
+        checkTableReplicaAlloc(tableName, hasDynamicPart, 3)
+    }
+
+    sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
