This is an automated email from the ASF dual-hosted git repository. xuyang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3c28a71378 [fix](dynamic partition) partition create failed after alter distributed column (#20239) 3c28a71378 is described below commit 3c28a7137830e8bebec826ff7d08215665b8835d Author: camby <zhuxiaol...@baidu.com> AuthorDate: Mon Jun 5 12:20:50 2023 +0800 [fix](dynamic partition) partition create failed after alter distributed column (#20239) This PR fixes the following two problems. Problem 1: altering a column comment makes adding a dynamic partition fail (issue #10811). Steps to reproduce: create a table with a dynamic partition policy; restart FE; alter the comment of the distribution column; alter dynamic_partition.end so that the dynamic partition scheduler adds a new partition. The following error is then logged, and the new partition is not created: dynamic add partition failed: errCode = 2, detailMessage = Cannot assign hash distribution with different distribution cols. default is: [id int(11) NULL COMMENT 'new_comment_of_id'], db: default_cluster:example_db, table: test_2 Problem 2: renaming a distribution column makes inserts into old partitions fail (issue #20405). The key step in reproducing these problems is restarting FE. The distribution info and the table columns are persisted separately inside OlapTable, so they only become distinct objects after the metadata is reloaded. From then on, an ALTER on the table columns no longer updates the distribution columns. All versions appear to be affected, including master, lts-1.1, and others. 
--- .../doris/analysis/HashDistributionDesc.java | 4 +- .../main/java/org/apache/doris/catalog/Column.java | 22 ++++++++ .../main/java/org/apache/doris/catalog/Env.java | 25 ++++++--- .../apache/doris/catalog/HashDistributionInfo.java | 14 ++++- .../apache/doris/datasource/InternalCatalog.java | 13 ++--- regression-test/pipeline/p0/conf/fe.conf | 2 + regression-test/pipeline/p1/conf/fe.conf | 2 + .../test_dynamic_partition_with_alter.groovy | 56 ++++++++++++++++++++ .../test_dynamic_partition_with_rename.groovy | 60 ++++++++++++++++++++++ 9 files changed, 181 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java index d54049f793..374bf10bdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java @@ -131,7 +131,9 @@ public class HashDistributionDesc extends DistributionDesc { + column.getName() + "]."); } - distributionColumns.add(column); + // distribution info and base columns persist separately inside OlapTable, so we need a deep copy + // so that modifying the table columns does not also modify the columns inside distribution info.
+ distributionColumns.add(new Column(column)); find = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 615851e257..936db90a35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -784,6 +784,28 @@ public class Column implements Writable, GsonPostProcessable { && Objects.equals(realDefaultValue, other.realDefaultValue); } + // distribution column compare only care about attrs which affect data, + // do not care about attrs, such as comment + public boolean equalsForDistribution(Column other) { + if (other == this) { + return true; + } + + return name.equalsIgnoreCase(other.name) + && Objects.equals(getDefaultValue(), other.getDefaultValue()) + && Objects.equals(aggregationType, other.aggregationType) + && isAggregationTypeImplicit == other.isAggregationTypeImplicit + && isKey == other.isKey + && isAllowNull == other.isAllowNull + && getDataType().equals(other.getDataType()) + && getStrLen() == other.getStrLen() + && getPrecision() == other.getPrecision() + && getScale() == other.getScale() + && visible == other.visible + && Objects.equals(children, other.children) + && Objects.equals(realDefaultValue, other.realDefaultValue); + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index bc3a3b4f76..c64b4a2c74 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -4290,12 +4290,26 @@ public class Env { // 4. 
modify distribution info DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); if (distributionInfo.getType() == DistributionInfoType.HASH) { + // modify default distribution info List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); for (Column column : distributionColumns) { if (column.getName().equalsIgnoreCase(colName)) { column.setName(newColName); } } + // modify distribution info inside partitions + for (Partition p : table.getPartitions()) { + DistributionInfo partDistInfo = p.getDistributionInfo(); + if (partDistInfo.getType() != DistributionInfoType.HASH) { + continue; + } + List<Column> partDistColumns = ((HashDistributionInfo) partDistInfo).getDistributionColumns(); + for (Column column : partDistColumns) { + if (column.getName().equalsIgnoreCase(colName)) { + column.setName(newColName); + } + } + } } // 5. modify sequence map col @@ -4546,13 +4560,10 @@ public class Env { } if (distributionInfo.getType() == DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; - List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns(); - List<Column> defaultDistriCols - = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns(); - if (!newDistriCols.equals(defaultDistriCols)) { - throw new DdlException( - "Cannot assign hash distribution with different distribution cols. " + "default is: " - + defaultDistriCols); + if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) { + throw new DdlException("Cannot assign hash distribution with different distribution cols. 
" + + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: " + + ((HashDistributionInfo) distributionInfo).getDistributionColumns()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index d746bd355f..5f30bc2098 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -95,6 +95,18 @@ public class HashDistributionInfo extends DistributionInfo { return distributionInfo; } + public boolean sameDistributionColumns(HashDistributionInfo other) { + if (distributionColumns.size() != other.distributionColumns.size()) { + return false; + } + for (int i = 0; i < distributionColumns.size(); ++i) { + if (!distributionColumns.get(i).equalsForDistribution(other.distributionColumns.get(i))) { + return false; + } + } + return true; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -107,7 +119,7 @@ public class HashDistributionInfo extends DistributionInfo { return false; } HashDistributionInfo that = (HashDistributionInfo) o; - return bucketNum == that.bucketNum && Objects.equals(distributionColumns, that.distributionColumns); + return bucketNum == that.bucketNum && sameDistributionColumns(that); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index d1c0354337..1fc3b20bcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1410,17 +1410,14 @@ public class InternalCatalog implements CatalogIf<Database> { if (distributionInfo.getType() == DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; - List<Column> 
newDistriCols = hashDistributionInfo.getDistributionColumns(); - List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo) - .getDistributionColumns(); - if (!newDistriCols.equals(defaultDistriCols)) { - throw new DdlException( - "Cannot assign hash distribution with different distribution cols. " + "default is: " - + defaultDistriCols); - } if (hashDistributionInfo.getBucketNum() <= 0) { throw new DdlException("Cannot assign hash distribution buckets less than 1"); } + if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) { + throw new DdlException("Cannot assign hash distribution with different distribution cols. " + + "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: " + + ((HashDistributionInfo) distributionInfo).getDistributionColumns()); + } } else if (distributionInfo.getType() == DistributionInfoType.RANDOM) { RandomDistributionInfo randomDistributionInfo = (RandomDistributionInfo) distributionInfo; if (randomDistributionInfo.getBucketNum() <= 0) { diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 5879c463ee..d2e8cbd282 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -80,3 +80,5 @@ enable_struct_type=true # enable mtmv enable_mtmv = true + +dynamic_partition_check_interval_seconds=5 diff --git a/regression-test/pipeline/p1/conf/fe.conf b/regression-test/pipeline/p1/conf/fe.conf index b7fbf07bcb..ba7de606b5 100644 --- a/regression-test/pipeline/p1/conf/fe.conf +++ b/regression-test/pipeline/p1/conf/fe.conf @@ -82,3 +82,5 @@ enable_mtmv = true # enable auto collect statistics enable_auto_collect_statistics=true auto_check_statistics_in_sec=60 + +dynamic_partition_check_interval_seconds=5 diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy 
b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy new file mode 100644 index 0000000000..32cfc742a7 --- /dev/null +++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_alter.groovy @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_dynamic_partition_with_alter") { + def tbl = "test_dynamic_partition_with_alter" + sql "drop table if exists ${tbl}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbl} + ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL ) + AGGREGATE KEY(k1,k2) + PARTITION BY RANGE(k1) ( ) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "dynamic_partition.enable"="true", + "dynamic_partition.end"="3", + "dynamic_partition.buckets"="1", + "dynamic_partition.start"="-3", + "dynamic_partition.prefix"="p", + "dynamic_partition.time_unit"="DAY", + "dynamic_partition.create_history_partition"="true", + "dynamic_partition.replication_allocation" = "tag.location.default: 1") + """ + result = sql "show partitions from ${tbl}" + assertEquals(7, result.size()) + + // modify the distribution column comment, then try to add more dynamic partitions + sql """ alter table ${tbl} modify column k1 comment 'new_comment_for_k1' """ + sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """ + sql """ alter table ${tbl} set('dynamic_partition.end'='5') """ + result = sql "show partitions from ${tbl}" + for (def retry = 0; retry < 15; retry++) { + if (result.size() == 9) { + break; + } + logger.info("wait dynamic partition scheduler, sleep 1s") + sleep(1000); + result = sql "show partitions from ${tbl}" + } + assertEquals(9, result.size()) + + sql "drop table ${tbl}" +} diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy new file mode 100644 index 0000000000..b07a2f1a63 --- /dev/null +++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_with_rename.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_dynamic_partition_with_rename") { + def tbl = "test_dynamic_partition_with_rename" + sql "drop table if exists ${tbl}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbl} + ( k1 date NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL ) + AGGREGATE KEY(k1,k2) + PARTITION BY RANGE(k1) ( ) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "dynamic_partition.enable"="true", + "dynamic_partition.end"="3", + "dynamic_partition.buckets"="1", + "dynamic_partition.start"="-3", + "dynamic_partition.prefix"="p", + "dynamic_partition.time_unit"="DAY", + "dynamic_partition.create_history_partition"="true", + "dynamic_partition.replication_allocation" = "tag.location.default: 1") + """ + result = sql "show partitions from ${tbl}" + assertEquals(7, result.size()) + + // rename the distribution column, then try to add more dynamic partitions + sql "alter table ${tbl} rename column k1 renamed_k1" + sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """ + sql """ alter table ${tbl} set('dynamic_partition.end'='5') """ + result = sql "show partitions from ${tbl}" + for (def retry = 0; retry < 15; retry++) { + if (result.size() == 9) { + break; + } + logger.info("wait dynamic partition scheduler, sleep 1s") + sleep(1000); + result = sql "show
partitions from ${tbl}" + } + assertEquals(9, result.size()) + for (def line = 0; line < result.size(); line++) { + // XXX: DistributionKey at pos(7), next maybe impl by sql meta + assertEquals("renamed_k1", result.get(line).get(7)) + } + + sql "drop table ${tbl}" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org