This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5275202c176 [enhance](mtmv) mtmv disable hive auto refresh (#30775) 5275202c176 is described below commit 5275202c1760e6587dde5ff27e0faf24123388f9 Author: zhangdong <493738...@qq.com> AuthorDate: Mon Feb 5 11:40:10 2024 +0800 [enhance](mtmv) mtmv disable hive auto refresh (#30775) - If the `related table` is `hive`, do not refresh automatically - If the `related table` is `hive`, the partition col is allowed to be `null`. Otherwise, it must be `not null` - add more `ut` --- .../java/org/apache/doris/catalog/OlapTable.java | 9 + .../doris/catalog/external/HMSExternalTable.java | 10 + .../doris/common/proc/PartitionsProcDir.java | 4 +- .../apache/doris/common/proc/TablesProcDir.java | 3 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 36 +- .../mtmv/{MTMVUtil.java => MTMVPartitionUtil.java} | 105 +---- .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 15 + .../org/apache/doris/mtmv/MTMVRelationManager.java | 3 +- .../org/apache/doris/mtmv/MTMVRewriteUtil.java | 87 ++++ .../java/org/apache/doris/mtmv/MTMVService.java | 4 +- .../main/java/org/apache/doris/mtmv/MTMVUtil.java | 452 +-------------------- .../mv/AbstractMaterializedViewRule.java | 6 +- .../exploration/mv/MaterializedViewUtils.java | 2 +- .../trees/plans/commands/info/RefreshMTMVInfo.java | 4 +- .../doris/tablefunction/MetadataGenerator.java | 4 +- .../apache/doris/mtmv/MTMVPartitionUtilTest.java | 187 +++++++++ .../apache/doris/mtmv/MTMVRefreshSnapshotTest.java | 96 +++++ .../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 254 ++++++++++++ .../java/org/apache/doris/mtmv/MTMVTaskTest.java | 163 ++++++++ .../exploration/mv/MaterializedViewUtilsTest.java | 2 - regression-test/data/mtmv_p0/test_hive_mtmv.out | 5 + .../suites/mtmv_p0/test_hive_mtmv.groovy | 8 + 22 files changed, 892 insertions(+), 567 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2e01f37639b..0c0ce320f11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2632,4 +2632,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return getPartitionOrAnalysisException(partitionId).getName(); } + @Override + public boolean needAutoRefresh() { + return true; + } + + @Override + public boolean isPartitionColumnAllowNull() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index aa4258baadb..c31ba11a0fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -885,6 +885,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI ((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive()); return partitionValuesList; } + + @Override + public boolean needAutoRefresh() { + return false; + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 01feaf23683..4703429fa18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -45,7 +45,7 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVPartitionUtil; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -309,7 +309,7 @@ public class PartitionsProcDir implements ProcDirInterface { partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId)); if (olapTable instanceof MTMV) { try { - List<String> partitionUnSyncTables = MTMVUtil + List<String> partitionUnSyncTables = MTMVPartitionUtil .getPartitionUnSyncTables((MTMV) olapTable, partitionId); partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables)); partitionInfo.add(partitionUnSyncTables.toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java index b3ce9be35c9..c2926c829ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java @@ -23,7 +23,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.ListComparator; @@ -91,7 +90,7 @@ public class TablesProcDir implements ProcDirInterface { String partitionKey = FeConstants.null_string; table.readLock(); try { - if (table.getType() == TableType.OLAP) { + if (table instanceof OlapTable) { OlapTable olapTable = (OlapTable) table; if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE) { partitionNum = olapTable.getPartitions().size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 194172a6732..6a861200e69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -18,17 +18,14 @@ package org.apache.doris.job.extensions.mtmv; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; @@ -36,6 +33,7 @@ import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; @@ -145,6 +143,13 @@ public class MTMVTask extends AbstractTask { this.taskContext = Objects.requireNonNull(taskContext); } + // only for test + public MTMVTask(MTMV mtmv, MTMVRelation relation, MTMVTaskContext taskContext) { + this.mtmv = mtmv; + this.relation = relation; + this.taskContext = taskContext; + } + @Override public void run() throws JobException { LOG.info("mtmv task run, taskId: {}", super.getTaskId()); @@ -161,10 +166,10 @@ public class MTMVTask extends AbstractTask { // To be completely consistent with hive, you need to manually refresh the cache // refreshHmsTable(); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); + MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); } List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(); - this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); + this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); this.refreshMode = generateRefreshMode(needRefreshPartitionIds); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; @@ -181,8 +186,8 @@ public class MTMVTask extends AbstractTask { Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); // need get names before exec - List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds); - Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVUtil + List<String> execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds); + Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds); exec(ctx, execPartitionIds, tableWithPartKey); completedPartitions.addAll(execPartitionNames); @@ -241,7 +246,7 @@ public class MTMVTask extends AbstractTask { LOG.info("mtmv task before, taskId: {}", super.getTaskId()); super.before(); try { - mtmv = getMTMV(); + mtmv = MTMVUtil.getMTMV(dbId, mtmvId); } catch (UserException e) { LOG.warn("before task failed:", e); throw new JobException(e); @@ -267,11 +272,6 @@ public class MTMVTask extends AbstractTask { } } - private MTMV getMTMV() throws DdlException, MetaNotFoundException { - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); - return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); - } - @Override public void runTask() throws JobException { LOG.info("mtmv task runTask, taskId: {}", super.getTaskId()); @@ -296,7 +296,7 @@ public class MTMVTask extends AbstractTask { String dbName = ""; String mvName = ""; try { - MTMV mtmv = getMTMV(); + MTMV mtmv = MTMVUtil.getMTMV(dbId, mtmvId); dbName = mtmv.getQualifiedDbName(); mvName = mtmv.getName(); } catch (UserException e) { @@ -386,20 +386,20 @@ public class MTMVTask extends AbstractTask { } } - private List<Long> calculateNeedRefreshPartitions() throws AnalysisException { + public List<Long> calculateNeedRefreshPartitions() throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { if (taskContext.isComplete()) { return mtmv.getPartitionIds(); } else if (!CollectionUtils .isEmpty(taskContext.getPartitions())) { - return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); + return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); } } // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables()); + boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables()); if (fresh) { return Lists.newArrayList(); } @@ -413,7 +413,7 @@ public class MTMVTask extends AbstractTask { } // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables()); + return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables()); } public MTMVTaskContext getTaskContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java similarity index 83% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java copy to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 74593e5def5..a8e3e11869d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -29,11 +29,7 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; -import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -48,23 +44,8 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; -public class MTMVUtil { - private static final Logger LOG = LogManager.getLogger(MTMVUtil.class); - - /** - * get Table by BaseTableInfo - * - * @param baseTableInfo - * @return - * @throws AnalysisException - */ - public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException { - TableIf table = Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) - .getDbOrAnalysisException(baseTableInfo.getDbId()) - .getTableOrAnalysisException(baseTableInfo.getTableId()); - return table; - } +public class MTMVPartitionUtil { + private static final Logger LOG = LogManager.getLogger(MTMVPartitionUtil.class); /** * Determine whether the partition is sync with retated partition and other baseTables @@ -76,7 +57,7 @@ public class MTMVUtil { * @return * @throws AnalysisException */ - private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables, + public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables, Set<String> excludedTriggerTables) throws AnalysisException { boolean isSyncWithPartition = true; if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { @@ -192,11 +173,14 @@ public class MTMVUtil { public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { List<String> res = Lists.newArrayList(); for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { - TableIf table = getTable(baseTableInfo); + TableIf table = MTMVUtil.getTable(baseTableInfo); if (!(table instanceof MTMVRelatedTableIf)) { continue; } MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table; + if (!mtmvRelatedTableIf.needAutoRefresh()) { + continue; + } if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); @@ -220,55 +204,6 @@ public class MTMVUtil { return res; } - /** - * Determine which partition of mtmv can be rewritten - * - * @param mtmv - * @param ctx - * @return - */ - public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) { - List<Partition> res = Lists.newArrayList(); - Collection<Partition> allPartitions = mtmv.getPartitions(); - // check session variable if enable rewrite - if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) { - return res; - } - if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable() - .isMaterializedViewRewriteEnableContainExternalTable()) { - return res; - } - - MTMVRelation mtmvRelation = mtmv.getRelation(); - if (mtmvRelation == null) { - return res; - } - // check mv is normal - if (!(mtmv.getStatus().getState() == MTMVState.NORMAL - && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { - return res; - } - // check gracePeriod - long gracePeriodMills = mtmv.getGracePeriod(); - long currentTimeMills = System.currentTimeMillis(); - for (Partition partition : allPartitions) { - if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime() - + gracePeriodMills)) { - res.add(partition); - continue; - } - try { - if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet())) { - res.add(partition); - } - } catch (AnalysisException e) { - // ignore it - LOG.warn("check isMTMVPartitionSync failed", e); - } - } - return res; - } - /** * Get the partitions that need to be refreshed * @@ -303,9 +238,12 @@ public class MTMVUtil { * @return * @throws AnalysisException */ - private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, + public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, MTMVRelatedTableIf relatedTable, Long relatedPartitionId) throws AnalysisException { + if (!relatedTable.needAutoRefresh()) { + return true; + } MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable .getPartitionSnapshot(relatedPartitionId); String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); @@ -320,7 +258,7 @@ public class MTMVUtil { * @param desc * @return */ - private static String generatePartitionName(PartitionKeyDesc desc) { + public static String generatePartitionName(PartitionKeyDesc desc) { String partitionName = "p_"; partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "") .replaceAll("\\(|\\)|\\,|\\[|\\]", "_"); @@ -401,7 +339,7 @@ public class MTMVUtil { for (BaseTableInfo baseTableInfo : tables) { TableIf table = null; try { - table = getTable(baseTableInfo); + table = MTMVUtil.getTable(baseTableInfo); } catch (AnalysisException e) { LOG.warn("get table failed, {}", baseTableInfo, e); return false; @@ -421,7 +359,7 @@ public class MTMVUtil { throws AnalysisException { TableIf table = null; try { - table = getTable(baseTableInfo); + table = MTMVUtil.getTable(baseTableInfo); } catch (AnalysisException e) { LOG.warn("get table failed, {}", baseTableInfo, e); return false; @@ -433,22 +371,15 @@ public class MTMVUtil { return true; } MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table; + if (!baseTable.needAutoRefresh()) { + return true; + } MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); } - private static boolean mtmvContainsExternalTable(MTMV mtmv) { - Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables(); - for (BaseTableInfo baseTableInfo : baseTables) { - if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) { - return true; - } - } - return false; - } - public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv, Set<BaseTableInfo> baseTables, Set<Long> partitionIds) throws AnalysisException { @@ -482,7 +413,7 @@ public class MTMVUtil { .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { continue; } - TableIf table = getTable(baseTableInfo); + TableIf table = MTMVUtil.getTable(baseTableInfo); if (!(table instanceof MTMVRelatedTableIf)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 51773db0df1..46454679b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -87,4 +87,19 @@ public interface MTMVRelatedTableIf extends TableIf { * @throws AnalysisException */ String getPartitionName(long partitionId) throws AnalysisException; + + /** + * Does the current type of table allow timed triggering + * + * @return If return false,The method of comparing whether to synchronize will directly return true, + * otherwise the snapshot information will be compared + */ + boolean needAutoRefresh(); + + /** + * if allow partition column `isAllowNull` + * + * @return + */ + boolean isPartitionColumnAllowNull(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index 77414e4fc88..aa7ffd2426d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -71,7 +71,8 @@ public class MTMVRelationManager implements MTMVHookService { for (BaseTableInfo tableInfo : mvInfos) { try { MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo); - if (!CollectionUtils.isEmpty(MTMVUtil.getMTMVCanRewritePartitions(mtmv, ctx))) { + if (!CollectionUtils + .isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, System.currentTimeMillis()))) { res.add(mtmv); } } catch (AnalysisException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java new file mode 100644 index 00000000000..666a79eba97 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.List; + +public class MTMVRewriteUtil { + private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class); + + /** + * Determine which partition of mtmv can be rewritten + * + * @param mtmv + * @param ctx + * @return + */ + public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx, + long currentTimeMills) { + List<Partition> res = Lists.newArrayList(); + Collection<Partition> allPartitions = mtmv.getPartitions(); + // check session variable if enable rewrite + if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) { + return res; + } + if (MTMVUtil.mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable() + .isMaterializedViewRewriteEnableContainExternalTable()) { + return res; + } + + MTMVRelation mtmvRelation = mtmv.getRelation(); + if (mtmvRelation == null) { + return res; + } + // check mv is normal + if (!(mtmv.getStatus().getState() == MTMVState.NORMAL + && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { + return res; + } + // check gracePeriod + long gracePeriodMills = mtmv.getGracePeriod(); + for (Partition partition : allPartitions) { + if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime() + + gracePeriodMills)) { + res.add(partition); + continue; + } + try { + if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), + Sets.newHashSet())) { + res.add(partition); + } + } catch (AnalysisException e) { + // ignore it + LOG.warn("check isMTMVPartitionSync failed", e); + } + } + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 6abb22f3e5f..227166e56d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -84,10 +84,10 @@ public class MTMVService { public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException { Objects.requireNonNull(mtmv); + LOG.info("createMTMV: " + mtmv.getName()); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); + MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); } - LOG.info("createMTMV: " + mtmv.getName()); for (MTMVHookService mtmvHookService : hooks.values()) { mtmvHookService.createMTMV(mtmv); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 74593e5def5..3b97e35141d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -17,39 +17,19 @@ package org.apache.doris.mtmv; -import org.apache.doris.analysis.AddPartitionClause; -import org.apache.doris.analysis.DropPartitionClause; -import org.apache.doris.analysis.PartitionKeyDesc; -import org.apache.doris.analysis.SinglePartitionDesc; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; -import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; import java.util.Set; public class MTMVUtil { - private static final Logger LOG = LogManager.getLogger(MTMVUtil.class); /** * get Table by BaseTableInfo @@ -66,380 +46,18 @@ public class MTMVUtil { return table; } - /** - * Determine whether the partition is sync with retated partition and other baseTables - * - * @param mtmv - * @param partitionId - * @param tables - * @param excludedTriggerTables - * @return - * @throws AnalysisException - */ - private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables, - Set<String> excludedTriggerTables) throws AnalysisException { - boolean isSyncWithPartition = true; - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); - // if follow base table, not need compare with related table, only should compare with related partition - excludedTriggerTables.add(relatedTable.getName()); - PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); - Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems(); - long relatedPartitionId = getExistPartitionId(item, - relatedPartitionItems); - if (relatedPartitionId == -1L) { - LOG.warn("can not found related partition: " + partitionId); - return false; - } - isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId); - } - return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables); - - } - - /** - * Align the partitions of mtmv and related tables, delete more and add less - * - * @param mtmv - * @param relatedTable - * @throws DdlException - * @throws AnalysisException - */ - public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable) - throws DdlException, AnalysisException { - Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionItems(); - Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems(); - // drop partition of mtmv - for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { - long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems); - if (partitionId == -1L) { - dropPartition(mtmv, entry.getKey()); - } - } - // add partition for mtmv - for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) { - long partitionId = getExistPartitionId(entry.getValue(), mtmvItems); - if (partitionId == -1L) { - addPartition(mtmv, entry.getValue()); - } - } - } - - public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException { - List<String> res = Lists.newArrayList(); - for (Long partitionId : ids) { - res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); - } - return res; - } - - public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException { - List<Long> res = Lists.newArrayList(); - for (String partitionName : partitions) { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); - res.add(partition.getId()); - } - return res; - } - - /** - * check if table is sync with all baseTables - * - * @param mtmv - * @return - */ - public static boolean isMTMVSync(MTMV mtmv) { - MTMVRelation mtmvRelation = mtmv.getRelation(); - if (mtmvRelation == null) { - return false; - } - try { - return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet()); - } catch (AnalysisException e) { - LOG.warn("isMTMVSync failed: ", e); - return false; - } - } - - /** - * Determine whether the mtmv is sync with tables - * - * @param mtmv - * @param tables - * @param excludeTables - * @return - * @throws AnalysisException - */ - public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables) - throws AnalysisException { - Collection<Partition> partitions = mtmv.getPartitions(); - for (Partition partition : partitions) { - if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) { - return false; - } - } - return true; + public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotFoundException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); + return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); } /** - * get not sync tables + * if base tables of mtmv contains external table * * @param mtmv - * @param partitionId * @return - * @throws AnalysisException */ - public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { - List<String> res = Lists.newArrayList(); - for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { - TableIf table = getTable(baseTableInfo); - if (!(table instanceof MTMVRelatedTableIf)) { - continue; - } - MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table; - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv - .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { - PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); - Map<Long, PartitionItem> relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems(); - long relatedPartitionId = getExistPartitionId(item, - relatedPartitionItems); - if (relatedPartitionId == -1L) { - throw new AnalysisException("can not found related partition"); - } - boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf, - relatedPartitionId); - if (!isSyncWithPartition) { - res.add(mtmvRelatedTableIf.getName()); - } - } else { - if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) { - res.add(table.getName()); - } - } - } - return res; - } - - /** - * Determine which partition of mtmv can be rewritten - * - * @param mtmv - * @param ctx - * @return - */ - public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) { - List<Partition> res = Lists.newArrayList(); - Collection<Partition> allPartitions = mtmv.getPartitions(); - // check session variable if enable rewrite - if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) { - return res; - } - if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable() - .isMaterializedViewRewriteEnableContainExternalTable()) { - return res; - } - - MTMVRelation mtmvRelation = mtmv.getRelation(); - if (mtmvRelation == null) { - return res; - } - // check mv is normal - if (!(mtmv.getStatus().getState() == MTMVState.NORMAL - && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { - return res; - } - // check gracePeriod - long gracePeriodMills = mtmv.getGracePeriod(); - long currentTimeMills = System.currentTimeMillis(); - for (Partition partition : allPartitions) { - if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime() - + gracePeriodMills)) { - res.add(partition); - continue; - } - try { - if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet())) { - res.add(partition); - } - } catch (AnalysisException e) { - // ignore it - LOG.warn("check isMTMVPartitionSync failed", e); - } - } - return res; - } - - /** - * Get the partitions that need to be refreshed - * - * @param mtmv - * @param baseTables - * @return - */ - public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables) { - Collection<Partition> allPartitions = mtmv.getPartitions(); - List<Long> res = Lists.newArrayList(); - for (Partition partition : allPartitions) { - try { - if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables, - mtmv.getExcludedTriggerTables())) { - res.add(partition.getId()); - } - } catch (AnalysisException e) { - res.add(partition.getId()); - LOG.warn("check isMTMVPartitionSync failed", e); - } - } - return res; - } - - /** - * compare last update time of mtmvPartition and tablePartition - * - * @param mtmv - * @param mtmvPartitionId - * @param relatedTable - * @param relatedPartitionId - * @return - * @throws AnalysisException - */ - private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, - MTMVRelatedTableIf relatedTable, - Long relatedPartitionId) throws AnalysisException { - MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); - String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); - String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); - return mtmv.getRefreshSnapshot() - .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot); - } - - /** - * like p_00000101_20170201 - * - * @param desc - * @return - */ - private static String generatePartitionName(PartitionKeyDesc desc) { - String partitionName = "p_"; - partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "") - .replaceAll("\\(|\\)|\\,|\\[|\\]", "_"); - if (partitionName.length() > 50) { - partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName)) - + "_" + System.currentTimeMillis(); - } - return partitionName; - } - - /** - * drop partition of mtmv - * - * @param mtmv - * @param partitionId - */ - private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException { - if (!mtmv.writeLockIfExist()) { - return; - } - try { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); - DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); - Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); - } finally { - mtmv.writeUnlock(); - } - - } - - /** - * add partition for mtmv like relatedPartitionId of relatedTable - * - * @param mtmv - * @param partitionItem - * @throws DdlException - */ - private static void addPartition(MTMV mtmv, PartitionItem partitionItem) - throws DdlException { - PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc(); - Map<String, String> partitionProperties = Maps.newHashMap(); - SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, - generatePartitionName(oldPartitionKeyDesc), - oldPartitionKeyDesc, partitionProperties); - - AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, - mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false); - Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); - } - - /** - * compare PartitionItem and return equals partitionId - * if not found, return -1L - * - * @param target - * @param sources - * @return - */ - private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) { - for (Entry<Long, PartitionItem> entry : sources.entrySet()) { - if (target.equals(entry.getValue())) { - return entry.getKey(); - } - } - return -1L; - } - - /** - * Determine is sync, ignoring excludedTriggerTables and non OlapTanle - * - * @param mtmvPartitionId - * @param tables - * @param excludedTriggerTables - * @return - */ - private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables, - Set<String> excludedTriggerTables) throws AnalysisException { - for (BaseTableInfo baseTableInfo : tables) { - TableIf table = null; - try { - table = getTable(baseTableInfo); - } catch (AnalysisException e) { - LOG.warn("get table failed, {}", baseTableInfo, e); - return false; - } - if (excludedTriggerTables.contains(table.getName())) { - continue; - } - boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo); - if (!syncWithBaseTable) { - return false; - } - } - return true; - } - - private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo) - throws AnalysisException { - TableIf table = null; - try { - table = getTable(baseTableInfo); - } catch (AnalysisException e) { - LOG.warn("get table failed, {}", baseTableInfo, e); - return false; - } - - if (!(table instanceof MTMVRelatedTableIf)) { - // if not MTMVRelatedTableIf, we can not get snapshot from it, - // Currently, it is believed to be synchronous - return true; - } - MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table; - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); - String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); - return mtmv.getRefreshSnapshot() - .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); - } - - private static boolean mtmvContainsExternalTable(MTMV mtmv) { + public static boolean mtmvContainsExternalTable(MTMV mtmv) { Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables(); for (BaseTableInfo baseTableInfo : baseTables) { if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) { @@ -448,60 +66,4 @@ public class MTMVUtil { } return false; } - - public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv, - Set<BaseTableInfo> baseTables, Set<Long> partitionIds) - throws AnalysisException { - Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap(); - for (Long partitionId : partitionIds) { - res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId)); - } - return res; - } - - - private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, - Set<BaseTableInfo> baseTables, Long partitionId) - throws AnalysisException { - MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); - List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions( - mtmv.getPartitionItems().get(partitionId), - relatedTable); - - for (Long relatedPartitionId : relatedPartitionIds) { - MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); - refreshPartitionSnapshot.getPartitions() - .put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot); - } - } - for (BaseTableInfo baseTableInfo : baseTables) { - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv - .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { - continue; - } - TableIf table = getTable(baseTableInfo); - if (!(table instanceof MTMVRelatedTableIf)) { - continue; - } - refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); - } - return refreshPartitionSnapshot; - } - - private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem, - MTMVRelatedTableIf relatedTable) { - List<Long> res = Lists.newArrayList(); - Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems(); - for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { - if (mtmvPartitionItem.equals(entry.getValue())) { - res.add(entry.getKey()); - // current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table - return res; - } - } - return res; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index bac4059c162..12f409a5946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -25,7 +25,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVRewriteUtil; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.memo.GroupExpression; @@ -315,8 +315,8 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac return ImmutableSet.of(); } // get mv valid partitions - Set<Long> mvDataValidPartitionIdSet = MTMVUtil.getMTMVCanRewritePartitions(mtmv, - cascadesContext.getConnectContext()).stream() + Set<Long> mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, + cascadesContext.getConnectContext(), System.currentTimeMillis()).stream() .map(Partition::getId) .collect(Collectors.toSet()); Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index e7b11b5fd17..baf72cc278e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -255,7 +255,7 @@ public class MaterializedViewUtils { Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get(); if (partitionColumnSet.contains(mvReferenceColumn)) { context.addTableColumn(table, mvReferenceColumn); - context.setPctPossible(true); + context.setPctPossible(!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull()); } return visit(relation, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java index f280e867819..5598c812594 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.util.Utils; @@ -67,7 +67,7 @@ public class RefreshMTMVInfo { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); if (!CollectionUtils.isEmpty(partitions)) { - MTMVUtil.getPartitionsIdsByNames(mtmv, partitions); + MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, partitions); } } catch (org.apache.doris.common.AnalysisException | MetaNotFoundException | DdlException e) { throw new AnalysisException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 21505586491..2e0de09d29b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -33,7 +33,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; import org.apache.doris.qe.ConnectContext; @@ -634,7 +634,7 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString())); trow.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString())); trow.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString())); - trow.addToColumnValue(new TCell().setBoolVal(MTMVUtil.isMTMVSync(mv))); + trow.addToColumnValue(new TCell().setBoolVal(MTMVPartitionUtil.isMTMVSync(mv))); dataBatch.add(trow); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java new file mode 100644 index 00000000000..4bb74bfd448 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +public class MTMVPartitionUtilTest { + @Mocked + private MTMV mtmv; + @Mocked + private Partition p1; + @Mocked + private MTMVRelation relation; + @Mocked + private BaseTableInfo baseTableInfo; + @Mocked + private MTMVPartitionInfo mtmvPartitionInfo; + @Mocked + private OlapTable baseOlapTable; + @Mocked + private MTMVSnapshotIf baseSnapshotIf; + @Mocked + private MTMVRefreshSnapshot refreshSnapshot; + @Mocked + private MTMVUtil mtmvUtil; + + private Set<BaseTableInfo> baseTables = Sets.newHashSet(); + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + baseTables.add(baseTableInfo); + new Expectations() { + { + mtmv.getRelation(); + minTimes = 0; + result = relation; + + mtmv.getPartitions(); + minTimes = 0; + result = Lists.newArrayList(p1); + + p1.getId(); + minTimes = 0; + result = 1L; + + mtmv.getMvPartitionInfo(); + minTimes = 0; + result = mtmvPartitionInfo; + + mtmvPartitionInfo.getPartitionType(); + minTimes = 0; + result = MTMVPartitionType.SELF_MANAGE; + + mtmvUtil.getTable(baseTableInfo); + minTimes = 0; + result = baseOlapTable; + + baseOlapTable.needAutoRefresh(); + minTimes = 0; + result = true; + + baseOlapTable.getTableSnapshot(); + minTimes = 0; + result = baseSnapshotIf; + + mtmv.getPartitionName(anyLong); + minTimes = 0; + result = "p1"; + + mtmv.getRefreshSnapshot(); + minTimes = 0; + result = refreshSnapshot; + + refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + minTimes = 0; + result = true; + + relation.getBaseTables(); + minTimes = 0; + result = baseTables; + + baseOlapTable.needAutoRefresh(); + minTimes = 0; + result = true; + + baseOlapTable.getPartitionSnapshot(anyLong); + minTimes = 0; + result = baseSnapshotIf; + + baseOlapTable.getPartitionName(anyLong); + minTimes = 0; + result = "p1"; + + refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any); + minTimes = 0; + result = true; + } + }; + } + + @Test + public void testIsMTMVSyncNormal() { + boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv); + Assert.assertTrue(mtmvSync); + } + + @Test + public void testIsMTMVSyncNotSync() { + new Expectations() { + { + refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + minTimes = 0; + result = false; + } + }; + boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv); + Assert.assertFalse(mtmvSync); + } + + @Test + public void testIsSyncWithPartition() throws AnalysisException { + boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L); + Assert.assertTrue(isSyncWithPartition); + } + + @Test + public void testIsSyncWithPartitionNotSync() throws AnalysisException { + new Expectations() { + { + refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any); + minTimes = 0; + result = false; + } + }; + boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L); + Assert.assertFalse(isSyncWithPartition); + } + + @Test + public void testGeneratePartitionName() { + List<List<PartitionValue>> inValues = Lists.newArrayList(); + inValues.add(Lists.newArrayList(new PartitionValue("value11"), new PartitionValue("value12"))); + inValues.add(Lists.newArrayList(new PartitionValue("value21"), new PartitionValue("value22"))); + PartitionKeyDesc inDesc = PartitionKeyDesc.createIn(inValues); + String inName = MTMVPartitionUtil.generatePartitionName(inDesc); + Assert.assertEquals("p_value11_value12_value21_value22", inName); + + PartitionKeyDesc rangeDesc = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue(1L)), + Lists.newArrayList(new PartitionValue(2L)) + ); + String rangeName = MTMVPartitionUtil.generatePartitionName(rangeDesc); + Assert.assertEquals("p_1_2", rangeName); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java new file mode 100644 index 00000000000..42b5b783841 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +public class MTMVRefreshSnapshotTest { + private String mvExistPartitionName = "mvp1"; + private String relatedExistPartitionName = "p1"; + private long baseExistTableId = 1L; + private long correctVersion = 1L; + private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot(); + private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion); + private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion); + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = Maps.newHashMap(); + MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new MTMVRefreshPartitionSnapshot(); + partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot); + mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName, p1Snapshot); + mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot); + refreshSnapshot.updateSnapshots(partitionSnapshots, Sets.newHashSet(mvExistPartitionName)); + } + + @Test + public void testPartitionSync() { + // normal + boolean sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, + new MTMVVersionSnapshot(correctVersion)); + Assert.assertTrue(sync); + // non exist mv partition + sync = refreshSnapshot.equalsWithRelatedPartition("mvp2", relatedExistPartitionName, + new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // non exist related partition + sync = refreshSnapshot + .equalsWithRelatedPartition(mvExistPartitionName, "p2", new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // snapshot value not equal + sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, + new MTMVVersionSnapshot(2L)); + Assert.assertFalse(sync); + // snapshot type not equal + sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, + new MTMVTimestampSnapshot(correctVersion)); + Assert.assertFalse(sync); + } + + @Test + public void testTableSync() { + // normal + boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + new MTMVVersionSnapshot(correctVersion)); + Assert.assertTrue(sync); + // non exist mv partition + sync = refreshSnapshot + .equalsWithBaseTable("mvp2", baseExistTableId, new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // non exist related partition + sync = refreshSnapshot + .equalsWithBaseTable(mvExistPartitionName, 2L, new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // snapshot value not equal + sync = refreshSnapshot + .equalsWithBaseTable(mvExistPartitionName, baseExistTableId, new MTMVVersionSnapshot(2L)); + Assert.assertFalse(sync); + // snapshot type not equal + sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + new MTMVTimestampSnapshot(correctVersion)); + Assert.assertFalse(sync); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java new file mode 100644 index 00000000000..55394897e42 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; + +public class MTMVRewriteUtilTest { + @Mocked + private MTMV mtmv; + @Mocked + private ConnectContext ctx; + @Mocked + private SessionVariable sessionVariable; + @Mocked + private Partition p1; + @Mocked + private MTMVRelation relation; + @Mocked + private MTMVStatus status; + @Mocked + private MTMVPartitionUtil mtmvPartitionUtil; + @Mocked + private MTMVUtil mtmvUtil; + private long currentTimeMills = 3L; + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + + new Expectations() { + { + mtmv.getPartitions(); + minTimes = 0; + result = Lists.newArrayList(p1); + + p1.getVisibleVersionTime(); + minTimes = 0; + result = 1L; + + mtmv.getGracePeriod(); + minTimes = 0; + result = 0L; + + mtmv.getRelation(); + minTimes = 0; + result = relation; + + mtmv.getStatus(); + minTimes = 0; + result = status; + + mtmv.getGracePeriod(); + minTimes = 0; + result = 0L; + + status.getState(); + minTimes = 0; + result = MTMVState.NORMAL; + + status.getRefreshState(); + minTimes = 0; + result = MTMVRefreshState.SUCCESS; + + ctx.getSessionVariable(); + minTimes = 0; + result = sessionVariable; + + sessionVariable.isEnableMaterializedViewRewrite(); + minTimes = 0; + result = true; + + sessionVariable.isMaterializedViewRewriteEnableContainExternalTable(); + minTimes = 0; + result = true; + + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = true; + + MTMVUtil.mtmvContainsExternalTable((MTMV) any); + minTimes = 0; + result = false; + } + }; + } + + @Test + public void testGetMTMVCanRewritePartitionsNormal() { + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(1, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsInGracePeriod() throws AnalysisException { + new Expectations() { + { + mtmv.getGracePeriod(); + minTimes = 0; + result = 2L; + + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = false; + } + }; + + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(1, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsNotInGracePeriod() throws AnalysisException { + new Expectations() { + { + mtmv.getGracePeriod(); + minTimes = 0; + result = 1L; + + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = false; + } + }; + + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsDisableMaterializedViewRewrite() { + new Expectations() { + { + sessionVariable.isEnableMaterializedViewRewrite(); + minTimes = 0; + result = false; + } + }; + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException { + new Expectations() { + { + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = false; + } + }; + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsEnableContainExternalTable() { + new Expectations() { + { + MTMVUtil.mtmvContainsExternalTable((MTMV) any); + minTimes = 0; + result = true; + + sessionVariable.isMaterializedViewRewriteEnableContainExternalTable(); + minTimes = 0; + result = true; + } + }; + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(1, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsDisableContainExternalTable() { + new Expectations() { + { + MTMVUtil.mtmvContainsExternalTable((MTMV) any); + minTimes = 0; + result = true; + + sessionVariable.isMaterializedViewRewriteEnableContainExternalTable(); + minTimes = 0; + result = false; + } + }; + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsStateAbnormal() { + new Expectations() { + { + status.getState(); + minTimes = 0; + result = MTMVState.SCHEMA_CHANGE; + } + }; + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsRefreshStateAbnormal() { + new Expectations() { + { + status.getRefreshState(); + minTimes = 0; + result = MTMVRefreshState.FAIL; + } + }; + Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java new file mode 100644 index 00000000000..b1fc52dad46 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; +import org.apache.doris.job.extensions.mtmv.MTMVTaskContext; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; +import org.apache.commons.collections.CollectionUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +public class MTMVTaskTest { + private long poneId = 1L; + private String poneName = "p1"; + private long ptwoId = 2L; + private String ptwoName = "p2"; + private List<Long> allPartitionIds = Lists.newArrayList(poneId, ptwoId); + private MTMVRelation relation = new MTMVRelation(Sets.newHashSet(), Sets.newHashSet()); + + @Mocked + private MTMV mtmv; + @Mocked + private MTMVUtil mtmvUtil; + @Mocked + private MTMVPartitionUtil mtmvPartitionUtil; + @Mocked + private MTMVPartitionInfo mtmvPartitionInfo; + @Mocked + private MTMVRefreshInfo mtmvRefreshInfo; + + @Before + public void setUp() + throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException { + + new Expectations() { + { + mtmvUtil.getMTMV(anyLong, anyLong); + minTimes = 0; + result = mtmv; + + mtmv.getPartitionIds(); + minTimes = 0; + result = allPartitionIds; + + mtmv.getMvPartitionInfo(); + minTimes = 0; + result = mtmvPartitionInfo; + + mtmvPartitionInfo.getPartitionType(); + minTimes = 0; + result = MTMVPartitionType.FOLLOW_BASE_TABLE; + + mtmvPartitionUtil.getPartitionsIdsByNames(mtmv, Lists.newArrayList(poneName)); + minTimes = 0; + result = poneId; + + mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = true; + + mtmv.getRefreshInfo(); + minTimes = 0; + result = mtmvRefreshInfo; + + mtmvRefreshInfo.getRefreshMethod(); + minTimes = 0; + result = RefreshMethod.COMPLETE; + } + }; + } + + @Test + public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException { + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), true); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List<Long> result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(allPartitionIds, result); + } + + @Test + public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException { + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List<Long> result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(Lists.newArrayList(poneId), result); + } + + @Test + public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException { + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List<Long> result = task.calculateNeedRefreshPartitions(); + Assert.assertTrue(CollectionUtils.isEmpty(result)); + } + + @Test + public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException { + new Expectations() { + { + mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = false; + } + }; + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List<Long> result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(allPartitionIds, result); + } + + @Test + public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws AnalysisException { + new Expectations() { + { + mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any); + minTimes = 0; + result = false; + + mtmvRefreshInfo.getRefreshMethod(); + minTimes = 0; + result = RefreshMethod.AUTO; + + mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any); + minTimes = 0; + result = Lists.newArrayList(ptwoId); + } + }; + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List<Long> result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(Lists.newArrayList(ptwoId), result); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java index 4204bbe0221..02fb18edbf7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java @@ -26,7 +26,6 @@ import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Optional; @@ -252,7 +251,6 @@ public class MaterializedViewUtilsTest extends TestWithFeService { } @Test - @Disabled public void getRelatedTableInfoTestWithoutGroupNullTest() { PlanChecker.from(connectContext) .checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias, l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, " diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out b/regression-test/data/mtmv_p0/test_hive_mtmv.out index 9ee89dd033d..26e34af7b5d 100644 --- a/regression-test/data/mtmv_p0/test_hive_mtmv.out +++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out @@ -8,6 +8,11 @@ 1 A 20230101 2 B 20230101 3 C 20230101 + +-- !refresh_complete -- +1 A 20230101 +2 B 20230101 +3 C 20230101 4 D 20230102 5 E 20230102 6 F 20230102 diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy index 573f1f84d5d..cf34cfb616a 100644 --- a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy @@ -57,12 +57,20 @@ suite("test_hive_mtmv", "p0,external,hive,external_docker,external_docker_hive") order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by id" //refresh other partitions + // current, for hive, auto refresh will not change data sql """ REFRESH MATERIALIZED VIEW ${mvName} """ waitingMTMVTaskFinished(jobName) order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by id" + //refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_refresh_complete "SELECT * FROM ${mvName} order by id" + sql """drop materialized view if exists ${mvName};""" sql """drop catalog if exists ${catalog_name}""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org