This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb6eb5ad7d [feature](Nereids) add command for updating mv with partitions (#28060)
ddb6eb5ad7d is described below

commit ddb6eb5ad7df74d3037d7514d5cf8b8f663b3e3b
Author: 谢健 <jianx...@gmail.com>
AuthorDate: Wed Dec 6 17:45:09 2023 +0800

    [feature](Nereids) add command for updating mv with partitions (#28060)
---
 .../java/org/apache/doris/catalog/TableIf.java     |   7 ++
 .../plans/commands/UpdateMvByPartitionCommand.java | 138 +++++++++++++++++++++
 2 files changed, 145 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index e21a4ee289f..eb472d8884f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.constraint.Constraint;
 import org.apache.doris.catalog.constraint.ForeignKeyConstraint;
 import org.apache.doris.catalog.constraint.PrimaryKeyConstraint;
 import org.apache.doris.catalog.constraint.UniqueConstraint;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -375,6 +376,12 @@ public interface TableIf {
         return null;
     }
 
+    /**
+     * Returns the names that fully qualify this table, in order: the name of the
+     * catalog owning this table's database, the database name as produced by
+     * {@code ClusterNamespace.getNameFromFullName} from the database's full name,
+     * and this table's own name.
+     *
+     * @return an immutable list of the three qualifier names
+     */
+    default List<String> getFullQualifiers() {
+        String catalogName = getDatabase().getCatalog().getName();
+        String dbName = ClusterNamespace.getNameFromFullName(getDatabase().getFullName());
+        return ImmutableList.of(catalogName, dbName, getName());
+    }
+
     /**
      * Tells whether this table's type is {@code OLAP} or {@code MATERIALIZED_VIEW}.
      *
      * @return true when {@code getType()} is one of those two table types
      */
     default boolean isManagedTable() {
         return getType() == TableType.OLAP || getType() == TableType.MATERIALIZED_VIEW;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
new file mode 100644
index 00000000000..b221e507b82
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
+import org.apache.doris.nereids.trees.expressions.InPredicate;
+import org.apache.doris.nereids.trees.expressions.LessThan;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Update mv by partition.
+ *
+ * <p>An insert-overwrite command whose query is the materialized view's defining SQL,
+ * restricted by predicates derived from the partitions to refresh, and whose sink
+ * targets only the mv partitions that intersect those partitions.
+ */
+public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
+    private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) {
+        super(logicalQuery, Optional.empty());
+    }
+
+    /**
+     * Construct command
+     *
+     * @param mv materialize view
+     * @param partitions update partitions in mv and tables
+     * @param tableWithPartKey the partitions key for different table
+     * @return command
+     */
+    public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem> partitions,
+            Map<OlapTable, String> tableWithPartKey) {
+        NereidsParser parser = new NereidsParser();
+        Map<OlapTable, Set<Expression>> predicates =
+                constructTableWithPredicates(partitions, tableWithPartKey);
+        List<String> parts = constructPartsForMv(mv, partitions);
+        Plan plan = parser.parseSingle(mv.getQuerySql());
+        // NOTE(review): the plan comes straight from the parser; confirm it can already
+        // contain LogicalOlapScan nodes here, otherwise PredicateAdder never fires.
+        plan = plan.accept(new PredicateAdder(), predicates);
+        UnboundTableSink<? extends Plan> sink =
+                new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(),
+                        parts, plan);
+        return new UpdateMvByPartitionCommand(sink);
+    }
+
+    /**
+     * Collects the names of the mv partitions whose partition item intersects at least
+     * one of the given partitions.
+     */
+    private static List<String> constructPartsForMv(MTMV mv, Set<PartitionItem> partitions) {
+        return mv.getPartitionNames().stream()
+                .filter(name -> {
+                    PartitionItem mvPartItem = mv.getPartitionInfo().getItem(mv.getPartition(name).getId());
+                    return partitions.stream().anyMatch(p -> p.getIntersect(mvPartItem) != null);
+                })
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    /**
+     * Builds, for every table in {@code tableWithPartKey}, the filter conjuncts that
+     * restrict the table's partition column to the given partitions.
+     */
+    private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(Set<PartitionItem> partitions,
+            Map<OlapTable, String> tableWithPartKey) {
+        ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new ImmutableMap.Builder<>();
+        tableWithPartKey.forEach((table, colName) ->
+                builder.put(table, constructPredicates(partitions, colName))
+        );
+        return builder.build();
+    }
+
+    /**
+     * Builds the filter conjuncts selecting rows of column {@code colName} that fall in
+     * any of the given partitions.
+     *
+     * <p>The returned set is used as the conjunct set of a filter, i.e. its elements are
+     * AND-ed. A row belongs to the refreshed data when it matches ANY partition, so the
+     * per-partition predicates are OR-ed into a single conjunct instead of being returned
+     * as separate (AND-ed) elements.
+     */
+    private static Set<Expression> constructPredicates(Set<PartitionItem> partitions, String colName) {
+        UnboundSlot slot = new UnboundSlot(colName);
+        Set<Expression> perPartition = partitions.stream()
+                .map(item -> convertPartitionItemToPredicate(item, slot))
+                .collect(ImmutableSet.toImmutableSet());
+        return ImmutableSet.of(ExpressionUtils.or(perPartition));
+    }
+
+    /**
+     * Converts one partition item into a predicate on {@code col}: an IN predicate over
+     * the partition values for a list partition, otherwise the conjunction of
+     * {@code col >= lower} and {@code col < upper} for whichever bounds the range has.
+     *
+     * <p>NOTE(review): only the first column of each partition key is used; presumably
+     * multi-column partition keys are not supported here - confirm with callers.
+     */
+    private static Expression convertPartitionItemToPredicate(PartitionItem item, Slot col) {
+        if (item instanceof ListPartitionItem) {
+            List<Expression> inValues = ((ListPartitionItem) item).getItems().stream()
+                    .map(UpdateMvByPartitionCommand::firstKeyLiteral)
+                    .collect(ImmutableList.toImmutableList());
+            return new InPredicate(col, inValues);
+        } else {
+            Range<PartitionKey> range = item.getItems();
+            List<Expression> exprs = new ArrayList<>();
+            if (range.hasLowerBound()) {
+                exprs.add(new GreaterThanEqual(col, firstKeyLiteral(range.lowerEndpoint())));
+            }
+            if (range.hasUpperBound()) {
+                exprs.add(new LessThan(col, firstKeyLiteral(range.upperEndpoint())));
+            }
+            return ExpressionUtils.and(exprs);
+        }
+    }
+
+    /** Converts the first column value of a partition key into a Nereids literal. */
+    private static Expression firstKeyLiteral(PartitionKey key) {
+        return Literal.fromLegacyLiteral(key.getKeys().get(0),
+                Type.fromPrimitiveType(key.getTypes().get(0)));
+    }
+
+    /**
+     * Plan rewriter that wraps each olap scan in a filter built from the conjuncts
+     * registered for the scanned table.
+     */
+    static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable, Set<Expression>>> {
+        @Override
+        public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable, Set<Expression>> predicates) {
+            Set<Expression> conjuncts = predicates.get(scan.getTable());
+            if (conjuncts == null) {
+                // No partition column registered for this table: leave the scan unfiltered
+                // rather than building a filter from a null conjunct set.
+                return scan;
+            }
+            return new LogicalFilter<>(conjuncts, scan);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to