This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new abdeaec91dd [feature](alter colocate group) Support alter colocate group replica allocation #23320 (#25387) abdeaec91dd is described below commit abdeaec91ddc1250014198b7691a64bff4965ecb Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Fri Oct 13 16:22:43 2023 +0800 [feature](alter colocate group) Support alter colocate group replica allocation #23320 (#25387) --- .../Alter/ALTER-COLOCATE-GROUP.md | 84 +++++++++ docs/sidebars.json | 1 + .../Alter/ALTER-COLOCATE-GROUP.md | 88 ++++++++++ .../Alter/ALTER-WORKLOAD-GROUP.md | 4 +- fe/fe-core/src/main/cup/sql_parser.cup | 17 ++ .../doris/analysis/AlterColocateGroupStmt.java | 81 +++++++++ .../apache/doris/analysis/ColocateGroupName.java | 70 ++++++++ .../apache/doris/catalog/ColocateGroupSchema.java | 4 + .../apache/doris/catalog/ColocateTableIndex.java | 191 ++++++++++++++++++++- .../org/apache/doris/catalog/PartitionInfo.java | 4 + .../clone/ColocateTableCheckerAndBalancer.java | 133 +++++++++++++- .../org/apache/doris/clone/TabletScheduler.java | 18 +- .../doris/common/proc/TabletHealthProcDir.java | 5 + .../doris/httpv2/meta/ColocateMetaService.java | 2 +- .../org/apache/doris/journal/JournalEntity.java | 1 + .../org/apache/doris/master/ReportHandler.java | 5 + .../apache/doris/persist/ColocatePersistInfo.java | 27 ++- .../java/org/apache/doris/persist/EditLog.java | 9 + .../org/apache/doris/persist/OperationType.java | 4 + .../main/java/org/apache/doris/qe/DdlExecutor.java | 3 + fe/fe-core/src/main/jflex/sql_scanner.flex | 1 + .../java/org/apache/doris/alter/AlterTest.java | 181 +++++++++++++++++-- .../org/apache/doris/utframe/UtFrameUtils.java | 7 +- .../alter_p2/test_alter_colocate_group.groovy | 170 ++++++++++++++++++ 24 files changed, 1076 insertions(+), 34 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md new file mode 100644 index 00000000000..54c87c05e67 --- /dev/null +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md @@ -0,0 +1,84 @@ +--- +{ +"title": "ALTER-COLOCATE-GROUP", +"language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## ALTER-COLOCATE-GROUP + +### Name + +ALTER COLOCATE GROUP + +<version since="dev"></version> + +### Description + +This statement is used to modify the colocation group. + +Syntax: + +```sql +ALTER COLOCATE GROUP [database.]group +SET ( + property_list +); +``` + +NOTE: + +1. If the colocate group is global, that is, its name starts with `__global__`, then it does not belong to any database; + +2. 
property_list specifies the colocation group properties; currently only `replication_num` and `replication_allocation` can be modified. When either of these properties of the colocation group is changed, the `default.replication_allocation` property and the `dynamic.replication_allocation` property of the tables in the group, as well as the `replication_allocation` of their existing partitions, are updated to match the new value. + +### Example + +1. Modify the number of replicas of a global group + + ```sql + # Set "colocate_with" = "__global__foo" when creating the table + + ALTER COLOCATE GROUP __global__foo + SET ( + "replication_num"="1" + ); + ``` + +2. Modify the number of replicas of a non-global group + + ```sql + # Set "colocate_with" = "bar" when creating the table, and the table belongs to database "example_db" + + ALTER COLOCATE GROUP example_db.bar + SET ( + "replication_num"="1" + ); + ``` + +### Keywords + +```sql +ALTER, COLOCATE, GROUP +``` + +### Best Practice
diff --git a/docs/sidebars.json b/docs/sidebars.json index 744260a9a31..3b58f867dc7 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -795,6 +795,7 @@ "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION", "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN", "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-RESOURCE", + "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP", "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP", "sql-manual/sql-reference/Data-Definition-Statements/Alter/CANCEL-ALTER-TABLE", "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COMMENT",
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md new file mode 100644 index 00000000000..2b5ca2cc727 --- /dev/null +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md @@ -0,0 +1,88 @@ +--- +{ +"title": "ALTER-COLOCATE-GROUP", +"language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## ALTER-COLOCATE-GROUP + +### Name + +ALTER COLOCATE GROUP + +<version since="dev"></version> + +### Description + +该语句用于修改 Colocation Group 的属性。 + +语法: + +```sql +ALTER COLOCATE GROUP [database.]group +SET ( + property_list +); +``` + +注意: + +1. 如果colocate group是全局的,即它的名称是以 `__global__` 开头的,那它不属于任何一个Database; + +2. property_list 是colocation group属性,目前只支持修改`replication_num` 和 `replication_allocation`。 + 修改colocation group的这两个属性修改之后,同时把该group的表的属性`default.replication_allocation` 、 + 属性`dynamic.replication_allocation `、以及已有分区的`replication_allocation`改成跟它一样。 + + + +### Example + +1. 
修改一个全局group的副本数 + + ```sql + # 建表时设置 "colocate_with" = "__global__foo" + + ALTER COLOCATE GROUP __global__foo + SET ( + "replication_num"="1" + ); + ``` + +2. 修改一个非全局group的副本数 + + ```sql + # 建表时设置 "colocate_with" = "bar",且表属于Database example_db + + ALTER COLOCATE GROUP example_db.bar + SET ( + "replication_num"="1" + ); + ``` + +### Keywords + +```sql +ALTER, COLOCATE , GROUP +``` + +### Best Practice diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md index 1bc19780f6c..e3c7c17b660 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md @@ -1,6 +1,6 @@ --- { -"title": "ALTER-WORKLOAD -GROUP", +"title": "ALTER-WORKLOAD-GROUP", "language": "zh-CN" } --- @@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License. --> -## ALTER-WORKLOAD -GROUP +## ALTER-WORKLOAD-GROUP ### Name diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 5950548c427..74896554b7e 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -296,6 +296,7 @@ terminal String KW_CLUSTERS, KW_COLLATE, KW_COLLATION, + KW_COLOCATE, KW_COLUMN, KW_COLUMNS, KW_COMMENT, @@ -798,6 +799,7 @@ nonterminal Qualifier opt_set_qualifier; nonterminal Operation set_op; nonterminal ArrayList<String> opt_common_hints; nonterminal String optional_on_ident; +nonterminal ColocateGroupName colocate_group_name; nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type; @@ -1327,6 +1329,10 @@ alter_stmt ::= {: RESULT = new AlterResourceStmt(resourceName, properties); :} + | KW_ALTER KW_COLOCATE KW_GROUP colocate_group_name:colocateGroupName KW_SET LPAREN key_value_map:properties RPAREN + {: + RESULT = new AlterColocateGroupStmt(colocateGroupName, properties); + :} | KW_ALTER KW_WORKLOAD KW_GROUP ident_or_text:workloadGroupName opt_properties:properties {: RESULT = new AlterWorkloadGroupStmt(workloadGroupName, properties); @@ -5521,6 +5527,17 @@ table_name ::= {: RESULT = new TableName(ctl, db, tbl); :} ; +colocate_group_name ::= + ident:group + {: + RESULT = new ColocateGroupName(null, group); + :} + | ident:db DOT ident:group + {: + RESULT = new ColocateGroupName(db, group); + :} + ; + encryptkey_name ::= ident:name {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java new file mode 100644 index 00000000000..e268322dcc8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +import java.util.Map; + +public class AlterColocateGroupStmt extends DdlStmt { + private final ColocateGroupName colocateGroupName; + private final Map<String, String> properties; + + public AlterColocateGroupStmt(ColocateGroupName colocateGroupName, Map<String, String> properties) { + this.colocateGroupName = colocateGroupName; + this.properties = properties; + } + + public ColocateGroupName getColocateGroupName() { + return colocateGroupName; + } + + public Map<String, String> getProperties() { + return properties; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + colocateGroupName.analyze(analyzer); + + String dbName = colocateGroupName.getDb(); + if (Strings.isNullOrEmpty(dbName)) { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv( + ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } else { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv( + ConnectContext.get(), dbName, PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, + ConnectContext.get().getQualifiedUser(), dbName); + } + } + + if (properties == null || properties.isEmpty()) { + throw new AnalysisException("Colocate group properties can't be null"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("ALTER COLOCATE GROUP ").append(colocateGroupName.toSql()).append(" "); + sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java new file mode 100644 index 00000000000..b7f0c0afd34 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.ColocateTableIndex.GroupId; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; + +import com.google.common.base.Strings; + +public class ColocateGroupName { + private String db; + private String group; + + public ColocateGroupName(String db, String group) { + this.db = db; + this.group = group; + } + + public String getDb() { + return db; + } + + public String getGroup() { + return group; + } + + public void analyze(Analyzer analyzer) throws AnalysisException { + if (GroupId.isGlobalGroupName(group)) { + if (!Strings.isNullOrEmpty(db)) { + throw new AnalysisException("group that name starts with `" + GroupId.GLOBAL_COLOCATE_PREFIX + "`" + + " is a global group, it doesn't belong to any specific database"); + } + } else { + if (Strings.isNullOrEmpty(db)) { + if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + db = analyzer.getDefaultDb(); + } + db = ClusterNamespace.getFullName(analyzer.getClusterName(), db); + } + } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + if (!Strings.isNullOrEmpty(db)) { + sb.append("`").append(db).append("`."); + } + sb.append("`").append(group).append("`"); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java index b5004973c37..57d512b9789 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java @@ -66,6 +66,10 @@ public class ColocateGroupSchema implements Writable { return replicaAlloc; } + public void setReplicaAlloc(ReplicaAllocation replicaAlloc) { + this.replicaAlloc = replicaAlloc; + } + public List<Type> getDistributionColTypes() { return distributionColTypes; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index 23703278fd8..fcefcff132a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -17,10 +17,16 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.AlterColocateGroupStmt; +import org.apache.doris.clone.ColocateTableCheckerAndBalancer; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DynamicPartitionUtil; +import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.ColocatePersistInfo; import org.apache.doris.persist.gson.GsonPostProcessable; 
import org.apache.doris.persist.gson.GsonUtils; @@ -249,10 +255,34 @@ public class ColocateTableIndex implements Writable { } } - public void addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, List<List<Long>> backendsPerBucketSeq) { + public void setBackendsPerBucketSeq(GroupId groupId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) { writeLock(); try { + Map<Tag, List<List<Long>>> backendsPerBucketSeqMap = group2BackendsPerBucketSeq.row(groupId); + if (backendsPerBucketSeqMap != null) { + backendsPerBucketSeqMap.clear(); + } + for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) { + group2BackendsPerBucketSeq.put(groupId, entry.getKey(), entry.getValue()); + } + } finally { + writeUnlock(); + } + } + + public boolean addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, List<List<Long>> backendsPerBucketSeq, + ReplicaAllocation originReplicaAlloc) { + writeLock(); + try { + ColocateGroupSchema groupSchema = group2Schema.get(groupId); + // replica allocation has outdate + if (groupSchema != null && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) { + LOG.info("replica allocation has outdate for group {}, old replica alloc {}, new replica alloc {}", + groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc()); + return false; + } group2BackendsPerBucketSeq.put(groupId, tag, backendsPerBucketSeq); + return true; } finally { writeUnlock(); } @@ -277,12 +307,20 @@ public class ColocateTableIndex implements Writable { } } - public void markGroupStable(GroupId groupId, boolean needEditLog) { + public void markGroupStable(GroupId groupId, boolean needEditLog, ReplicaAllocation originReplicaAlloc) { writeLock(); try { if (!group2Tables.containsKey(groupId)) { return; } + // replica allocation is outdate + ColocateGroupSchema groupSchema = group2Schema.get(groupId); + if (groupSchema != null && originReplicaAlloc != null + && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) { + LOG.warn("mark group {} failed, replica alloc has outdate, old replica alloc {}, new replica alloc {}", + groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc()); + return; + } if (unstableGroups.remove(groupId)) { group2ErrMsgs.put(groupId, ""); if (needEditLog) { @@ -604,13 +642,23 @@ public class ColocateTableIndex implements Writable { } public void replayMarkGroupStable(ColocatePersistInfo info) { - markGroupStable(info.getGroupId(), false); + markGroupStable(info.getGroupId(), false, null); } public void replayRemoveTable(ColocatePersistInfo info) { removeTable(info.getTableId()); } + public void replayModifyReplicaAlloc(ColocatePersistInfo info) throws UserException { + writeLock(); + try { + modifyColocateGroupReplicaAllocation(info.getGroupId(), info.getReplicaAlloc(), + info.getBackendsPerBucketSeq(), false); + } finally { + writeUnlock(); + } + } + // only for test public void clear() { writeLock(); @@ -633,7 +681,22 @@ public class ColocateTableIndex implements Writable { List<String> info = Lists.newArrayList(); GroupId groupId = entry.getValue(); info.add(groupId.toString()); - info.add(entry.getKey()); + String dbName = ""; + if (groupId.dbId != 0) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(groupId.dbId); + if (db != null) { + dbName = db.getFullName(); + int index = dbName.indexOf(":"); + if (index > 0) { + dbName = dbName.substring(index + 1); //use short db name + } + } + } + String groupName = entry.getKey(); + if (!GroupId.isGlobalGroupName(groupName)) { + groupName = dbName + "." 
+ groupName.substring(groupName.indexOf("_") + 1); + } + info.add(groupName); info.add(Joiner.on(", ").join(group2Tables.get(groupId))); ColocateGroupSchema groupSchema = group2Schema.get(groupId); info.add(String.valueOf(groupSchema.getBucketsNum())); @@ -756,4 +819,124 @@ public class ColocateTableIndex implements Writable { public Map<Long, GroupId> getTable2Group() { return table2Group; } + + public void alterColocateGroup(AlterColocateGroupStmt stmt) throws UserException { + writeLock(); + try { + Map<String, String> properties = stmt.getProperties(); + String dbName = stmt.getColocateGroupName().getDb(); + String groupName = stmt.getColocateGroupName().getGroup(); + long dbId = 0; + if (!GroupId.isGlobalGroupName(groupName)) { + Database db = (Database) Env.getCurrentInternalCatalog().getDbOrMetaException(dbName); + dbId = db.getId(); + } + String fullGroupName = GroupId.getFullGroupName(dbId, groupName); + ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName); + if (groupSchema == null) { + throw new DdlException("Not found colocate group " + stmt.getColocateGroupName().toSql()); + } + + GroupId groupId = groupSchema.getGroupId(); + + if (properties.size() > 1) { + throw new DdlException("Can only set one colocate group property at a time"); + } + + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) { + ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, ""); + Preconditions.checkState(!replicaAlloc.isNotSet()); + Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc); + Map<Tag, List<List<Long>>> backendsPerBucketSeq = getBackendsPerBucketSeq(groupId); + Map<Tag, List<List<Long>>> newBackendsPerBucketSeq = Maps.newHashMap(); + for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) { + List<List<Long>> newList = Lists.newArrayList(); + for (List<Long> backends : entry.getValue()) { + newList.add(Lists.newArrayList(backends)); + } + newBackendsPerBucketSeq.put(entry.getKey(), newList); + } + try { + ColocateTableCheckerAndBalancer.modifyGroupReplicaAllocation(replicaAlloc, + newBackendsPerBucketSeq, groupSchema.getBucketsNum()); + } catch (Exception e) { + LOG.warn("modify group [{}, {}] to replication allocation {} failed, bucket seq {}", + fullGroupName, groupId, replicaAlloc, backendsPerBucketSeq, e); + throw new DdlException(e.getMessage()); + } + backendsPerBucketSeq = newBackendsPerBucketSeq; + Preconditions.checkState(backendsPerBucketSeq.size() == replicaAlloc.getAllocMap().size()); + modifyColocateGroupReplicaAllocation(groupSchema.getGroupId(), replicaAlloc, + backendsPerBucketSeq, true); + } else { + throw new DdlException("Unknown colocate group property: " + properties.keySet()); + } + } finally { + writeUnlock(); + } + } + + private void modifyColocateGroupReplicaAllocation(GroupId groupId, ReplicaAllocation replicaAlloc, + Map<Tag, List<List<Long>>> backendsPerBucketSeq, boolean needEditLog) throws UserException { + ColocateGroupSchema groupSchema = getGroupSchema(groupId); + if (groupSchema == null) { + LOG.warn("not found group {}", groupId); + return; + } + + List<Long> tableIds = getAllTableIds(groupId); + for (Long tableId : tableIds) { + long dbId = groupId.dbId; + if (dbId == 0) { + dbId = groupId.getDbIdByTblId(tableId); + } + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + OlapTable table = (OlapTable) 
db.getTableNullable(tableId); + if (table == null || !isColocateTable(table.getId())) { + continue; + } + table.writeLock(); + try { + Map<String, String> tblProperties = Maps.newHashMap(); + tblProperties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, + replicaAlloc.toCreateStmt()); + table.setReplicaAllocation(tblProperties); + if (table.dynamicPartitionExists()) { + TableProperty tableProperty = table.getTableProperty(); + // Merge the new properties with origin properties, and then analyze them + Map<String, String> origDynamicProperties = tableProperty.getOriginDynamicPartitionProperty(); + origDynamicProperties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION, + replicaAlloc.toCreateStmt()); + Map<String, String> analyzedDynamicPartition = DynamicPartitionUtil.analyzeDynamicPartition( + origDynamicProperties, table, db); + tableProperty.modifyTableProperties(analyzedDynamicPartition); + tableProperty.buildDynamicProperty(); + } + for (ReplicaAllocation alloc : table.getPartitionInfo().getPartitionReplicaAllocations().values()) { + Map<Tag, Short> allocMap = alloc.getAllocMap(); + allocMap.clear(); + allocMap.putAll(replicaAlloc.getAllocMap()); + } + } finally { + table.writeUnlock(); + } + } + + if (!backendsPerBucketSeq.equals(group2BackendsPerBucketSeq.row(groupId))) { + markGroupUnstable(groupId, "change replica allocation", false); + } + groupSchema.setReplicaAlloc(replicaAlloc); + setBackendsPerBucketSeq(groupId, backendsPerBucketSeq); + + if (needEditLog) { + ColocatePersistInfo info = ColocatePersistInfo.createForModifyReplicaAlloc(groupId, + replicaAlloc, backendsPerBucketSeq); + Env.getCurrentEnv().getEditLog().logColocateModifyRepliaAlloc(info); + } + LOG.info("modify group {} replication allocation to {}, is replay {}", groupId, replicaAlloc, !needEditLog); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 6bd4604471a..235a8cc8db5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -240,6 +240,10 @@ public class PartitionInfo implements Writable { idToStoragePolicy.put(partitionId, storagePolicy); } + public Map<Long, ReplicaAllocation> getPartitionReplicaAllocations() { + return idToReplicaAllocation; + } + public ReplicaAllocation getReplicaAllocation(long partitionId) { if (!idToReplicaAllocation.containsKey(partitionId)) { LOG.debug("failed to get replica allocation for partition: {}", partitionId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 5c18c2bd468..4ec8993be0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -35,6 +35,7 @@ import org.apache.doris.clone.TabletScheduler.AddResult; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.persist.ColocatePersistInfo; import org.apache.doris.resource.Tag; @@ -183,7 +184,12 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { List<List<Long>> balancedBackendsPerBucketSeq = 
Lists.newArrayList(); if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) { - colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq); + if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq, + replicaAlloc)) { + LOG.warn("relocate group {} succ, but replica allocation has change, old replica alloc {}", + groupId, replicaAlloc); + continue; + } Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap = Maps.newHashMap(); balancedBackendsPerBucketSeqMap.put(tag, balancedBackendsPerBucketSeq); ColocatePersistInfo info = ColocatePersistInfo @@ -219,6 +225,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { continue; } + ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId); + ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc(); String unstableReason = null; OUT: for (Long tableId : tableIds) { @@ -237,8 +245,6 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { olapTable.readLock(); try { for (Partition partition : olapTable.getPartitions()) { - ReplicaAllocation replicaAlloc - = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()); short replicationNum = replicaAlloc.getTotalReplicaNum(); long visibleVersion = partition.getVisibleVersion(); // Here we only get VISIBLE indexes. All other indexes are not queryable. @@ -269,8 +275,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { TabletSchedCtx tabletCtx = new TabletSchedCtx( TabletSchedCtx.Type.REPAIR, db.getId(), tableId, partition.getId(), index.getId(), tablet.getId(), - olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()), - System.currentTimeMillis()); + replicaAlloc, System.currentTimeMillis()); // the tablet status will be set again when being scheduled tabletCtx.setTabletStatus(st); tabletCtx.setPriority(Priority.NORMAL); @@ -299,7 +304,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { // mark group as stable or unstable if (Strings.isNullOrEmpty(unstableReason)) { - colocateIndex.markGroupStable(groupId, true); + colocateIndex.markGroupStable(groupId, true, replicaAlloc); } else { colocateIndex.markGroupUnstable(groupId, unstableReason, true); } @@ -521,6 +526,122 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { return hostsPerBucketSeq; } + public static void modifyGroupReplicaAllocation(ReplicaAllocation replicaAlloc, + Map<Tag, List<List<Long>>> backendBucketsSeq, int bucketNum) throws Exception { + Map<Tag, Short> allocMap = replicaAlloc.getAllocMap(); + List<Tag> deleteTags = Lists.newArrayList(); + for (Tag tag : backendBucketsSeq.keySet()) { + if (!allocMap.containsKey(tag)) { + deleteTags.add(tag); + } + Preconditions.checkState(bucketNum == backendBucketsSeq.get(tag).size(), + bucketNum + " vs " + backendBucketsSeq.get(tag).size()); + } + deleteTags.forEach(tag -> backendBucketsSeq.remove(tag)); + + for (Tag tag : replicaAlloc.getAllocMap().keySet()) { + if (!backendBucketsSeq.containsKey(tag)) { + List<List<Long>> tagBackendBucketsSeq = Lists.newArrayList(); + for (int i = 0; i < bucketNum; i++) { + tagBackendBucketsSeq.add(Lists.newArrayList()); + } + backendBucketsSeq.put(tag, tagBackendBucketsSeq); + } + } + + Map<Long, Integer> backendToBucketNum = Maps.newHashMap(); + backendBucketsSeq.values().forEach(tagBackendIds -> + tagBackendIds.forEach(backendIds -> + backendIds.forEach(backendId -> 
backendToBucketNum.put( + backendId, backendToBucketNum.getOrDefault(backendId, 0) + 1)))); + + for (Tag tag : backendBucketsSeq.keySet()) { + List<List<Long>> tagBackendBucketsSeq = backendBucketsSeq.get(tag); + int oldReplicaNum = tagBackendBucketsSeq.get(0).size(); + for (List<Long> backendIdsOneBucket : tagBackendBucketsSeq) { + Preconditions.checkState(backendIdsOneBucket.size() == oldReplicaNum, + backendIdsOneBucket.size() + " vs " + oldReplicaNum); + } + + int newReplicaNum = allocMap.get(tag); + if (newReplicaNum == oldReplicaNum) { + continue; + } + + List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(tag); + Set<Long> availableBeIds = backends.stream().filter(be -> be.isScheduleAvailable()) + .map(be -> be.getId()).collect(Collectors.toSet()); + + for (Long backendId : availableBeIds) { + if (!backendToBucketNum.containsKey(backendId)) { + backendToBucketNum.put(backendId, 0); + } + } + + for (int i = 0; i < tagBackendBucketsSeq.size(); i++) { + modifyGroupBucketReplicas(tag, newReplicaNum, tagBackendBucketsSeq.get(i), + availableBeIds, backendToBucketNum); + } + } + } + + private static void modifyGroupBucketReplicas(Tag tag, int newReplicaNum, List<Long> backendIds, + Set<Long> availableBeIds, Map<Long, Integer> backendToBucketNum) throws Exception { + final boolean smallIdFirst = Math.random() < 0.5; + if (backendIds.size() > newReplicaNum) { + backendIds.sort((id1, id2) -> { + boolean alive1 = availableBeIds.contains(id1); + boolean alive2 = availableBeIds.contains(id2); + if (alive1 != alive2) { + return alive1 ? -1 : 1; + } + int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0); + int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0); + if (bucketNum1 != bucketNum2) { + return Integer.compare(bucketNum1, bucketNum2); + } + + return smallIdFirst ? Long.compare(id1, id2) : Long.compare(id2, id1); + }); + + for (int i = backendIds.size() - 1; i >= newReplicaNum; i--) { + long backendId = backendIds.get(i); + backendIds.remove(i); + backendToBucketNum.put(backendId, backendToBucketNum.getOrDefault(backendId, 0) - 1); + } + } + + if (backendIds.size() < newReplicaNum) { + Set<Long> candBackendSet = Sets.newHashSet(); + candBackendSet.addAll(availableBeIds); + candBackendSet.removeAll(backendIds); + if (backendIds.size() + candBackendSet.size() < newReplicaNum) { + throw new UserException("Can not add backend for tag: " + tag); + } + + List<Long> candBackendList = Lists.newArrayList(candBackendSet); + candBackendList.sort((id1, id2) -> { + int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0); + int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0); + if (bucketNum1 != bucketNum2) { + return Integer.compare(bucketNum1, bucketNum2); + } + + return smallIdFirst ? Long.compare(id1, id2) : Long.compare(id2, id1); + }); + + int addNum = newReplicaNum - backendIds.size(); + for (int i = 0; i < addNum; i++) { + long backendId = candBackendList.get(i); + backendIds.add(backendId); + backendToBucketNum.put(backendId, backendToBucketNum.getOrDefault(backendId, 0) + 1); + } + } + + Preconditions.checkState(newReplicaNum == backendIds.size(), + newReplicaNum + " vs " + backendIds.size()); + } + private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds, Set<Long> unavailBackendIds, LoadStatisticForTag statistic, List<Long> flatBackendsPerBucketSeq) { // backend id -> replica num, and sorted by replica num, descending. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index d6a8e1efa0e..ee9da3ac100 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -19,6 +19,7 @@ package org.apache.doris.clone; import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt; import org.apache.doris.analysis.AdminRebalanceDiskStmt; +import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.ColocateTableIndex.GroupId; import org.apache.doris.catalog.Database; @@ -490,15 +491,20 @@ public class TabletScheduler extends MasterDaemon { throw new SchedException(Status.UNRECOVERABLE, "index does not exist"); } + ReplicaAllocation replicaAlloc = null; Tablet tablet = idx.getTablet(tabletId); Preconditions.checkNotNull(tablet); - ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); - if (isColocateTable) { GroupId groupId = colocateTableIndex.getGroup(tbl.getId()); if (groupId == null) { throw new SchedException(Status.UNRECOVERABLE, "colocate group does not exist"); } + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId); + if (groupSchema == null) { + throw new SchedException(Status.UNRECOVERABLE, + "colocate group schema " + groupId + " does not exist"); + } + replicaAlloc = groupSchema.getReplicaAlloc(); int tabletOrderIdx = tabletCtx.getTabletOrderIdx(); if (tabletOrderIdx == -1) { @@ -512,6 +518,7 @@ public class TabletScheduler extends MasterDaemon { statusPair = Pair.of(st, Priority.HIGH); tabletCtx.setColocateGroupBackendIds(backendsSet); } else { + replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); List<Long> aliveBeIds = infoService.getAllBackendIds(true); statusPair = tablet.getHealthStatusWithPriority( infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); @@ -1484,14 +1491,18 @@ public class TabletScheduler extends MasterDaemon { return; } - replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId()); if (isColocateTable) { GroupId groupId = colocateTableIndex.getGroup(tbl.getId()); if (groupId == null) { return; } + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId); + if (groupSchema == null) { + return; + } + replicaAlloc = groupSchema.getReplicaAlloc(); int tabletOrderIdx = tabletCtx.getTabletOrderIdx(); if (tabletOrderIdx == -1) { tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId()); @@ -1504,6 +1515,7 @@ public class TabletScheduler extends MasterDaemon { statusPair = Pair.of(st, Priority.HIGH); tabletCtx.setColocateGroupBackendIds(backendsSet); } else { + replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); List<Long> aliveBeIds = infoService.getAllBackendIds(true); statusPair = tablet.getHealthStatusWithPriority( infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index 93f54483cbf..3ce3ff74c7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -17,6 +17,7 
@@ package org.apache.doris.common.proc; +import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -185,6 +186,10 @@ public class TabletHealthProcDir implements ProcDirInterface { ++tabletNum; Tablet.TabletStatus res = null; if (groupId != null) { + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId); + if (groupSchema != null) { + replicaAlloc = groupSchema.getReplicaAlloc(); + } Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i); res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, backendsSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java index 9e51d38de5c..b7c2a615aac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java @@ -114,7 +114,7 @@ public class ColocateMetaService extends RestBaseController { if ("POST".equalsIgnoreCase(method)) { colocateIndex.markGroupUnstable(groupId, "mark unstable via http api", true); } else if ("DELETE".equalsIgnoreCase(method)) { - colocateIndex.markGroupStable(groupId, true); + colocateIndex.markGroupStable(groupId, true, null); } return ResponseEntityBuilder.ok(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 0e0b8ac7df6..d3d0fe18d90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -456,6 +456,7 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC: case OperationType.OP_COLOCATE_ADD_TABLE: case OperationType.OP_COLOCATE_REMOVE_TABLE: case OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ: diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index b78cbddb381..bfe1bb0a9e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -19,6 +19,7 @@ package org.apache.doris.master; import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -1172,6 +1173,10 @@ public class ReportHandler extends Daemon { int tabletOrderIdx = materializedIndex.getTabletOrderIdx(tabletId); Preconditions.checkState(tabletOrderIdx != -1, "get tablet materializedIndex for %s fail", tabletId); Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx); + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId); + if (groupSchema != null) { + replicaAlloc = groupSchema.getReplicaAlloc(); + } TabletStatus status = tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet); if (status == TabletStatus.HEALTHY) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java index 
459be646052..429d4e0e1a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java @@ -18,6 +18,7 @@ package org.apache.doris.persist; import org.apache.doris.catalog.ColocateTableIndex.GroupId; +import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; @@ -45,29 +46,38 @@ public class ColocatePersistInfo implements Writable { private long tableId; @SerializedName(value = "backendsPerBucketSeq") private Map<Tag, List<List<Long>>> backendsPerBucketSeq = Maps.newHashMap(); + @SerializedName(value = "replicaAlloc") + private ReplicaAllocation replicaAlloc = new ReplicaAllocation(); - private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) { + private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag, List<List<Long>>> backendsPerBucketSeq, + ReplicaAllocation replicaAlloc) { this.groupId = groupId; this.tableId = tableId; this.backendsPerBucketSeq = backendsPerBucketSeq; + this.replicaAlloc = replicaAlloc; } public static ColocatePersistInfo createForAddTable(GroupId groupId, long tableId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) { - return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq); + return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq, new ReplicaAllocation()); } public static ColocatePersistInfo createForBackendsPerBucketSeq(GroupId groupId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) { - return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq); + return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, new ReplicaAllocation()); } public static ColocatePersistInfo createForMarkUnstable(GroupId groupId) { - return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap()); + return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new ReplicaAllocation()); } public static ColocatePersistInfo createForMarkStable(GroupId groupId) { - return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap()); + return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new ReplicaAllocation()); + } + + public static ColocatePersistInfo createForModifyReplicaAlloc(GroupId groupId, ReplicaAllocation replicaAlloc, + Map<Tag, List<List<Long>>> backendsPerBucketSeq) { + return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, replicaAlloc); } public static ColocatePersistInfo read(DataInput in) throws IOException { @@ -87,6 +97,10 @@ public class ColocatePersistInfo implements Writable { return backendsPerBucketSeq; } + public ReplicaAllocation getReplicaAlloc() { + return replicaAlloc; + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); @@ -129,7 +143,7 @@ public class ColocatePersistInfo implements Writable { ColocatePersistInfo info = (ColocatePersistInfo) obj; return tableId == info.tableId && groupId.equals(info.groupId) && backendsPerBucketSeq.equals( - info.backendsPerBucketSeq); + info.backendsPerBucketSeq) && replicaAlloc.equals(info.replicaAlloc); } @Override @@ -138,6 +152,7 @@ public class ColocatePersistInfo implements Writable { sb.append("table id: ").append(tableId); sb.append(" group id: ").append(groupId); sb.append(" backendsPerBucketSeq: ").append(backendsPerBucketSeq); + sb.append(" replicaAlloc: ").append(replicaAlloc); return 
sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 97dd6719e9b..b0540abf9ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -594,6 +594,11 @@ public class EditLog { env.getColocateTableIndex().replayMarkGroupStable(info); break; } + case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC: { + final ColocatePersistInfo info = (ColocatePersistInfo) journal.getData(); + env.getColocateTableIndex().replayModifyReplicaAlloc(info); + break; + } case OperationType.OP_MODIFY_TABLE_COLOCATE: { final TablePropertyInfo info = (TablePropertyInfo) journal.getData(); env.replayModifyTableColocate(info); @@ -1497,6 +1502,10 @@ public class EditLog { Env.getCurrentEnv().getBinlogManager().addTruncateTable(info, logId); } + public void logColocateModifyRepliaAlloc(ColocatePersistInfo info) { + logEdit(OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC, info); + } + public void logColocateAddTable(ColocatePersistInfo info) { logEdit(OperationType.OP_COLOCATE_ADD_TABLE, info); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 3cf30df2428..b2f3ffb7e33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -186,6 +186,10 @@ public class OperationType { public static final short OP_ADD_GLOBAL_FUNCTION = 132; public static final short OP_DROP_GLOBAL_FUNCTION = 133; + // modify database/table/tablet/replica meta + public static final short OP_SET_REPLICA_VERSION = 141; + public static final short OP_COLOCATE_MOD_REPLICA_ALLOC = 142; + // routine load 200 public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200; public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 29e29a9a7dd..f50d8d1ae02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.AdminSetReplicaStatusStmt; import org.apache.doris.analysis.AlterCatalogCommentStmt; import org.apache.doris.analysis.AlterCatalogNameStmt; import org.apache.doris.analysis.AlterCatalogPropertyStmt; +import org.apache.doris.analysis.AlterColocateGroupStmt; import org.apache.doris.analysis.AlterColumnStatsStmt; import org.apache.doris.analysis.AlterDatabasePropertyStmt; import org.apache.doris.analysis.AlterDatabaseQuotaStmt; @@ -298,6 +299,8 @@ public class DdlExecutor { env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt); } else if (ddlStmt instanceof AlterResourceStmt) { env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt); + } else if (ddlStmt instanceof AlterColocateGroupStmt) { + env.getColocateTableIndex().alterColocateGroup((AlterColocateGroupStmt) ddlStmt); } else if (ddlStmt instanceof AlterWorkloadGroupStmt) { env.getWorkloadGroupMgr().alterWorkloadGroup((AlterWorkloadGroupStmt) ddlStmt); } else if (ddlStmt instanceof CreatePolicyStmt) { diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 4f4f3392dc2..c4791093b48 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ 
b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -146,6 +146,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("clusters", new Integer(SqlParserSymbols.KW_CLUSTERS)); keywordMap.put("collate", new Integer(SqlParserSymbols.KW_COLLATE)); keywordMap.put("collation", new Integer(SqlParserSymbols.KW_COLLATION)); + keywordMap.put("colocate", new Integer(SqlParserSymbols.KW_COLOCATE)); keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN)); keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS)); keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java index 924fcd53b7d..4a78ef4de28 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -17,6 +17,7 @@ package org.apache.doris.alter; +import org.apache.doris.analysis.AlterColocateGroupStmt; import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateMaterializedViewStmt; @@ -26,6 +27,8 @@ import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DateLiteral; import org.apache.doris.analysis.DropResourceStmt; import org.apache.doris.analysis.ShowCreateMaterializedViewStmt; +import org.apache.doris.catalog.ColocateGroupSchema; +import org.apache.doris.catalog.ColocateTableIndex.GroupId; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; @@ -36,10 +39,13 @@ import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.Type; +import org.apache.doris.clone.RebalancerTestUtil; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -47,6 +53,7 @@ import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.DdlExecutor; import org.apache.doris.qe.ShowExecutor; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; @@ -70,18 +77,36 @@ public class AlterTest { private static String runningDir = "fe/mocked/AlterTest/" + UUID.randomUUID().toString() + "/"; private static ConnectContext connectContext; - private static Backend be; + + private static Map<Long, Tag> backendTags; @BeforeClass public static void beforeClass() throws Exception { FeConstants.runningUnitTest = true; FeConstants.default_scheduler_interval_millisecond = 100; + FeConstants.tablet_checker_interval_ms = 100; + FeConstants.tablet_checker_interval_ms = 100; Config.dynamic_partition_check_interval_seconds = 1; Config.disable_storage_medium_check = true; Config.enable_storage_policy = true; - UtFrameUtils.createDorisCluster(runningDir); + Config.disable_balance = true; + Config.schedule_batch_size = 400; + Config.schedule_slot_num_per_hdd_path = 100; + UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5); + + List<Backend> 
backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); + + Map<String, String> tagMap = Maps.newHashMap(); + tagMap.put(Tag.TYPE_LOCATION, "group_a"); + backends.get(2).setTagMap(tagMap); + backends.get(3).setTagMap(tagMap); + + tagMap = Maps.newHashMap(); + tagMap.put(Tag.TYPE_LOCATION, "group_b"); + backends.get(4).setTagMap(tagMap); - be = Env.getCurrentSystemInfo().getIdToBackend().values().asList().get(0); + backendTags = Maps.newHashMap(); + backends.forEach(be -> backendTags.put(be.getId(), be.getLocationTag())); // create connect context connectContext = UtFrameUtils.createDefaultCtx(); @@ -443,21 +468,16 @@ public class AlterTest { // set un-partitioned table's real replication num // first we need to change be's tag - Map<String, String> originTagMap = be.getTagMap(); - Map<String, String> tagMap = Maps.newHashMap(); - tagMap.put(Tag.TYPE_LOCATION, "group1"); - be.setTagMap(tagMap); OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("tbl2"); Partition partition = tbl2.getPartition(tbl2.getName()); Assert.assertEquals(Short.valueOf("1"), Short.valueOf(tbl2.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum())); - stmt = "alter table test.tbl2 set ('replication_allocation' = 'tag.location.group1:1');"; + stmt = "alter table test.tbl2 set ('replication_allocation' = 'tag.location.group_a:1');"; alterTable(stmt, false); Assert.assertEquals((short) 1, (short) tbl2.getPartitionInfo().getReplicaAllocation(partition.getId()) - .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group1"))); + .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group_a"))); Assert.assertEquals((short) 1, (short) tbl2.getTableProperty().getReplicaAllocation() - .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group1"))); - be.setTagMap(originTagMap); + .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group_a"))); Thread.sleep(5000); // sleep to wait dynamic partition scheduler run // add partition without set replication num, and default num is 3. 
@@ -1239,6 +1259,145 @@ public class AlterTest { Env.getCurrentEnv().getResourceMgr().dropResource(stmt); } + @Test + public void testModifyColocateGroupReplicaAlloc() throws Exception { + Config.enable_round_robin_create_tablet = true; + + createTable("CREATE TABLE test.col_tbl0\n" + "(\n" + " k1 date,\n" + " k2 int,\n" + " v1 int \n" + + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n" + + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n" + + "PROPERTIES('replication_num' = '2', 'colocate_with' = 'mod_group_0');"); + + createTable("CREATE TABLE test.col_tbl1\n" + "(\n" + " k1 date,\n" + " k2 int,\n" + " v1 int \n" + + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n" + "PARTITION BY RANGE(k1)\n" + "(\n" + + " PARTITION p1 values less than('2020-02-01'),\n" + + " PARTITION p2 values less than('2020-03-01')\n" + ")\n" + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n" + + "PROPERTIES('replication_num' = '2', 'colocate_with' = 'mod_group_1');"); + + createTable("CREATE TABLE test.col_tbl2 (\n" + + "`uuid` varchar(255) NULL,\n" + + "`action_datetime` date NULL\n" + + ")\n" + + "DUPLICATE KEY(uuid)\n" + + "PARTITION BY RANGE(action_datetime)()\n" + + "DISTRIBUTED BY HASH(uuid) BUCKETS 4\n" + + "PROPERTIES\n" + + "(\n" + + "\"replication_num\" = \"2\",\n" + + "\"colocate_with\" = \"mod_group_2\",\n" + + "\"dynamic_partition.enable\" = \"true\",\n" + + "\"dynamic_partition.time_unit\" = \"DAY\",\n" + + "\"dynamic_partition.end\" = \"2\",\n" + + "\"dynamic_partition.prefix\" = \"p\",\n" + + "\"dynamic_partition.buckets\" = \"4\",\n" + + "\"dynamic_partition.replication_num\" = \"2\"\n" + + ");\n"); + + Env env = Env.getCurrentEnv(); + Database db = env.getInternalCatalog().getDbOrMetaException("default_cluster:test"); + OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("col_tbl2"); + for (int j = 0; true; j++) { + Thread.sleep(2000); + if (tbl2.getAllPartitions().size() > 0) { + break; + } + if (j >= 5) { + Assert.assertTrue("dynamic table not create partition", false); + } + } + + RebalancerTestUtil.updateReplicaPathHash(); + + ReplicaAllocation newReplicaAlloc = new ReplicaAllocation(); + newReplicaAlloc.put(Tag.DEFAULT_BACKEND_TAG, (short) 1); + newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_a"), (short) 1); + newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_b"), (short) 1); + + for (int i = 0; i < 3; i++) { + String groupName = "mod_group_" + i; + String sql = "alter colocate group test." 
+                    + " set ( 'replication_allocation' = '" + newReplicaAlloc.toCreateStmt() + "')";
+            String fullGroupName = GroupId.getFullGroupName(db.getId(), groupName);
+            AlterColocateGroupStmt stmt = (AlterColocateGroupStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+            DdlExecutor.execute(env, stmt);
+
+            ColocateGroupSchema groupSchema = env.getColocateTableIndex().getGroupSchema(fullGroupName);
+            Assert.assertNotNull(groupSchema);
+            Assert.assertEquals(newReplicaAlloc, groupSchema.getReplicaAlloc());
+
+            OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" + i);
+            Assert.assertEquals(newReplicaAlloc, tbl.getDefaultReplicaAllocation());
+            for (Partition partition : tbl.getAllPartitions()) {
+                Assert.assertEquals(newReplicaAlloc,
+                        tbl.getPartitionInfo().getReplicaAllocation(partition.getId()));
+            }
+
+            if (i == 2) {
+                Assert.assertEquals(newReplicaAlloc,
+                        tbl.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation());
+            }
+        }
+
+        Config.enable_round_robin_create_tablet = false;
+
+        for (int k = 0; true; k++) {
+            Thread.sleep(1000); // sleep to wait dynamic partition scheduler run
+            boolean allStable = true;
+            for (int i = 0; i < 3; i++) {
+                String fullGroupName = GroupId.getFullGroupName(db.getId(), "mod_group_" + i);
+                ColocateGroupSchema groupSchema = env.getColocateTableIndex().getGroupSchema(fullGroupName);
+                Assert.assertNotNull(groupSchema);
+
+                if (env.getColocateTableIndex().isGroupUnstable(groupSchema.getGroupId())) {
+                    allStable = false;
+                    if (k >= 120) {
+                        Assert.assertTrue(fullGroupName + " is unstable", false);
+                    }
+                    continue;
+                }
+
+                Map<Long, Integer> backendReplicaNum = Maps.newHashMap();
+                OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" + i);
+                int tabletNum = 0;
+                for (Partition partition : tbl.getAllPartitions()) {
+                    for (MaterializedIndex idx : partition.getMaterializedIndices(
+                            MaterializedIndex.IndexExtState.VISIBLE)) {
+                        for (Tablet tablet : idx.getTablets()) {
+                            Map<Tag, Short> allocMap = Maps.newHashMap();
+                            tabletNum++;
+                            for (Replica replica : tablet.getReplicas()) {
+                                long backendId = replica.getBackendId();
+                                Tag tag = backendTags.get(backendId);
+                                Assert.assertNotNull(tag);
+                                short oldNum = allocMap.getOrDefault(tag, (short) 0);
+                                allocMap.put(tag, (short) (oldNum + 1));
+                                backendReplicaNum.put(backendId, backendReplicaNum.getOrDefault(backendId, 0) + 1);
+                            }
+                            Assert.assertEquals(newReplicaAlloc.getAllocMap(), allocMap);
+                        }
+                    }
+                }
+
+                Assert.assertTrue(tabletNum > 0);
+
+                for (Map.Entry<Long, Integer> entry : backendReplicaNum.entrySet()) {
+                    long backendId = entry.getKey();
+                    int replicaNum = entry.getValue();
+                    Tag tag = backendTags.get(backendId);
+                    int sameTagReplicaNum = tabletNum * newReplicaAlloc.getAllocMap().getOrDefault(tag, (short) 0);
+                    int sameTagBeNum = (int) (backendTags.values().stream().filter(t -> t.equals(tag)).count());
+                    Assert.assertEquals("backend " + backendId + " failed: " + " all backend replica num: "
+                            + backendReplicaNum + ", all backend tag: " + backendTags,
+                            sameTagReplicaNum / sameTagBeNum, replicaNum);
+                }
+            }
+
+            if (allStable) {
+                break;
+            }
+        }
+    }
+
     @Test
     public void testShowMV() throws Exception {
         createMV("CREATE MATERIALIZED VIEW test_mv as select k1 from test.show_test group by k1;", false);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 2e2d53edb7a..701cd114cdb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -253,12 +253,16 @@ public class UtFrameUtils {
         FeConstants.runningUnitTest = true;
         FeConstants.enableInternalSchemaDb = false;
         int feRpcPort = startFEServer(runningDir);
+        List<Backend> bes = Lists.newArrayList();
         for (int i = 0; i < backendNum; i++) {
             String host = "127.0.0." + (i + 1);
             createBackend(host, feRpcPort);
         }
+        System.out.println("after create backend");
+        checkBEHeartbeat(bes);
         // sleep to wait first heartbeat
-        Thread.sleep(6000);
+        // Thread.sleep(6000);
+        System.out.println("after create backend2");
     }
 
     public static Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
@@ -293,6 +297,7 @@ public class UtFrameUtils {
         diskInfo1.setTotalCapacityB(1000000);
         diskInfo1.setAvailableCapacityB(500000);
         diskInfo1.setDataUsedCapacityB(480000);
+        diskInfo1.setPathHash(be.getId());
         disks.put(diskInfo1.getRootPath(), diskInfo1);
         be.setDisks(ImmutableMap.copyOf(disks));
         be.setAlive(true);
diff --git a/regression-test/suites/alter_p2/test_alter_colocate_group.groovy b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
new file mode 100644
index 00000000000..1f5b8496630
--- /dev/null
+++ b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite ("test_alter_colocate_group") {
+    sql "DROP DATABASE IF EXISTS test_alter_colocate_group_db FORCE"
+    test {
+        sql """
+            ALTER COLOCATE GROUP test_alter_colocate_group_db.bad_group_1
+            SET ( "replication_num" = "1" );
+        """
+
+        exception "unknown databases"
+    }
+    test {
+        sql """
+            ALTER COLOCATE GROUP bad_group_2
+            SET ( "replication_num" = "1" );
+        """
+
+        exception "Not found colocate group `default_cluster:regression_test_alter_p2`.`bad_group_2`"
+    }
+    test {
+        sql """
+            ALTER COLOCATE GROUP bad_db.__global__bad_group_3
+            SET ( "replication_num" = "1" );
+        """
+
+        exception "group that name starts with `__global__` is a global group, it doesn't belong to any specific database"
+    }
+    test {
+        sql """
+            ALTER COLOCATE GROUP __global__bad_group_4
+            SET ( "replication_num" = "1" );
+        """
+
+        exception "Not found colocate group `__global__bad_group_4`"
+    }
+
+    sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+
+    sql """
+        CREATE TABLE tbl1
+        (
+            k1 int,
+            k2 int
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 6
+        PROPERTIES
+        (
+            "colocate_with" = "group_1",
+            "replication_num" = "1"
+        );
+    """
+
+    sql """
+        CREATE TABLE tbl2
+        (
+            k1 date,
+            k2 int
+        )
+        PARTITION BY RANGE(k1)
+        (
+            PARTITION p1 values less than('2020-02-01'),
+            PARTITION p2 values less than('2020-03-01')
+        )
+        DISTRIBUTED BY HASH(k2) BUCKETS 5
+        PROPERTIES
+        (
+            "colocate_with" = "group_2",
+            "replication_num" = "1"
+        );
+    """
+
+    sql """
+        CREATE TABLE tbl3
+        (
+            `uuid` varchar(255) NULL,
+            `action_datetime` date NULL
+        )
+        DUPLICATE KEY(uuid)
+        PARTITION BY RANGE(action_datetime)()
+        DISTRIBUTED BY HASH(uuid) BUCKETS 4
+        PROPERTIES
+        (
+            "colocate_with" = "group_3",
+            "replication_num" = "1",
+            "dynamic_partition.enable" = "true",
+            "dynamic_partition.time_unit" = "DAY",
+            "dynamic_partition.end" = "2",
+            "dynamic_partition.prefix" = "p",
+            "dynamic_partition.buckets" = "4",
+            "dynamic_partition.replication_num" = "1"
+        );
+    """
+
+    def checkGroupsReplicaAlloc = { groupName, replicaNum ->
+        // groupName -> replicaAlloc
+        def allocMap = [:]
+        def groups = sql """ show proc "/colocation_group" """
+        for (def group : groups) {
+            allocMap[group[1]] = group[4]
+        }
+
+        assertEquals("tag.location.default: ${replicaNum}".toString(), allocMap[groupName])
+    }
+
+    def checkTableReplicaAlloc = { tableName, hasDynamicPart, replicaNum ->
+        def result = sql """ show create table ${tableName} """
+        def createTbl = result[0][1].toString()
+        assertTrue(createTbl.indexOf("\"replication_allocation\" = \"tag.location.default: ${replicaNum}\"") > 0)
+        if (hasDynamicPart) {
+            assertTrue(createTbl.indexOf(
+                    "\"dynamic_partition.replication_allocation\" = \"tag.location.default: ${replicaNum}\"") > 0)
+        }
+
+        result = sql """ show partitions from ${tableName} """
+        assertTrue(result.size() > 0)
+        for (int i = 0; i < result.size(); i++) {
+            assertEquals("${replicaNum}".toString(), result[i][9].toString())
+        }
+    }
+
+    for (int i = 1; i <= 3; i++) {
+        def groupName = "regression_test_alter_p2.group_${i}"
+        checkGroupsReplicaAlloc(groupName, 1)
+
+        def tableName = "tbl${i}"
+        def hasDynamicPart = i == 3
+        checkTableReplicaAlloc(tableName, hasDynamicPart, 1)
+
+        test {
+            sql """
+                ALTER COLOCATE GROUP ${groupName}
+                SET ( "replication_num" = "100" );
+            """
+
+            exception "Failed to find enough host"
+        }
+
+        test {
+            sql """
+                ALTER COLOCATE GROUP ${groupName}
+                SET ( "replication_num" = "3" );
+            """
+        }
+
+        checkGroupsReplicaAlloc(groupName, 3)
+        checkTableReplicaAlloc(tableName, hasDynamicPart, 3)
+    }
+
+    sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+    sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org