This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5f9eb5eb52d113e668dc0cd9f7bf2789b88c1b67
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Fri Mar 8 18:12:40 2024 +0800

    [feature](external catalog)Add partition grammar for external catalog to create table (#31585)

    The `PARTITION BY` syntax used by external catalogs has been added. You can
    specify either a column directly or a partition function as a partition
    condition, for example:

    `PARTITION BY LIST(col1, col2, func(param), func(param1, param2), func(param1, param2, param3))`

    NOTICE: This PR changes the grammar of `AUTO PARTITION` from

    ```
    AUTO PARTITION BY RANGE date_trunc(`TIME_STAMP`, 'month')
    ```

    to

    ```
    AUTO PARTITION BY RANGE (date_trunc(`TIME_STAMP`, 'month'))
    ```
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  17 +-
 .../apache/doris/analysis/ListPartitionDesc.java   |  10 +-
 .../apache/doris/analysis/RangePartitionDesc.java  |   4 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  66 ++---
 .../doris/nereids/parser/PartitionTableInfo.java   | 292 +++++++++++++++++++++
 .../trees/plans/commands/CreateTableCommand.java   |   5 +
 .../trees/plans/commands/info/CreateTableInfo.java | 222 ++--------------
 .../trees/plans/CreateTableCommandTest.java        | 131 +++++++++
 .../test_auto_partition_behavior.groovy            |   4 +-
 9 files changed, 511 insertions(+), 240 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 8492b812a8d..133eea3ed7e 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -54,8 +54,7 @@ statementBase (ENGINE EQ engine=identifier)? ((AGGREGATE | UNIQUE | DUPLICATE) KEY keys=identifierList (CLUSTER BY clusterKeys=identifierList)?)? (COMMENT STRING_LITERAL)? - ((autoPartition=AUTO)? PARTITION BY (RANGE | LIST) (partitionKeys=identifierList | partitionExpr=functionCallExpression) - LEFT_PAREN (partitions=partitionsDef)? RIGHT_PAREN)? + (partition=partitionTable)? (DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS (INTEGER_VALUE | autoBucket=AUTO))?)? (ROLLUP LEFT_PAREN rollupDefs RIGHT_PAREN)? properties=propertyClause? @@ -130,6 +129,19 @@ partitionSpec // | PARTITIONS WITH RECENT ; +partitionTable + : ((autoPartition=AUTO)? PARTITION BY (RANGE | LIST)? partitionList=identityOrFunctionList + (LEFT_PAREN (partitions=partitionsDef)? RIGHT_PAREN)) + ; + +identityOrFunctionList + : LEFT_PAREN identityOrFunction (COMMA partitions+=identityOrFunction)* RIGHT_PAREN + ; + +identityOrFunction + : (identifier | functionCallExpression) + ; + dataDesc : ((WITH)? mergeType)?
DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL (COMMA filePath+=STRING_LITERAL)* RIGHT_PAREN INTO TABLE tableName=multipartIdentifier @@ -733,6 +745,7 @@ primaryExpression | primaryExpression COLLATE (identifier | STRING_LITERAL | DEFAULT) #collate ; + functionCallExpression : functionIdentifier LEFT_PAREN ( diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java index 0ca97ca9605..bf94b227d83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java @@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; // to describe the key list partition's information in create table stmt public class ListPartitionDesc extends PartitionDesc { @@ -37,6 +38,9 @@ public class ListPartitionDesc extends PartitionDesc { super(partitionColNames, allPartitionDescs); type = PartitionType.LIST; this.isAutoCreatePartitions = false; + this.partitionExprs = new ArrayList<>(partitionColNames.stream() + .map(col -> new SlotRef(null, col)) + .collect(Collectors.toList())); } public ListPartitionDesc(ArrayList<Expr> exprs, List<String> partitionColNames, @@ -68,12 +72,12 @@ public class ListPartitionDesc extends PartitionDesc { StringBuilder sb = new StringBuilder(); sb.append("PARTITION BY LIST("); int idx = 0; - for (String column : partitionColNames) { - if (idx != 0) { + for (Expr e : partitionExprs) { + if (idx > 0) { sb.append(", "); } - sb.append("`").append(column).append("`"); idx++; + sb.append(e.toSql()); } sb.append(")\n(\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java index 099e5b0b21d..57d696c37a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java @@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; // to describe the key range partition's information in create table stmt public class RangePartitionDesc extends PartitionDesc { @@ -35,6 +36,9 @@ public class RangePartitionDesc extends PartitionDesc { List<AllPartitionDesc> allPartitionDescs) throws AnalysisException { super(partitionColNames, allPartitionDescs); type = org.apache.doris.catalog.PartitionType.RANGE; + this.partitionExprs = new ArrayList<>(partitionColNames.stream() + .map(col -> new SlotRef(null, col)) + .collect(Collectors.toList())); this.isAutoCreatePartitions = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 42b9540a20d..de04612122c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -2435,33 +2435,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { // NOTICE: we should not generate immutable map here, because it will be modified when analyzing. ? 
Maps.newHashMap(visitPropertyClause(ctx.extProperties)) : Maps.newHashMap(); - String partitionType = null; - if (ctx.PARTITION() != null) { - partitionType = ctx.RANGE() != null ? "RANGE" : "LIST"; - } - boolean isAutoPartition = ctx.autoPartition != null; - ImmutableList.Builder<Expression> autoPartitionExpr = new ImmutableList.Builder<>(); - if (isAutoPartition) { - if (ctx.RANGE() != null) { - // AUTO PARTITION BY RANGE FUNC_CALL_EXPR - if (ctx.partitionExpr != null) { - autoPartitionExpr.add(visitFunctionCallExpression(ctx.partitionExpr)); - } else { - throw new AnalysisException( - "AUTO PARTITION BY RANGE must provide a function expr"); - } - } else { - // AUTO PARTITION BY LIST(`partition_col`) - if (ctx.partitionKeys != null) { - // only support one column in auto partition - autoPartitionExpr.addAll(visitIdentifierList(ctx.partitionKeys).stream() - .distinct().map(name -> UnboundSlot.quoted(name)) - .collect(Collectors.toList())); - } else { - throw new AnalysisException( - "AUTO PARTITION BY List must provide a partition column"); - } - } + + // solve partition by + PartitionTableInfo partitionInfo; + if (ctx.partition != null) { + partitionInfo = (PartitionTableInfo) ctx.partitionTable().accept(this); + } else { + partitionInfo = PartitionTableInfo.EMPTY; } if (ctx.columnDefs() != null) { @@ -2480,11 +2460,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { keysType, ctx.keys != null ? visitIdentifierList(ctx.keys) : ImmutableList.of(), comment, - isAutoPartition, - autoPartitionExpr.build(), - partitionType, - ctx.partitionKeys != null ? visitIdentifierList(ctx.partitionKeys) : null, - ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null, + partitionInfo, desc, ctx.rollupDefs() != null ? visitRollupDefs(ctx.rollupDefs()) : ImmutableList.of(), properties, @@ -2502,11 +2478,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { keysType, ctx.keys != null ? visitIdentifierList(ctx.keys) : ImmutableList.of(), comment, - isAutoPartition, - autoPartitionExpr.build(), - partitionType, - ctx.partitionKeys != null ? visitIdentifierList(ctx.partitionKeys) : null, - ctx.partitions != null ? visitPartitionsDef(ctx.partitions) : null, + partitionInfo, desc, ctx.rollupDefs() != null ? visitRollupDefs(ctx.rollupDefs()) : ImmutableList.of(), properties, @@ -2517,6 +2489,26 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { } } + @Override + public PartitionTableInfo visitPartitionTable(DorisParser.PartitionTableContext ctx) { + boolean isAutoPartition = ctx.autoPartition != null; + ImmutableList<Expression> partitionList = ctx.partitionList.identityOrFunction().stream() + .map(partition -> { + IdentifierContext identifier = partition.identifier(); + if (identifier != null) { + return UnboundSlot.quoted(identifier.getText()); + } else { + return visitFunctionCallExpression(partition.functionCallExpression()); + } + }) + .collect(ImmutableList.toImmutableList()); + return new PartitionTableInfo( + isAutoPartition, + ctx.RANGE() != null ? "RANGE" : "LIST", + ctx.partitions != null ? 
visitPartitionsDef(ctx.partitions) : null, + partitionList); + } + @Override public List<ColumnDefinition> visitColumnDefs(ColumnDefsContext ctx) { return ctx.cols.stream().map(this::visitColumnDef).collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/PartitionTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/PartitionTableInfo.java new file mode 100644 index 00000000000..33223a39ecc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/PartitionTableInfo.java @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.parser; + +import org.apache.doris.analysis.AllPartitionDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.FunctionParams; +import org.apache.doris.analysis.ListPartitionDesc; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.RangePartitionDesc; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.nereids.analyzer.UnboundFunction; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.FixedRangePartition; +import org.apache.doris.nereids.trees.plans.commands.info.InPartition; +import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition; +import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.StepPartition; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * partition info for 'PARTITION BY' + */ +public class PartitionTableInfo { + + public static final PartitionTableInfo EMPTY = new PartitionTableInfo( + false, + PartitionType.UNPARTITIONED.name(), + null, + null); + + private boolean isAutoPartition; + private String partitionType; + private List<String> partitionColumns; + private List<PartitionDefinition> partitionDefs; + private List<Expression> partitionList; + + /** + * struct for partition definition + * + * @param isAutoPartition Whether it is an automatic partition + * @param 
partitionType partition type + * @param partitionFields partition fields + */ + public PartitionTableInfo( + boolean isAutoPartition, + String partitionType, + List<PartitionDefinition> partitionDefs, + List<Expression> partitionFields) { + this.isAutoPartition = isAutoPartition; + this.partitionType = partitionType; + this.partitionDefs = partitionDefs; + this.partitionList = partitionFields; + if (this.partitionList != null) { + this.partitionColumns = this.partitionList.stream() + .filter(UnboundSlot.class::isInstance) + .map(partition -> ((UnboundSlot) partition).getName()) + .collect(Collectors.toList()); + } + } + + public boolean isAutoPartition() { + return isAutoPartition; + } + + public String getPartitionType() { + return partitionType; + } + + public List<String> getPartitionColumns() { + return partitionColumns; + } + + /** + * check partitions types. + */ + private boolean checkPartitionsTypes() { + if (partitionType.equalsIgnoreCase(PartitionType.RANGE.name())) { + if (partitionDefs.stream().allMatch( + p -> p instanceof StepPartition || p instanceof FixedRangePartition)) { + return true; + } + return partitionDefs.stream().allMatch( + p -> (p instanceof LessThanPartition) || (p instanceof FixedRangePartition)); + } + return partitionType.equalsIgnoreCase(PartitionType.LIST.name()) + && partitionDefs.stream().allMatch(p -> p instanceof InPartition); + } + + private void validatePartitionColumn(ColumnDefinition column, ConnectContext ctx, boolean isEnableMergeOnWrite) { + if (!column.isKey() + && (!column.getAggType().equals(AggregateType.NONE) || isEnableMergeOnWrite)) { + throw new AnalysisException("The partition column could not be aggregated column"); + } + if (column.getType().isFloatLikeType()) { + throw new AnalysisException("Floating point type column can not be partition column"); + } + if (column.getType().isStringType()) { + throw new AnalysisException("String Type should not be used in partition column[" + + column.getName() + "]."); + } + if (column.getType().isComplexType()) { + throw new AnalysisException("Complex type column can't be partition column: " + + column.getType().toString()); + } + // prohibit to create auto partition with null column anyhow + if (this.isAutoPartition && column.isNullable()) { + throw new AnalysisException("The auto partition column must be NOT NULL"); + } + if (!ctx.getSessionVariable().isAllowPartitionColumnNullable() && column.isNullable()) { + throw new AnalysisException( + "The partition column must be NOT NULL with allow_partition_column_nullable OFF"); + } + } + + /** + * Verify the relationship between partitions and columns + * + * @param columnMap column map of table + * @param properties properties of table + * @param ctx context + * @param isEnableMergeOnWrite whether enable merge on write + */ + public void validatePartitionInfo( + Map<String, ColumnDefinition> columnMap, + Map<String, String> properties, + ConnectContext ctx, + boolean isEnableMergeOnWrite, + boolean isExternal) { + + if (partitionColumns != null) { + + if (partitionColumns.size() != partitionList.size()) { + if (!isExternal && partitionType.equalsIgnoreCase(PartitionType.LIST.name())) { + throw new AnalysisException("internal catalog does not support functions in 'LIST' partition"); + } + isAutoPartition = true; + } + + partitionColumns.forEach(p -> { + if (!columnMap.containsKey(p)) { + throw new AnalysisException( + String.format("partition key %s is not exists", p)); + } + validatePartitionColumn(columnMap.get(p), ctx, isEnableMergeOnWrite); + }); 
+ + Set<String> partitionColumnSets = Sets.newHashSet(); + List<String> duplicatesKeys = partitionColumns.stream() + .filter(c -> !partitionColumnSets.add(c)).collect(Collectors.toList()); + if (!duplicatesKeys.isEmpty()) { + throw new AnalysisException( + "Duplicated partition column " + duplicatesKeys.get(0)); + } + + if (partitionDefs != null) { + if (!checkPartitionsTypes()) { + throw new AnalysisException( + "partitions types is invalid, expected FIXED or LESS in range partitions" + + " and IN in list partitions"); + } + Set<String> partitionNames = Sets.newHashSet(); + for (PartitionDefinition partition : partitionDefs) { + if (partition instanceof StepPartition) { + continue; + } + String partitionName = partition.getPartitionName(); + if (partitionNames.contains(partitionName)) { + throw new AnalysisException( + "Duplicated named partition: " + partitionName); + } + partitionNames.add(partitionName); + } + partitionDefs.forEach(p -> { + p.setPartitionTypes(partitionColumns.stream() + .map(s -> columnMap.get(s).getType()).collect(Collectors.toList())); + p.validate(Maps.newHashMap(properties)); + }); + } + } + } + + /** + * Convert to PartitionDesc types. + */ + public PartitionDesc convertToPartitionDesc(boolean isExternal) { + PartitionDesc partitionDesc = null; + if (isExternal) { + isAutoPartition = true; + } + if (!partitionType.equalsIgnoreCase(PartitionType.UNPARTITIONED.name())) { + List<AllPartitionDesc> partitionDescs = + partitionDefs != null + ? partitionDefs.stream().map(PartitionDefinition::translateToCatalogStyle) + .collect(Collectors.toList()) + : null; + + int createTablePartitionMaxNum = ConnectContext.get().getSessionVariable().getCreateTablePartitionMaxNum(); + if (partitionDescs != null && partitionDescs.size() > createTablePartitionMaxNum) { + throw new org.apache.doris.nereids.exceptions.AnalysisException(String.format( + "The number of partitions to be created is [%s], exceeding the maximum value of [%s]. " + + "Creating too many partitions can be time-consuming. 
If necessary, " + + "You can set the session variable 'create_table_partition_max_num' " + + "to a larger value.", + partitionDescs.size(), createTablePartitionMaxNum)); + } + + try { + if (partitionType.equals(PartitionType.RANGE.name())) { + if (isAutoPartition) { + partitionDesc = new RangePartitionDesc( + convertToLegacyAutoPartitionExprs(partitionList), + partitionColumns, partitionDescs); + } else { + partitionDesc = new RangePartitionDesc(partitionColumns, partitionDescs); + } + } else { + if (isAutoPartition) { + partitionDesc = new ListPartitionDesc( + convertToLegacyAutoPartitionExprs(partitionList), + partitionColumns, partitionDescs); + } else { + partitionDesc = new ListPartitionDesc(partitionColumns, partitionDescs); + } + } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e.getCause()); + } + } + return partitionDesc; + } + + private static ArrayList<Expr> convertToLegacyAutoPartitionExprs(List<Expression> expressions) { + return new ArrayList<>(expressions.stream().map(expression -> { + if (expression instanceof UnboundSlot) { + return new SlotRef(null, ((UnboundSlot) expression).getName()); + } else if (expression instanceof UnboundFunction) { + UnboundFunction function = (UnboundFunction) expression; + return new FunctionCallExpr( + function.getName(), + new FunctionParams(convertToLegacyArguments(function.children()))); + } else { + throw new AnalysisException( + "unsupported auto partition expr " + expression.toString()); + } + }).collect(Collectors.toList())); + } + + private static List<Expr> convertToLegacyArguments(List<Expression> children) { + return children.stream().map(child -> { + if (child instanceof UnboundSlot) { + return new SlotRef(null, ((UnboundSlot) child).getName()); + } else if (child instanceof Literal) { + return new StringLiteral(((Literal) child).getStringValue()); + } else { + throw new AnalysisException("unsupported argument " + child.toString()); + } + }).collect(Collectors.toList()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java index e821512113e..7606097411b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java @@ -184,4 +184,9 @@ public class CreateTableCommand extends Command implements ForwardWithSync { public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitCreateTableCommand(this, context); } + + // for test + public CreateTableInfo getCreateTableInfo() { + return createTableInfo; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index 8ae6ee0da30..b67d1db27c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -17,27 +17,18 @@ package org.apache.doris.nereids.trees.plans.commands.info; -import org.apache.doris.analysis.AllPartitionDesc; import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DistributionDesc; -import org.apache.doris.analysis.Expr; -import 
org.apache.doris.analysis.FunctionCallExpr; -import org.apache.doris.analysis.FunctionParams; import org.apache.doris.analysis.IndexDef; import org.apache.doris.analysis.KeysDesc; -import org.apache.doris.analysis.ListPartitionDesc; import org.apache.doris.analysis.PartitionDesc; -import org.apache.doris.analysis.RangePartitionDesc; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.KeysType; -import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -52,13 +43,9 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.es.EsUtil; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.nereids.analyzer.UnboundFunction; -import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.parser.PartitionTableInfo; import org.apache.doris.nereids.types.DataType; -import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; @@ -69,7 +56,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -94,22 +80,17 @@ public class CreateTableInfo { private KeysType keysType; private List<String> keys; private final String comment; - private final String partitionType; - private List<String> partitionColumns; - private final List<PartitionDefinition> partitions; private DistributionDescriptor distribution; private final List<RollupDefinition> rollups; private Map<String, String> properties; private Map<String, String> extProperties; private boolean isEnableMergeOnWrite = false; - private final boolean isAutoPartition; - private final List<Expression> autoPartitionExprs; - private boolean isExternal = false; private String clusterName = null; private List<String> clusterKeysColumnNames = null; private List<Integer> clusterKeysColumnIds = null; + private PartitionTableInfo partitionTableInfo; /** * constructor for create table @@ -117,8 +98,7 @@ public class CreateTableInfo { public CreateTableInfo(boolean ifNotExists, boolean isExternal, String ctlName, String dbName, String tableName, List<ColumnDefinition> columns, List<IndexDefinition> indexes, String engineName, KeysType keysType, List<String> keys, String comment, - boolean isAutoPartition, List<Expression> autoPartitionExprs, String partitionType, - List<String> partitionColumns, List<PartitionDefinition> partitions, + PartitionTableInfo partitionTableInfo, DistributionDescriptor distribution, List<RollupDefinition> rollups, Map<String, String> properties, Map<String, String> extProperties, List<String> clusterKeyColumnNames) { @@ -134,11 +114,7 @@ public class CreateTableInfo { this.keysType = keysType; this.keys = Utils.copyRequiredList(keys); this.comment = comment; - this.isAutoPartition = 
isAutoPartition; - this.autoPartitionExprs = autoPartitionExprs; - this.partitionType = partitionType; - this.partitionColumns = partitionColumns; - this.partitions = partitions; + this.partitionTableInfo = partitionTableInfo; this.distribution = distribution; this.rollups = Utils.copyRequiredList(rollups); this.properties = properties; @@ -151,9 +127,8 @@ public class CreateTableInfo { */ public CreateTableInfo(boolean ifNotExists, boolean isExternal, String ctlName, String dbName, String tableName, List<String> cols, String engineName, KeysType keysType, - List<String> keys, String comment, boolean isAutoPartition, - List<Expression> autoPartitionExprs, String partitionType, - List<String> partitionColumns, List<PartitionDefinition> partitions, + List<String> keys, String comment, + PartitionTableInfo partitionTableInfo, DistributionDescriptor distribution, List<RollupDefinition> rollups, Map<String, String> properties, Map<String, String> extProperties, List<String> clusterKeyColumnNames) { @@ -169,11 +144,7 @@ public class CreateTableInfo { this.keysType = keysType; this.keys = Utils.copyRequiredList(keys); this.comment = comment; - this.isAutoPartition = isAutoPartition; - this.autoPartitionExprs = autoPartitionExprs; - this.partitionType = partitionType; - this.partitionColumns = partitionColumns; - this.partitions = partitions; + this.partitionTableInfo = partitionTableInfo; this.distribution = distribution; this.rollups = Utils.copyRequiredList(rollups); this.properties = properties; @@ -455,54 +426,8 @@ public class CreateTableInfo { } }); - if (isAutoPartition) { - partitionColumns = ExpressionUtils - .collectAll(autoPartitionExprs, UnboundSlot.class::isInstance).stream() - .map(slot -> ((UnboundSlot) slot).getName()).collect(Collectors.toList()); - } - - if (partitionColumns != null) { - partitionColumns.forEach(p -> { - if (!columnMap.containsKey(p)) { - throw new AnalysisException( - String.format("partition key %s is not exists", p)); - } - validatePartitionColumn(columnMap.get(p), ctx); - }); - - Set<String> partitionColumnSets = Sets.newHashSet(); - List<String> duplicatesKeys = partitionColumns.stream() - .filter(c -> !partitionColumnSets.add(c)).collect(Collectors.toList()); - if (!duplicatesKeys.isEmpty()) { - throw new AnalysisException( - "Duplicated partition column " + duplicatesKeys.get(0)); - } - - if (partitions != null) { - if (!checkPartitionsTypes()) { - throw new AnalysisException( - "partitions types is invalid, expected FIXED or LESS in range partitions" - + " and IN in list partitions"); - } - Set<String> partitionNames = Sets.newHashSet(); - for (PartitionDefinition partition : partitions) { - if (partition instanceof StepPartition) { - continue; - } - String partitionName = partition.getPartitionName(); - if (partitionNames.contains(partitionName)) { - throw new AnalysisException( - "Duplicated named partition: " + partitionName); - } - partitionNames.add(partitionName); - } - partitions.forEach(p -> { - p.setPartitionTypes(partitionColumns.stream() - .map(s -> columnMap.get(s).getType()).collect(Collectors.toList())); - p.validate(Maps.newHashMap(properties)); - }); - } - } + // validate partition + partitionTableInfo.validatePartitionInfo(columnMap, properties, ctx, isEnableMergeOnWrite, isExternal); // validate distribution descriptor distribution.updateCols(columns.get(0).getName()); @@ -532,6 +457,16 @@ public class CreateTableInfo { "Create " + engineName + " table should not contain keys desc"); } + if (!rollups.isEmpty()) { + throw new 
AnalysisException(engineName + " catalog doesn't support rollup tables."); + } + + if (engineName.equalsIgnoreCase("iceberg") && distribution != null) { + throw new AnalysisException( + "Iceberg doesn't support 'DISTRIBUTE BY', " + + "and you can use 'bucket(num, column)' in 'PARTITIONED BY'."); + } + for (ColumnDefinition columnDef : columns) { columnDef.setIsKey(true); } @@ -599,22 +534,6 @@ public class CreateTableInfo { validate(ctx); } - /** - * check partitions types. - */ - private boolean checkPartitionsTypes() { - if (partitionType.equalsIgnoreCase(PartitionType.RANGE.name())) { - if (partitions.stream().allMatch( - p -> p instanceof StepPartition || p instanceof FixedRangePartition)) { - return true; - } - return partitions.stream().allMatch( - p -> (p instanceof LessThanPartition) || (p instanceof FixedRangePartition)); - } - return partitionType.equalsIgnoreCase(PartitionType.LIST.name()) - && partitions.stream().allMatch(p -> p instanceof InPartition); - } - private void checkEngineName() { if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker") || engineName.equals("elasticsearch") || engineName.equals("hive") || engineName.equals("iceberg") @@ -643,32 +562,6 @@ public class CreateTableInfo { } } - private void validatePartitionColumn(ColumnDefinition column, ConnectContext ctx) { - if (!column.isKey() - && (!column.getAggType().equals(AggregateType.NONE) || isEnableMergeOnWrite)) { - throw new AnalysisException("The partition column could not be aggregated column"); - } - if (column.getType().isFloatLikeType()) { - throw new AnalysisException("Floating point type column can not be partition column"); - } - if (column.getType().isStringType()) { - throw new AnalysisException("String Type should not be used in partition column[" - + column.getName() + "]."); - } - if (column.getType().isComplexType()) { - throw new AnalysisException("Complex type column can't be partition column: " - + column.getType().toString()); - } - // prohibit to create auto partition with null column anyhow - if (this.isAutoPartition && column.isNullable()) { - throw new AnalysisException("The auto partition column must be NOT NULL"); - } - if (!ctx.getSessionVariable().isAllowPartitionColumnNullable() && column.isNullable()) { - throw new AnalysisException( - "The partition column must be NOT NULL with allow_partition_column_nullable OFF"); - } - } - // if auto bucket auto bucket enable, rewrite distribution bucket num && // set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true" private static Map<String, String> maybeRewriteByAutoBucket( @@ -795,47 +688,7 @@ public class CreateTableInfo { * translate to catalog create table stmt */ public CreateTableStmt translateToLegacyStmt() { - PartitionDesc partitionDesc = null; - if (partitionColumns != null || isAutoPartition) { - List<AllPartitionDesc> partitionDescs = - partitions != null - ? partitions.stream().map(PartitionDefinition::translateToCatalogStyle) - .collect(Collectors.toList()) - : null; - - int createTablePartitionMaxNum = ConnectContext.get().getSessionVariable().getCreateTablePartitionMaxNum(); - if (partitionDescs != null && partitionDescs.size() > createTablePartitionMaxNum) { - throw new org.apache.doris.nereids.exceptions.AnalysisException(String.format( - "The number of partitions to be created is [%s], exceeding the maximum value of [%s]. " - + "Creating too many partitions can be time-consuming. 
If necessary, " - + "You can set the session variable 'create_table_partition_max_num' " - + "to a larger value.", - partitionDescs.size(), createTablePartitionMaxNum)); - } - - try { - if (partitionType.equals(PartitionType.RANGE.name())) { - if (isAutoPartition) { - partitionDesc = new RangePartitionDesc( - convertToLegacyAutoPartitionExprs(autoPartitionExprs), - partitionColumns, partitionDescs); - } else { - partitionDesc = new RangePartitionDesc(partitionColumns, partitionDescs); - } - } else { - if (isAutoPartition) { - partitionDesc = new ListPartitionDesc( - convertToLegacyAutoPartitionExprs(autoPartitionExprs), - partitionColumns, partitionDescs); - } else { - partitionDesc = new ListPartitionDesc(partitionColumns, partitionDescs); - } - } - } catch (Exception e) { - throw new AnalysisException(e.getMessage(), e.getCause()); - } - } - + PartitionDesc partitionDesc = partitionTableInfo.convertToPartitionDesc(isExternal); List<AlterClause> addRollups = Lists.newArrayList(); if (!rollups.isEmpty()) { addRollups.addAll(rollups.stream().map(RollupDefinition::translateToCatalogStyle) @@ -859,9 +712,13 @@ public class CreateTableInfo { throw new AnalysisException(e.getMessage(), e.getCause()); } } else if (!engineName.equals("olap")) { - if (partitionDesc != null || distributionDesc != null) { + if (!engineName.equals("hive") && distributionDesc != null) { throw new AnalysisException("Create " + engineName - + " table should not contain partition or distribution desc"); + + " table should not contain distribution desc"); + } + if (!engineName.equals("hive") && !engineName.equals("iceberg") && partitionDesc != null) { + throw new AnalysisException("Create " + engineName + + " table should not contain partition desc"); } } @@ -872,31 +729,4 @@ public class CreateTableInfo { partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties, comment, addRollups, null); } - - private static ArrayList<Expr> convertToLegacyAutoPartitionExprs(List<Expression> expressions) { - return new ArrayList<>(expressions.stream().map(expression -> { - if (expression instanceof UnboundSlot) { - return new SlotRef(null, ((UnboundSlot) expression).getName()); - } else if (expression instanceof UnboundFunction) { - UnboundFunction function = (UnboundFunction) expression; - return new FunctionCallExpr(function.getName(), - new FunctionParams(convertToLegacyArguments(function.children()))); - } else { - throw new AnalysisException( - "unsupported auto partition expr " + expression.toString()); - } - }).collect(Collectors.toList())); - } - - private static List<Expr> convertToLegacyArguments(List<Expression> children) { - return children.stream().map(child -> { - if (child instanceof UnboundSlot) { - return new SlotRef(null, ((UnboundSlot) child).getName()); - } else if (child instanceof Literal) { - return new StringLiteral(((Literal) child).getStringValue()); - } else { - throw new AnalysisException("unsupported argument " + child.toString()); - } - }).collect(Collectors.toList()); - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java index 56ef7878954..b90d8ded6fd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/CreateTableCommandTest.java @@ -17,6 +17,11 @@ package org.apache.doris.nereids.trees.plans; +import 
org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -32,6 +37,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.utframe.TestWithFeService; @@ -40,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import java.util.HashSet; +import java.util.List; import java.util.Set; public class CreateTableCommandTest extends TestWithFeService { @@ -701,4 +708,128 @@ public class CreateTableCommandTest extends TestWithFeService { Assertions.assertEquals(ScalarType.MAX_VARCHAR_LENGTH, tb.getColumn("k3").getStrLen()); Assertions.assertEquals(10, tb.getColumn("k4").getStrLen()); } + + @Test + public void testCreateTablePartitionForExternalCatalog() { + + PartitionDesc par = null; + + par = getCreateTableStmt("create table tb1 (id int not null, id2 int not null, id3 int not null)" + + "partition by (id, id3) () distributed by hash (id) properties (\"a\"=\"b\")"); + Assertions.assertEquals("PARTITION BY LIST(`id`, `id3`)\n(\n\n)", par.toSql()); + + try { + getCreateTableStmt("create table tb1 (id int not null, id2 int not null, id3 int not null)" + + "partition by (id, func1(id2, 1), func(3,id1), id3) () " + + "distributed by hash (id) properties (\"a\"=\"b\")"); + } catch (Exception e) { + Assertions.assertEquals( + "internal catalog does not support functions in 'LIST' partition", + e.getMessage()); + } + + try { + getCreateTableStmt("create table tb1 (id int not null, id2 int not null, id3 int not null) " + + "ENGINE=iceberg partition by (id, func1(id2, 1), func(3,id1), id3) () " + + "distributed by hash (id) properties (\"a\"=\"b\")"); + } catch (Exception e) { + Assertions.assertEquals( + "Iceberg doesn't support 'DISTRIBUTE BY', " + + "and you can use 'bucket(num, column)' in 'PARTITIONED BY'.", + e.getMessage()); + } + + par = getCreateTableStmt("create table tb1 (id int not null, id2 int not null, id3 int not null) " + + "ENGINE=iceberg partition by (id, func1(id2, 1), func(3,id1), id3) () properties (\"a\"=\"b\")"); + Assertions.assertEquals( + "PARTITION BY LIST(`id`, func1(`id2`, '1'), func('3', `id1`), `id3`)\n" + "(\n" + "\n" + ")", + par.toSql()); + + try { + getCreateTableStmt( + "create table tb1 (id int) " + + "engine = iceberg rollup (ab (cd)) properties (\"a\"=\"b\")"); + } catch (Exception e) { + Assertions.assertEquals( + "iceberg catalog doesn't support rollup tables.", + e.getMessage()); + } + + try { + getCreateTableStmt("create table tb1 (id int) engine = hive rollup (ab (cd)) properties (\"a\"=\"b\")"); + } catch (Exception e) { + Assertions.assertEquals( + "hive catalog doesn't support rollup tables.", + e.getMessage()); + } + + // test with empty partitions + LogicalPlan plan = new NereidsParser().parseSingle( + "create table tb1 (id int) engine = iceberg properties (\"a\"=\"b\")"); + Assertions.assertTrue(plan instanceof CreateTableCommand); + CreateTableInfo createTableInfo = 
((CreateTableCommand) plan).getCreateTableInfo(); + createTableInfo.validate(connectContext); + Assertions.assertNull(createTableInfo.translateToLegacyStmt().getPartitionDesc()); + + // test with multi partitions + LogicalPlan plan2 = new NereidsParser().parseSingle( + "create table tb1 (id int) engine = iceberg " + + "partition by (val, bucket(2, id), par, day(ts), efg(a,b,c)) () properties (\"a\"=\"b\")"); + Assertions.assertTrue(plan2 instanceof CreateTableCommand); + CreateTableInfo createTableInfo2 = ((CreateTableCommand) plan2).getCreateTableInfo(); + createTableInfo2.validate(connectContext); + PartitionDesc partitionDesc2 = createTableInfo2.translateToLegacyStmt().getPartitionDesc(); + List<Expr> partitionFields2 = partitionDesc2.getPartitionExprs(); + Assertions.assertEquals(5, partitionFields2.size()); + + Expr expr0 = partitionFields2.get(0); + Assertions.assertInstanceOf(SlotRef.class, expr0); + Assertions.assertEquals("val", expr0.getExprName()); + + Expr expr1 = partitionFields2.get(1); + Assertions.assertInstanceOf(FunctionCallExpr.class, expr1); + List<Expr> params1 = ((FunctionCallExpr) expr1).getParams().exprs(); + Assertions.assertEquals("bucket", expr1.getExprName()); + Assertions.assertEquals(2, params1.size()); + Assertions.assertInstanceOf(StringLiteral.class, params1.get(0)); + Assertions.assertEquals("2", params1.get(0).getStringValue()); + Assertions.assertInstanceOf(SlotRef.class, params1.get(1)); + Assertions.assertEquals("id", params1.get(1).getExprName()); + + Expr expr2 = partitionFields2.get(2); + Assertions.assertInstanceOf(SlotRef.class, expr2); + Assertions.assertEquals("par", expr2.getExprName()); + + Expr expr3 = partitionFields2.get(3); + Assertions.assertInstanceOf(FunctionCallExpr.class, expr3); + List<Expr> params3 = ((FunctionCallExpr) expr3).getParams().exprs(); + Assertions.assertEquals("day", expr3.getExprName()); + Assertions.assertEquals(1, params3.size()); + Assertions.assertInstanceOf(SlotRef.class, params3.get(0)); + Assertions.assertEquals("ts", params3.get(0).getExprName()); + + Expr expr4 = partitionFields2.get(4); + Assertions.assertInstanceOf(FunctionCallExpr.class, expr4); + List<Expr> params4 = ((FunctionCallExpr) expr4).getParams().exprs(); + Assertions.assertEquals("efg", expr4.getExprName()); + Assertions.assertEquals(3, params4.size()); + Assertions.assertInstanceOf(SlotRef.class, params4.get(0)); + Assertions.assertEquals("a", params4.get(0).getExprName()); + Assertions.assertInstanceOf(SlotRef.class, params4.get(1)); + Assertions.assertEquals("b", params4.get(1).getExprName()); + Assertions.assertInstanceOf(SlotRef.class, params4.get(2)); + Assertions.assertEquals("c", params4.get(2).getExprName()); + + Assertions.assertEquals( + "PARTITION BY LIST(`val`, bucket('2', `id`), `par`, day(`ts`), efg(`a`, `b`, `c`))\n(\n\n)", + partitionDesc2.toSql()); + } + + private PartitionDesc getCreateTableStmt(String sql) { + LogicalPlan plan = new NereidsParser().parseSingle(sql); + Assertions.assertTrue(plan instanceof CreateTableCommand); + CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo(); + createTableInfo.validate(connectContext); + return createTableInfo.translateToLegacyStmt().getPartitionDesc(); + } } diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy index a2429214d60..951f656f55f 100644 --- 
a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy @@ -344,7 +344,7 @@ suite("test_auto_partition_behavior") { k0 datetime(6) NOT null, k1 datetime(6) NOT null ) - auto partition by range date_trunc(k0, k1, 'hour') + auto partition by range (date_trunc(k0, k1, 'hour')) ( ) DISTRIBUTED BY HASH(`k0`) BUCKETS 2 @@ -359,7 +359,7 @@ suite("test_auto_partition_behavior") { k0 datetime(6) NOT null, k1 int NOT null ) - auto partition by range date_trunc(k1, 'hour') + auto partition by range (date_trunc(k1, 'hour')) ( ) DISTRIBUTED BY HASH(`k0`) BUCKETS 2 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
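
For readers who want to try the change without digging through the diff, here is a minimal sketch of the two syntax forms this patch touches, modeled on the patch's own test cases. Table names, column names, and property values are illustrative only and are not taken from the patch; the first statement assumes an Iceberg external catalog is already configured.

```
-- External catalog table (Iceberg): columns and partition functions can now be
-- mixed inside PARTITION BY. Per this patch, Iceberg tables reject DISTRIBUTED BY;
-- bucketing is expressed as bucket(num, column) inside PARTITION BY instead.
-- The property key/value here is a placeholder.
CREATE TABLE iceberg_tbl (
    id INT NOT NULL,
    category INT NOT NULL,
    ts DATETIME NOT NULL
) ENGINE = iceberg
PARTITION BY LIST (category, day(ts), bucket(16, id)) ()
PROPERTIES ("a" = "b");

-- Internal OLAP table: AUTO PARTITION now requires parentheses around the
-- partition function call (old form: AUTO PARTITION BY RANGE date_trunc(...)).
CREATE TABLE auto_part_tbl (
    `TIME_STAMP` DATETIME NOT NULL
)
AUTO PARTITION BY RANGE (date_trunc(`TIME_STAMP`, 'month')) ()
DISTRIBUTED BY HASH(`TIME_STAMP`) BUCKETS 2
PROPERTIES ("replication_num" = "1");
```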