This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a128c14d396 [fix](mtmv) Fix hudi materialized view union all rewritten
plan execute fail because of invalid slot (#58643)
a128c14d396 is described below
commit a128c14d396d10ac0e85624cb53f7c8cb28cbc85
Author: seawinde <[email protected]>
AuthorDate: Thu Dec 4 14:20:09 2025 +0800
[fix](mtmv) Fix hudi materialized view union all rewritten plan execute
fail because of invalid slot (#58643)
### What problem does this PR solve?
Related PR: #57558 #58413
Problem Summary:
This fix addresses the following three issues:
1. When invoking the method
org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan#withRelationId,
the output needs to be recalculated to meet expectations.
2. After compensating with a union all due to partial partition
invalidation of a materialized view, during the next round of
transparent rewriting, the rewriting for the child of the union
all should use the query partitions corresponding to the specific relation
id to prevent infinite loops.
3. Currently, in the `test_hudi_rewrite_mtmv` test, if the plan
rewritten by the materialized view transparent rewriting is not selected
by the CBO, it is difficult to troubleshoot because `explain memo plan` is
not used. Therefore, the corresponding test method is modified.
---
.../mv/AbstractMaterializedViewRule.java | 2 +-
.../rules/exploration/mv/PartitionCompensator.java | 41 ++++-
.../trees/plans/logical/LogicalHudiScan.java | 2 +-
.../exploration/mv/PartitionCompensatorTest.java | 199 +++++++++++++++++++++
.../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy | 8 +-
5 files changed, 240 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 69232d6e261..036ffcd2251 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -316,7 +316,7 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
continue;
}
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo,
Set<String>>> invalidPartitions;
- if (PartitionCompensator.needUnionRewrite(materializationContext)
+ if (PartitionCompensator.needUnionRewrite(materializationContext,
cascadesContext.getStatementContext())
&& sessionVariable.isEnableMaterializedViewUnionRewrite())
{
MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
Map<List<String>, Set<String>> queryUsedPartitions =
PartitionCompensator.getQueryUsedPartitions(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 22c3540b5cb..3fe966864d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
@@ -223,14 +225,38 @@ public class PartitionCompensator {
/**
* Check if need union compensate or not
+ * If query base table all partitions with ALL_PARTITIONS or
ALL_PARTITIONS_LIST, should not do union compensate
+ * because it means query all partitions from base table and prune
partition failed
*/
- public static boolean needUnionRewrite(MaterializationContext
materializationContext) {
+ public static boolean needUnionRewrite(MaterializationContext
materializationContext,
+ StatementContext statementContext)
throws AnalysisException {
if (!(materializationContext instanceof AsyncMaterializationContext)) {
return false;
}
MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
PartitionType type = mtmv.getPartitionInfo().getType();
- List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
+ MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
+ List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
+ Set<MTMVRelatedTableIf> pctTables = mvPartitionInfo.getPctTables();
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap =
+ statementContext.getTableUsedPartitionNameMap();
+ for (MTMVRelatedTableIf pctTable : pctTables) {
+ if (pctTable instanceof ExternalTable && !((ExternalTable)
pctTable).supportInternalPartitionPruned()) {
+ // if pct table is external table and not support internal
partition pruned,
+ // we consider query all partitions from pct table, this would
cause loop union compensate,
+ // so we skip union compensate in this case
+ return false;
+ }
+ Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
+ =
tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
+ if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
+ ||
tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) {
+ // If query base table all partitions with ALL_PARTITIONS or
ALL_PARTITIONS_LIST,
+ // should not do union compensate, because it means query all
partitions from base table
+ // and prune partition failed
+ return false;
+ }
+ }
return !PartitionType.UNPARTITIONED.equals(type) &&
!pctInfos.isEmpty();
}
@@ -238,11 +264,11 @@ public class PartitionCompensator {
* Get query used partitions
* this is calculated from tableUsedPartitionNameMap and tables in
statementContext
*
- * @param customRelationIdSet if union compensate occurs, the new query
used partitions is changed,
+ * @param currentUsedRelationIdSet if union compensate occurs, the new
query used partitions is changed,
* so need to get used partitions by relation id set
*/
public static Map<List<String>, Set<String>>
getQueryUsedPartitions(StatementContext statementContext,
- BitSet customRelationIdSet) {
+ BitSet currentUsedRelationIdSet) {
// get table used partitions
// if table is not in statementContext().getTables() which means the
table is partition prune as empty relation
Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = statementContext
@@ -267,6 +293,13 @@ public class PartitionCompensator {
queryUsedRelatedTablePartitionsMap.put(queryUsedTable,
null);
continue tableLoop;
}
+ // If currentUsedRelationIdSet is not empty, need check
relation id to get concrete used partitions
+ BitSet usedPartitionRelation = new BitSet();
+
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
+ if (!currentUsedRelationIdSet.isEmpty()
+ &&
!currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
+ continue;
+ }
usedPartitionSet.addAll(tableUsedPartitionPair.value());
}
queryUsedRelatedTablePartitionsMap.put(queryUsedTable,
usedPartitionSet);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 0e10bb79920..ca380dbdddd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -183,7 +183,7 @@ public class LogicalHudiScan extends LogicalFileScan {
public LogicalHudiScan withRelationId(RelationId relationId) {
return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier,
selectedPartitions, tableSample, tableSnapshot, scanParams,
incrementalRelation,
- operativeSlots, virtualColumns, groupExpression,
Optional.of(getLogicalProperties()),
+ operativeSlots, virtualColumns, groupExpression,
Optional.empty(),
tableAlias, cachedOutputs);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 672628e7d1e..17d75f93fcf 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -17,21 +17,39 @@
package org.apache.doris.nereids.rules.exploration.mv;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.mtmv.BaseColInfo;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class PartitionCompensatorTest extends TestWithFeService {
@@ -191,4 +209,185 @@ public class PartitionCompensatorTest extends
TestWithFeService {
Assertions.assertEquals(orderTableUsedPartition,
ImmutableSet.of("p1", "p2", "p3", "p4"));
});
}
+
+ @Test
+ public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws
Exception {
+ MaterializationContext ctx1 = mockCtx(
+ PartitionType.UNPARTITIONED,
+ ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+ ImmutableSet.of(),
+ false);
+ StatementContext sc1 = Mockito.mock(StatementContext.class);
+
Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1,
sc1));
+
+ MaterializationContext ctx2 = mockCtx(
+ PartitionType.RANGE,
+ Collections.emptyList(),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ false);
+ StatementContext sc2 = Mockito.mock(StatementContext.class);
+
Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2,
sc2));
+ }
+
+ @Test
+ public void testNeedUnionRewriteEmptyPctTables() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.RANGE,
+ ImmutableList.of(),
+ Collections.emptySet(),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNeedUnionRewriteExternalNoPrune() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ true);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNeedUnionRewritePositive() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
+ BaseTableInfo tableInfo = newBaseTableInfo();
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ ImmutableList.of(new BaseColInfo("c", tableInfo)),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
+ ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t =
ArrayListMultimap.create();
+ t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
+ Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testGetQueryUsedPartitionsAllAndPartial() {
+ // Prepare qualifiers
+ List<String> lineitemQualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"lineitem_list_partition");
+ List<String> ordersQualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"orders_list_partition");
+
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+ tableUsedPartitionNameMap.clear();
+
+ tableUsedPartitionNameMap.put(lineitemQualifier,
PartitionCompensator.ALL_PARTITIONS);
+
+ RelationId ridA = new RelationId(1);
+ RelationId ridB = new RelationId(2);
+ tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA,
ImmutableSet.of("p1", "p2")));
+ tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB,
ImmutableSet.of("p3")));
+
+ Map<List<String>, Set<String>> result =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
+ Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"),
result.get(ordersQualifier));
+
+ BitSet filterRidA = new BitSet();
+ filterRidA.set(ridA.asInt());
+ Map<List<String>, Set<String>> resultRidA =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), filterRidA);
+ Assertions.assertNull(resultRidA.get(lineitemQualifier));
+ Assertions.assertEquals(ImmutableSet.of("p1", "p2"),
resultRidA.get(ordersQualifier));
+
+ BitSet filterRidB = new BitSet();
+ filterRidB.set(ridB.asInt());
+ Map<List<String>, Set<String>> resultRidB =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), filterRidB);
+ Assertions.assertNull(resultRidB.get(lineitemQualifier));
+ Assertions.assertEquals(ImmutableSet.of("p3"),
resultRidB.get(ordersQualifier));
+
+ tableUsedPartitionNameMap.put(ordersQualifier,
PartitionCompensator.ALL_PARTITIONS);
+ Map<List<String>, Set<String>> resultAllOrders =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertNull(resultAllOrders.get(ordersQualifier));
+ }
+
+ @Test
+ public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
+ List<String> qualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"lineitem_list_partition");
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+ tableUsedPartitionNameMap.clear();
+ // Put an empty set via a distinct relation id to simulate no
partitions used
+ RelationId rid = new RelationId(3);
+ tableUsedPartitionNameMap.put(qualifier, Pair.of(rid,
ImmutableSet.of()));
+
+ Map<List<String>, Set<String>> result =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
+ }
+
+ private static MaterializationContext mockCtx(
+ PartitionType type,
+ List<BaseColInfo> pctInfos,
+ Set<MTMVRelatedTableIf> pctTables,
+ boolean externalNoPrune) throws AnalysisException {
+
+ MTMV mtmv = Mockito.mock(MTMV.class);
+ PartitionInfo pi = Mockito.mock(PartitionInfo.class);
+ Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
+ Mockito.when(pi.getType()).thenReturn(type);
+
+ MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
+ Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
+ Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos);
+ Mockito.when(mpi.getPctTables()).thenReturn(pctTables);
+
+ if (externalNoPrune) {
+ HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
+
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
+ Set<TableIf> tbls = new HashSet<>(pctTables);
+ tbls.add(ext);
+ Mockito.when(mpi.getPctTables()).thenReturn(
+
tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet()));
+ }
+
+ AsyncMaterializationContext ctx =
Mockito.mock(AsyncMaterializationContext.class);
+ Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
+ return ctx;
+ }
+
+ private static BaseTableInfo newBaseTableInfo() {
+ CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
+ Mockito.when(catalog.getId()).thenReturn(1L);
+ Mockito.when(catalog.getName()).thenReturn("internal");
+
+ DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
+ Mockito.when(db.getId()).thenReturn(2L);
+ Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
+ Mockito.when(db.getCatalog()).thenReturn(catalog);
+
+ TableIf table = Mockito.mock(TableIf.class);
+ Mockito.when(table.getId()).thenReturn(3L);
+ Mockito.when(table.getName()).thenReturn("t");
+ Mockito.when(table.getDatabase()).thenReturn(db);
+
+ return new BaseTableInfo(table);
+ }
}
diff --git
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
index 70a11d3633a..680f7eaa93d 100644
---
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
+++
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv",
"p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
- def explainOnePartition = sql """ explain ${mvSql} """
- logger.info("explainOnePartition: " + explainOnePartition.toString())
- assertTrue(explainOnePartition.toString().contains("VUNION"))
+ mv_rewrite_success(mvSql, mvName)
order_qt_refresh_one_partition_rewrite "${mvSql}"
mv_rewrite_success("${mvSql}", "${mvName}")
@@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv",
"p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_auto "SELECT * FROM ${mvName} "
- def explainAllPartition = sql """ explain ${mvSql}; """
- logger.info("explainAllPartition: " + explainAllPartition.toString())
- assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+ mv_rewrite_success(mvSql, mvName)
order_qt_refresh_all_partition_rewrite "${mvSql}"
mv_rewrite_success("${mvSql}", "${mvName}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]