This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ddb6eb5ad7d [feature](Nereids) add command for updating mv with partitions (#28060) ddb6eb5ad7d is described below commit ddb6eb5ad7df74d3037d7514d5cf8b8f663b3e3b Author: 谢健 <jianx...@gmail.com> AuthorDate: Wed Dec 6 17:45:09 2023 +0800 [feature](Nereids) add command for updating mv with partitions (#28060) --- .../java/org/apache/doris/catalog/TableIf.java | 7 ++ .../plans/commands/UpdateMvByPartitionCommand.java | 138 +++++++++++++++++++++ 2 files changed, 145 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index e21a4ee289f..eb472d8884f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.constraint.Constraint; import org.apache.doris.catalog.constraint.ForeignKeyConstraint; import org.apache.doris.catalog.constraint.PrimaryKeyConstraint; import org.apache.doris.catalog.constraint.UniqueConstraint; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -375,6 +376,12 @@ public interface TableIf { return null; } + /** Returns the qualified name parts of this table as an immutable three-element list, in order: the name of the catalog that owns the table's database, the database name obtained by passing the database's full name through ClusterNamespace.getNameFromFullName, and the table's own name. NOTE(review): getDatabase() is dereferenced twice without a null check - confirm it can never return null for callers of this method. */ default List<String> getFullQualifiers() { + return ImmutableList.of(getDatabase().getCatalog().getName(), + ClusterNamespace.getNameFromFullName(getDatabase().getFullName()), + getName()); + } + default boolean isManagedTable() { return getType() == TableType.OLAP || getType() == TableType.MATERIALIZED_VIEW; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java new file mode 100644 index 00000000000..b221e507b82 
--- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.LessThan; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import 
org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; +import org.apache.doris.nereids.util.ExpressionUtils; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Range; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Update mv by partition + */ +public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { + private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) { + super(logicalQuery, Optional.empty()); + } + + /** + * Construct command + * + * @param mv materialize view + * @param partitions update partitions in mv and tables + * @param tableWithPartKey the partitions key for different table + * @return command + */ + public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem> partitions, + Map<OlapTable, String> tableWithPartKey) { + NereidsParser parser = new NereidsParser(); + Map<OlapTable, Set<Expression>> predicates = + constructTableWithPredicates(partitions, tableWithPartKey); + List<String> parts = constructPartsForMv(mv, partitions); + Plan plan = parser.parseSingle(mv.getQuerySql()); + plan = plan.accept(new PredicateAdder(), predicates); + UnboundTableSink<? 
extends Plan> sink = + new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(), + parts, plan); + return new UpdateMvByPartitionCommand(sink); + } + + private static List<String> constructPartsForMv(MTMV mv, Set<PartitionItem> partitions) { + return mv.getPartitionNames().stream() + .filter(name -> { + PartitionItem mvPartItem = mv.getPartitionInfo().getItem(mv.getPartition(name).getId()); + return partitions.stream().anyMatch(p -> p.getIntersect(mvPartItem) != null); + }) + .collect(ImmutableList.toImmutableList()); + } + + private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(Set<PartitionItem> partitions, + Map<OlapTable, String> tableWithPartKey) { + ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new ImmutableMap.Builder<>(); + tableWithPartKey.forEach((table, colName) -> + builder.put(table, constructPredicates(partitions, colName)) + ); + return builder.build(); + } + + private static Set<Expression> constructPredicates(Set<PartitionItem> partitions, String colName) { + UnboundSlot slot = new UnboundSlot(colName); + return partitions.stream() + .map(item -> convertPartitionItemToPredicate(item, slot)) + .collect(ImmutableSet.toImmutableSet()); + } + + private static Expression convertPartitionItemToPredicate(PartitionItem item, Slot col) { + if (item instanceof ListPartitionItem) { + List<Expression> inValues = ((ListPartitionItem) item).getItems().stream() + .map(key -> Literal.fromLegacyLiteral(key.getKeys().get(0), + Type.fromPrimitiveType(key.getTypes().get(0)))) + .collect(ImmutableList.toImmutableList()); + return new InPredicate(col, inValues); + } else { + Range<PartitionKey> range = item.getItems(); + List<Expression> exprs = new ArrayList<>(); + if (range.hasLowerBound()) { + PartitionKey key = range.lowerEndpoint(); + exprs.add(new GreaterThanEqual(col, Literal.fromLegacyLiteral(key.getKeys().get(0), + Type.fromPrimitiveType(key.getTypes().get(0))))); + } + if (range.hasUpperBound()) { 
+ PartitionKey key = range.upperEndpoint(); + exprs.add(new LessThan(col, Literal.fromLegacyLiteral(key.getKeys().get(0), + Type.fromPrimitiveType(key.getTypes().get(0))))); + } + return ExpressionUtils.and(exprs); + } + } + + static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable, Set<Expression>>> { + @Override + public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable, Set<Expression>> predicates) { + return new LogicalFilter<>(predicates.get(scan.getTable()), scan); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org