This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d51a8d2131131cb4ff93157fa3b58efd87f17cc9 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Sun Jun 26 09:52:16 2022 +0800 [fix](random-distribution) Make aggregate keys table with replace type columns and unique keys table can only have hash distribution to make data computing correctly (#10414) --- .../apache/doris/alter/SchemaChangeHandler.java | 16 ++++- .../org/apache/doris/analysis/CreateTableStmt.java | 27 ++++---- .../java/org/apache/doris/catalog/Catalog.java | 15 ++++ .../org/apache/doris/catalog/DistributionInfo.java | 33 --------- .../apache/doris/alter/SchemaChangeJobV2Test.java | 79 ++++++++++++++++++++-- .../org/apache/doris/catalog/CatalogTestUtil.java | 14 ---- .../org/apache/doris/catalog/CreateTableTest.java | 30 ++++++++ 7 files changed, 147 insertions(+), 67 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index d12382f926..69e6fbf768 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -48,6 +48,7 @@ import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.ReplicaAllocation; @@ -768,7 +769,15 @@ public class SchemaChangeHandler extends AlterHandler { newColumn.setIsKey(true); } else if (newColumn.getAggregationType() == AggregateType.SUM && newColumn.getDefaultValue() != null && !newColumn.getDefaultValue().equals("0")) { - throw new DdlException("The default value of '" + newColName + "' with SUM aggregation function must be zero"); + throw new DdlException("The default value of '" + + newColName + "' with SUM aggregation function must be zero"); + } else if (olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) { + if (newColumn.getAggregationType() == AggregateType.REPLACE + || newColumn.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) { + throw new DdlException("Can not add value column with aggregation type " + + newColumn.getAggregationType() + " for olap table with random distribution : " + + newColName); + } } } else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) { if (newColumn.getAggregationType() != null) { @@ -1481,6 +1490,11 @@ public class SchemaChangeHandler extends AlterHandler { Catalog.getCurrentCatalog().modifyTableColocate(db, olapTable, colocateGroup, false, null); return; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) { + String distributionType = properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE); + if (!distributionType.equalsIgnoreCase("random")) { + throw new DdlException("Only support modifying distribution type of table from" + + " hash to random"); + } Catalog.getCurrentCatalog().convertDistributionType(db, olapTable); return; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 47a34b9554..bafafe71cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.PrimitiveType; @@ -91,9 +92,6 @@ public class CreateTableStmt extends DdlStmt { engineNames.add("iceberg"); } - // for backup. set to -1 for normal use - private int tableSignature; - public CreateTableStmt() { // for persist tableName = new TableName(); @@ -164,7 +162,6 @@ public class CreateTableStmt extends DdlStmt { this.ifNotExists = ifNotExists; this.comment = Strings.nullToEmpty(comment); - this.tableSignature = -1; this.rollupAlterClauseList = rollupAlterClauseList == null ? new ArrayList<>() : rollupAlterClauseList; } @@ -240,14 +237,6 @@ public class CreateTableStmt extends DdlStmt { return tableName.getDb(); } - public void setTableSignature(int tableSignature) { - this.tableSignature = tableSignature; - } - - public int getTableSignature() { - return tableSignature; - } - public void setTableName(String newTableName) { tableName = new TableName(tableName.getDb(), newTableName); } @@ -427,6 +416,20 @@ public class CreateTableStmt extends DdlStmt { throw new AnalysisException("Create olap table should contain distribution desc"); } distributionDesc.analyze(columnSet, columnDefs); + if (distributionDesc.type == DistributionInfo.DistributionInfoType.RANDOM) { + if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) { + throw new AnalysisException("Create unique keys table should not contain random distribution desc"); + } else if (keysDesc.getKeysType() == KeysType.AGG_KEYS) { + for (ColumnDef columnDef : columnDefs) { + if (columnDef.getAggregateType() == AggregateType.REPLACE + || columnDef.getAggregateType() == AggregateType.REPLACE_IF_NOT_NULL) { + throw new AnalysisException("Create aggregate keys table with value columns of which" + + " aggregate type is " + columnDef.getAggregateType() + " should not contain random" + + " distribution desc"); + } + } + } + } } else if (engineName.equalsIgnoreCase("elasticsearch")) { EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, distributionDesc); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 69832b139a..3ae89e10e2 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -7074,6 +7074,21 @@ public class Catalog { public void convertDistributionType(Database db, OlapTable tbl) throws DdlException { tbl.writeLockOrDdlException(); try { + if (tbl.isColocateTable()) { + throw new DdlException("Cannot change distribution type of colocate table."); + } + if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) { + throw new DdlException("Cannot change distribution type of unique keys table."); + } + if (tbl.getKeysType() == KeysType.AGG_KEYS) { + for (Column column : tbl.getBaseSchema()) { + if (column.getAggregationType() == AggregateType.REPLACE + || column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) { + throw new DdlException("Cannot change distribution type of aggregate keys table which has value" + + " columns with " + column.getAggregationType() + " type."); + } + } + } if (!tbl.convertHashDistributionToRandomDistribution()) { throw new DdlException("Table " + tbl.getName() + " is not hash distributed"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java index ef1712baa5..d228ceb3bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java @@ -18,11 +18,9 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.DistributionDesc; -import org.apache.doris.analysis.Expr; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang.NotImplementedException; @@ -30,8 +28,6 @@ import org.apache.commons.lang.NotImplementedException; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.List; -import java.util.Map; public abstract class DistributionInfo implements Writable { @@ -89,33 +85,4 @@ public abstract class DistributionInfo implements Writable { public boolean equals(DistributionInfo info) { return false; } - - public static List<Expr> toDistExpr(OlapTable tbl, DistributionInfo distInfo, Map<String, Expr> exprByCol) { - List<Expr> distExprs = Lists.newArrayList(); - if (distInfo instanceof RandomDistributionInfo) { - for (Column col : tbl.getBaseSchema()) { - if (col.isKey()) { - Expr distExpr = exprByCol.get(col.getName()); - // used to compute hash - if (col.getDataType() == PrimitiveType.CHAR) { - distExpr.setType(Type.CHAR); - } - distExprs.add(distExpr); - } else { - break; - } - } - } else if (distInfo instanceof HashDistributionInfo) { - HashDistributionInfo hashDistInfo = (HashDistributionInfo) distInfo; - for (Column col : hashDistInfo.getDistributionColumns()) { - Expr distExpr = exprByCol.get(col.getName()); - // used to compute hash - if (col.getDataType() == PrimitiveType.CHAR) { - distExpr.setType(Type.CHAR); - } - distExprs.add(distExpr); - } - } - return distExprs; - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 6d78eb84a0..72f3046c6e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -33,11 +33,13 @@ import org.apache.doris.backup.CatalogMocker; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.FakeEditLog; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.MaterializedIndex.IndexState; @@ -49,6 +51,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; @@ -64,9 +67,11 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.FakeTransactionIDGenerator; import org.apache.doris.transaction.GlobalTransactionMgr; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec; +import mockit.Expectations; +import mockit.Injectable; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -357,7 +362,7 @@ public class SchemaChangeJobV2Test { Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets()); } - public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue, String missPropertyKey) + public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue) throws UserException { fakeCatalog = new FakeCatalog(); FakeCatalog.setCatalog(masterCatalog); @@ -378,11 +383,11 @@ public class SchemaChangeJobV2Test { @Test public void testModifyDynamicPartitionWithoutTableProperty() throws UserException { - modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false", DynamicPartitionProperty.TIME_UNIT); - modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day", DynamicPartitionProperty.ENABLE); - modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3", DynamicPartitionProperty.ENABLE); - modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE); - modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false"); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day"); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3"); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p"); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30"); } @Test @@ -436,4 +441,64 @@ public class SchemaChangeJobV2Test { Partition partition1 = olapTable.getPartition(CatalogTestUtil.testPartitionId1); Assert.assertTrue(partition1.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM); } + + @Test + public void testAbnormalModifyTableDistributionType1(@Injectable OlapTable table) throws UserException { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get(); + new Expectations() { + { + table.isColocateTable(); + result = true; + } + }; + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of colocate table."); + Catalog.getCurrentCatalog().convertDistributionType(db, table); + } + + @Test + public void testAbnormalModifyTableDistributionType2(@Injectable OlapTable table) throws UserException { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get(); + new Expectations() { + { + table.isColocateTable(); + result = false; + table.getKeysType(); + result = KeysType.UNIQUE_KEYS; + } + }; + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of unique keys table."); + Catalog.getCurrentCatalog().convertDistributionType(db, table); + } + + @Test + public void testAbnormalModifyTableDistributionType3(@Injectable OlapTable table) throws UserException { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get(); + new Expectations() { + { + table.isColocateTable(); + result = false; + table.getKeysType(); + result = KeysType.AGG_KEYS; + table.getBaseSchema(); + result = Lists.newArrayList( + new Column("k1", Type.INT, true, null, "0", ""), + new Column("v1", Type.INT, false, AggregateType.REPLACE, "0", "")); + } + }; + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change " + + "distribution type of aggregate keys table which has value columns with REPLACE type."); + Catalog.getCurrentCatalog().convertDistributionType(db, table); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index e950d4f512..831f67acae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -28,7 +28,6 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.persist.EditLog; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -38,7 +37,6 @@ import com.google.common.collect.Maps; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -326,16 +324,4 @@ public class CatalogTestUtil { backend.setAlive(true); return backend; } - - public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort, - long totalCapacityB, long avaiLabelCapacityB) { - Backend backend = createBackend(id, host, heartPort, bePort, httpPort); - Map<String, TDisk> backendDisks = new HashMap<String, TDisk>(); - String rootPath = "root_path"; - TDisk disk = new TDisk(rootPath, totalCapacityB, avaiLabelCapacityB, true); - backendDisks.put(rootPath, disk); - backend.updateDisks(backendDisks); - backend.setAlive(true); - return backend; - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index 082a05cf40..f335e48c09 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -485,6 +485,36 @@ public class CreateTableTest { " \"dynamic_partition.start_day_of_month\" = \"3\"\n" + ");")); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, + "Create unique keys table should not contain random distribution desc", + () -> createTable("CREATE TABLE test.tbl21\n" + + "(\n" + + " `k1` bigint(20) NULL COMMENT \"\",\n" + + " `k2` largeint(40) NULL COMMENT \"\",\n" + + " `v1` varchar(204) NULL COMMENT \"\",\n" + + " `v2` smallint(6) NULL DEFAULT \"10\" COMMENT \"\"\n" + + ") ENGINE=OLAP\n" + + "UNIQUE KEY(`k1`, `k2`)\n" + + "DISTRIBUTED BY RANDOM BUCKETS 32\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"\n" + + ");")); + + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, + "Create aggregate keys table with value columns of which aggregate type" + + " is REPLACE should not contain random distribution desc", + () -> createTable("CREATE TABLE test.tbl22\n" + + "(\n" + + " `k1` bigint(20) NULL COMMENT \"\",\n" + + " `k2` largeint(40) NULL COMMENT \"\",\n" + + " `v1` bigint(20) REPLACE NULL COMMENT \"\",\n" + + " `v2` smallint(6) REPLACE_IF_NOT_NULL NULL DEFAULT \"10\" COMMENT \"\"\n" + + ") ENGINE=OLAP\n" + + "AGGREGATE KEY(`k1`, `k2`)\n" + + "DISTRIBUTED BY RANDOM BUCKETS 32\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"\n" + + ");")); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org