This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4b3cbf0ec [feat](mtmv)external table support partition rewrite 
(#44998)
6b4b3cbf0ec is described below

commit 6b4b3cbf0ecc818909013db65030d7c645d719c7
Author: zhangdong <zhangd...@selectdb.com>
AuthorDate: Thu Dec 5 14:15:59 2024 +0800

    [feat](mtmv)external table support partition rewrite (#44998)
    
    ### What problem does this PR solve?
    
    Previously, transparent rewriting of a query on an external table was
    all-or-nothing: the query was either rewritten as a whole or not at all.
    
    This PR adds partial partition rewriting: the query can be rewritten for
    some partitions while the other partitions are read directly from the
    base table.
    
    ### Release note
    
    MTMV partition rewrite now supports external tables.
---
 .../org/apache/doris/datasource/ExternalTable.java |  2 +-
 .../doris/datasource/hive/HMSExternalTable.java    |  2 +-
 .../maxcompute/MaxComputeExternalTable.java        |  2 +-
 .../mv/AbstractMaterializedViewRule.java           |  7 +-
 .../nereids/rules/exploration/mv/StructInfo.java   | 18 +++--
 .../plans/commands/UpdateMvByPartitionCommand.java | 16 ++--
 .../data/mtmv_p0/test_hive_rewrite_mtmv.out        | 31 ++++++++
 .../suites/mtmv_p0/test_hive_rewrite_mtmv.groovy   | 89 ++++++++++++++++++++++
 8 files changed, 148 insertions(+), 19 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index bd1e36e7bc9..5451a219edf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -399,7 +399,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
      * @param snapshot if not support mvcc, ignore this
      * @return partitionName ==> PartitionItem
      */
-    protected Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         return Collections.emptyMap();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 134ad362fa1..2115f47d777 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -306,7 +306,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
-    protected Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         return getNameToPartitionItems();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
index 0f748f59e92..dbbbcf2d6a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
@@ -92,7 +92,7 @@ public class MaxComputeExternalTable extends ExternalTable {
     }
 
     @Override
-    protected Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         if (getPartitionColumns().isEmpty()) {
             return Collections.emptyMap();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 8e9ef1eaa97..e6f384502d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -461,17 +461,14 @@ public abstract class AbstractMaterializedViewRule 
implements ExplorationRuleFac
             return Pair.of(ImmutableMap.of(), ImmutableMap.of());
         }
         // Collect the mv related base table partitions which query used
-        Map<BaseTableInfo, Set<Partition>> queryUsedBaseTablePartitions = new 
LinkedHashMap<>();
+        Map<BaseTableInfo, Set<String>> queryUsedBaseTablePartitions = new 
LinkedHashMap<>();
         queryUsedBaseTablePartitions.put(relatedPartitionTable, new 
HashSet<>());
         queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(), 
queryUsedBaseTablePartitions);
         // Bail out, not check invalid partition if not olap scan, support 
later
         if (queryUsedBaseTablePartitions.isEmpty()) {
             return Pair.of(ImmutableMap.of(), ImmutableMap.of());
         }
-        Set<String> queryUsedBaseTablePartitionNameSet = 
queryUsedBaseTablePartitions.get(relatedPartitionTable)
-                .stream()
-                .map(Partition::getName)
-                .collect(Collectors.toSet());
+        Set<String> queryUsedBaseTablePartitionNameSet = 
queryUsedBaseTablePartitions.get(relatedPartitionTable);
 
         Collection<Partition> mvValidPartitions = 
MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
                 cascadesContext.getConnectContext(), 
System.currentTimeMillis(), false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
index 5a84ab787d7..3de48dc7ff6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
@@ -17,9 +17,9 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.jobs.executor.Rewriter;
@@ -51,6 +51,8 @@ import 
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.
 import 
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -731,22 +733,28 @@ public class StructInfo {
      * Collect partitions on base table
      */
     public static class QueryScanPartitionsCollector extends 
DefaultPlanVisitor<Plan,
-            Map<BaseTableInfo, Set<Partition>>> {
+            Map<BaseTableInfo, Set<String>>> {
         @Override
         public Plan visitLogicalCatalogRelation(LogicalCatalogRelation 
catalogRelation,
-                Map<BaseTableInfo, Set<Partition>> targetTablePartitionMap) {
+                Map<BaseTableInfo, Set<String>> targetTablePartitionMap) {
             TableIf table = catalogRelation.getTable();
             BaseTableInfo relatedPartitionTable = new BaseTableInfo(table);
             if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) {
                 return catalogRelation;
             }
+            Set<String> tablePartitions = 
targetTablePartitionMap.get(relatedPartitionTable);
             if (catalogRelation instanceof LogicalOlapScan) {
                 // Handle olap table
                 LogicalOlapScan logicalOlapScan = (LogicalOlapScan) 
catalogRelation;
-                Set<Partition> tablePartitions = 
targetTablePartitionMap.get(relatedPartitionTable);
                 for (Long partitionId : 
logicalOlapScan.getSelectedPartitionIds()) {
-                    
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId));
+                    
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId).getName());
                 }
+            } else if (catalogRelation instanceof LogicalFileScan
+                    && catalogRelation.getTable() instanceof ExternalTable
+                    && ((ExternalTable) 
catalogRelation.getTable()).supportInternalPartitionPruned()) {
+                LogicalFileScan logicalFileScan = (LogicalFileScan) 
catalogRelation;
+                SelectedPartitions selectedPartitions = 
logicalFileScan.getSelectedPartitions();
+                
tablePartitions.addAll(selectedPartitions.selectedPartitions.keySet());
             } else {
                 // todo Support other type partition table
                 // Not support to partition check now when query external 
catalog table, support later.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index 36cc0f95a77..869c2d0b38b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
@@ -306,16 +307,19 @@ public class UpdateMvByPartitionCommand extends 
InsertOverwriteTableCommand {
                     MTMVRelatedTableIf targetTable = (MTMVRelatedTableIf) 
table;
                     for (String partitionName : filterTableEntry.getValue()) {
                         Partition partition = 
targetTable.getPartition(partitionName);
-                        if (!(targetTable instanceof OlapTable)) {
-                            // check partition is have data or not, only 
support olap table
-                            break;
-                        }
-                        if (!((OlapTable) 
targetTable).selectNonEmptyPartitionIds(
+                        if (targetTable instanceof OlapTable && !((OlapTable) 
targetTable).selectNonEmptyPartitionIds(
                                 
Lists.newArrayList(partition.getId())).isEmpty()) {
-                            // Add filter only when partition has data
+                            // Add filter only when partition has data when 
olap table
                             partitionHasDataItems.add(
                                     ((OlapTable) 
targetTable).getPartitionInfo().getItem(partition.getId()));
                         }
+                        if (targetTable instanceof ExternalTable) {
+                            // Add filter only when partition has data when 
external table
+                            // TODO: 2024/12/4 real snapshot
+                            partitionHasDataItems.add(
+                                    ((ExternalTable) 
targetTable).getNameToPartitionItems(Optional.empty())
+                                            .get(partitionName));
+                        }
                     }
                     if (partitionHasDataItems.isEmpty()) {
                         predicates.setNeedAddFilter(false);
diff --git a/regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out 
b/regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out
new file mode 100644
index 00000000000..452cff71e53
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !refresh_one_partition --
+20230101       3
+
+-- !refresh_one_partition_rewrite --
+20230101       3
+20230102       3
+
+-- !refresh_complete --
+20230101       3
+20230102       3
+
+-- !refresh_all_partition_rewrite --
+20230101       3
+20230102       3
+
+-- !refresh_one_partition --
+20230101       3
+
+-- !refresh_one_partition_rewrite --
+20230101       3
+20230102       3
+
+-- !refresh_complete --
+20230101       3
+20230102       3
+
+-- !refresh_all_partition_rewrite --
+20230101       3
+20230102       3
+
diff --git a/regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy
new file mode 100644
index 00000000000..f10d6bd65b4
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_rewrite_mtmv", 
"p0,external,hive,external_docker,external_docker_hive") {
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("diable Hive test.")
+        return;
+    }
+    String suiteName = "test_hive_rewrite_mtmv"
+    String catalogName = "${suiteName}_catalog"
+    String mvName = "${suiteName}_mv"
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+    String mvSql = "SELECT part_col,count(*) as num FROM 
${catalogName}.`default`.mtmv_base1 group by part_col;";
+    for (String hivePrefix : ["hive2", "hive3"]) {
+        String hms_port = context.config.otherConfigs.get(hivePrefix + 
"HmsPort")
+        sql """drop catalog if exists ${catalogName}"""
+        sql """create catalog if not exists ${catalogName} properties (
+            "type"="hms",
+            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+        );"""
+        sql """analyze table ${catalogName}.`default`.mtmv_base1 with sync"""
+        sql """alter table ${catalogName}.`default`.mtmv_base1 modify column 
part_col set stats ('row_count'='6');"""
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """
+            CREATE MATERIALIZED VIEW ${mvName}
+                BUILD DEFERRED REFRESH AUTO ON MANUAL
+                partition by(`part_col`)
+                DISTRIBUTED BY RANDOM BUCKETS 2
+                PROPERTIES ('replication_num' = '1')
+                AS
+                ${mvSql}
+            """
+        def showPartitionsResult = sql """show partitions from ${mvName}"""
+        logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+        assertTrue(showPartitionsResult.toString().contains("p_20230101"))
+        assertTrue(showPartitionsResult.toString().contains("p_20230102"))
+
+        // refresh one partitions
+        sql """
+                REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20230101);
+            """
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_one_partition "SELECT * FROM ${mvName}"
+
+        def explainOnePartition = sql """ explain  ${mvSql} """
+        logger.info("explainOnePartition: " + explainOnePartition.toString())
+        assertTrue(explainOnePartition.toString().contains("VUNION"))
+        assertTrue(explainOnePartition.toString().contains("part_col[#4] = 
20230102"))
+        order_qt_refresh_one_partition_rewrite "${mvSql}"
+
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        //refresh complete
+        sql """
+                REFRESH MATERIALIZED VIEW ${mvName} complete
+            """
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_complete "SELECT * FROM ${mvName}"
+
+        def explainAllPartition = sql """ explain  ${mvSql}; """
+        logger.info("explainAllPartition: " + explainAllPartition.toString())
+        assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+        assertTrue(explainAllPartition.toString().contains("partitions=2/2"))
+        order_qt_refresh_all_partition_rewrite "${mvSql}"
+
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """drop catalog if exists ${catalogName}"""
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to