This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 77270a04797 [enhance](mtmv)Improve the performance of obtaining partition/table v… (#39478) 77270a04797 is described below commit 77270a0479752f7595649ed4da583cf74bd0c98a Author: zhangdong <493738...@qq.com> AuthorDate: Thu Aug 22 00:07:52 2024 +0800 [enhance](mtmv)Improve the performance of obtaining partition/table v… (#39478) …ersions (#39301) pick: https://github.com/apache/doris/pull/39301 --- .../java/org/apache/doris/catalog/OlapTable.java | 13 +- .../java/org/apache/doris/catalog/Partition.java | 13 ++ .../doris/datasource/hive/HMSExternalTable.java | 6 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 18 ++- .../org/apache/doris/mtmv/MTMVBaseVersions.java | 38 ++++++ .../org/apache/doris/mtmv/MTMVPartitionUtil.java | 142 +++++++++++++++------ .../org/apache/doris/mtmv/MTMVRefreshContext.java | 54 ++++++++ .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 4 +- .../org/apache/doris/mtmv/MTMVRewriteUtil.java | 12 +- .../apache/doris/mtmv/MTMVPartitionUtilTest.java | 27 +++- .../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 10 +- .../java/org/apache/doris/mtmv/MTMVTaskTest.java | 24 ++-- 12 files changed, 272 insertions(+), 89 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index db5c756772e..9bbb76f0b6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,6 +47,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVVersionSnapshot; @@ -2836,14 +2837,18 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException { - long visibleVersion = getPartitionOrAnalysisException(partitionName).getVisibleVersion(); + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + throws AnalysisException { + Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions(); + long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName) + : getPartitionOrAnalysisException(partitionName).getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion); } @Override - public MTMVSnapshotIf getTableSnapshot() { - long visibleVersion = getVisibleVersion(); + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) { + Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions(); + long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 4a829c204ad..2d0e8079cb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -20,9 +20,11 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.MaterializedIndex.IndexState; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.rpc.RpcException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -37,6 +39,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Internal representation of partition-related metadata. @@ -168,6 +171,16 @@ public class Partition extends MetaObject implements Writable { return visibleVersionTime; } + public static List<Long> getVisibleVersions(List<? extends Partition> partitions) throws RpcException { + if (Config.isCloudMode()) { + // Throwing RPC exceptions is to ensure compatibility with the caller's code + // and avoid different implementations in different versions + throw new RpcException("127.0.0.1", "not implement cloud in current version"); + } else { + return partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList()); + } + } + /** * if visibleVersion is 1, do not return creation time but 0 * diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 2a1129e5ee5..6bb2be58743 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; +import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; @@ -749,13 +750,14 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + throws AnalysisException { long partitionLastModifyTime = getPartitionLastModifyTime(partitionName); return new MTMVTimestampSnapshot(partitionLastModifyTime); } @Override - public MTMVSnapshotIf getTableSnapshot() throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index ae49759b54c..1fa42236c61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -37,6 +37,7 @@ import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mtmv.MTMVPlanUtil; +import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; import org.apache.doris.mtmv.MTMVRelation; @@ -177,8 +178,8 @@ public class MTMVTask extends AbstractTask { if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVPartitionUtil.alignMvPartition(mtmv); } - Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings(); - this.needRefreshPartitions = calculateNeedRefreshPartitions(partitionMappings); + MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv); + this.needRefreshPartitions = calculateNeedRefreshPartitions(context); this.refreshMode = generateRefreshMode(needRefreshPartitions); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; @@ -196,8 +197,7 @@ public class MTMVTask extends AbstractTask { .subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end)); // need get names before exec Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil - .generatePartitionSnapshots(mtmv, relation.getBaseTablesOneLevel(), execPartitionNames, - partitionMappings); + .generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames); exec(ctx, execPartitionNames, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); @@ -432,7 +432,7 @@ public class MTMVTask extends AbstractTask { } } - public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> partitionMappings) + public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context) throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { @@ -450,9 +450,8 @@ public class MTMVTask extends AbstractTask { // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTablesOneLevel(), - mtmv.getExcludedTriggerTables(), - partitionMappings); + boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(), + mtmv.getExcludedTriggerTables()); if (fresh) { return Lists.newArrayList(); } @@ -462,8 +461,7 @@ public class MTMVTask extends AbstractTask { } // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTablesOneLevel(), - partitionMappings); + return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel()); } public MTMVTaskContext getTaskContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java new file mode 100644 index 00000000000..7f83389a953 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import java.util.Map; + +public class MTMVBaseVersions { + private final Map<Long, Long> tableVersions; + private final Map<String, Long> partitionVersions; + + public MTMVBaseVersions(Map<Long, Long> tableVersions, Map<String, Long> partitionVersions) { + this.tableVersions = tableVersions; + this.partitionVersions = partitionVersions; + } + + public Map<Long, Long> getTableVersions() { + return tableVersions; + } + + public Map<String, Long> getPartitionVersions() { + return partitionVersions; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 5625c695b47..54f8374d9a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -26,13 +26,16 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.rpc.RpcException; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -71,17 +74,18 @@ public class MTMVPartitionUtil { /** * Determine whether the partition is sync with retated partition and other baseTables * - * @param mtmv + * @param refreshContext * @param partitionName - * @param relatedPartitionNames * @param tables * @param excludedTriggerTables * @return * @throws AnalysisException */ - public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName, Set<String> relatedPartitionNames, + public static boolean isMTMVPartitionSync(MTMVRefreshContext refreshContext, String partitionName, Set<BaseTableInfo> tables, Set<String> excludedTriggerTables) throws AnalysisException { + MTMV mtmv = refreshContext.getMtmv(); + Set<String> relatedPartitionNames = refreshContext.getPartitionMappings().get(partitionName); boolean isSyncWithPartition = true; if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); @@ -92,9 +96,10 @@ public class MTMVPartitionUtil { partitionName, mtmv.getName(), relatedTable.getName()); return false; } - isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, relatedTable, relatedPartitionNames); + isSyncWithPartition = isSyncWithPartitions(refreshContext, partitionName, relatedPartitionNames); } - return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionName, tables, excludedTriggerTables); + return isSyncWithPartition && isSyncWithAllBaseTables(refreshContext, partitionName, tables, + excludedTriggerTables); } @@ -192,8 +197,8 @@ public class MTMVPartitionUtil { return false; } try { - return isMTMVSync(mtmv, mtmvRelation.getBaseTablesOneLevel(), Sets.newHashSet(), - mtmv.calculatePartitionMappings()); + return isMTMVSync(MTMVRefreshContext.buildContext(mtmv), mtmvRelation.getBaseTablesOneLevel(), + Sets.newHashSet()); } catch (AnalysisException e) { LOG.warn("isMTMVSync failed: ", e); return false; @@ -203,19 +208,18 @@ public class MTMVPartitionUtil { /** * Determine whether the mtmv is sync with tables * - * @param mtmv + * @param context * @param tables * @param excludeTables - * @param partitionMappings * @return * @throws AnalysisException */ - public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, - Map<String, Set<String>> partitionMappings) + public static boolean isMTMVSync(MTMVRefreshContext context, Set<BaseTableInfo> tables, Set<String> excludeTables) throws AnalysisException { + MTMV mtmv = context.getMtmv(); Set<String> partitionNames = mtmv.getPartitionNames(); for (String partitionName : partitionNames) { - if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), tables, + if (!isMTMVPartitionSync(context, partitionName, tables, excludeTables)) { return false; } @@ -234,17 +238,18 @@ public class MTMVPartitionUtil { public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds) throws AnalysisException { Map<Long, List<String>> res = Maps.newHashMap(); - Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings(); + MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv); for (Long partitionId : partitionIds) { String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName(); - res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionName, partitionMappings.get(partitionName))); + res.put(partitionId, getPartitionUnSyncTables(context, partitionName)); } return res; } - private static List<String> getPartitionUnSyncTables(MTMV mtmv, String partitionName, - Set<String> relatedPartitionNames) + private static List<String> getPartitionUnSyncTables(MTMVRefreshContext context, String partitionName) throws AnalysisException { + MTMV mtmv = context.getMtmv(); + Set<String> relatedPartitionNames = context.getPartitionMappings().get(partitionName); List<String> res = Lists.newArrayList(); for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) { TableIf table = MTMVUtil.getTable(baseTableInfo); @@ -262,13 +267,13 @@ public class MTMVPartitionUtil { res.add(mtmvRelatedTableIf.getName()); continue; } - boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, mtmvRelatedTableIf, + boolean isSyncWithPartition = isSyncWithPartitions(context, partitionName, relatedPartitionNames); if (!isSyncWithPartition) { res.add(mtmvRelatedTableIf.getName()); } } else { - if (!isSyncWithBaseTable(mtmv, partitionName, baseTableInfo)) { + if (!isSyncWithBaseTable(context, partitionName, baseTableInfo)) { res.add(table.getName()); } } @@ -279,17 +284,17 @@ public class MTMVPartitionUtil { /** * Get the partitions that need to be refreshed * - * @param mtmv + * @param context * @param baseTables * @return */ - public static List<String> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables, - Map<String, Set<String>> partitionMappings) { + public static List<String> getMTMVNeedRefreshPartitions(MTMVRefreshContext context, Set<BaseTableInfo> baseTables) { + MTMV mtmv = context.getMtmv(); Set<String> partitionNames = mtmv.getPartitionNames(); List<String> res = Lists.newArrayList(); for (String partitionName : partitionNames) { try { - if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), baseTables, + if (!isMTMVPartitionSync(context, partitionName, baseTables, mtmv.getExcludedTriggerTables())) { res.add(partitionName); } @@ -304,16 +309,16 @@ public class MTMVPartitionUtil { /** * Compare the current and last updated partition (or table) snapshot of the associated partition (or table) * - * @param mtmv + * @param context * @param mtmvPartitionName - * @param relatedTable * @param relatedPartitionNames * @return * @throws AnalysisException */ - public static boolean isSyncWithPartitions(MTMV mtmv, String mtmvPartitionName, - MTMVRelatedTableIf relatedTable, + public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mtmvPartitionName, Set<String> relatedPartitionNames) throws AnalysisException { + MTMV mtmv = context.getMtmv(); + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); if (!relatedTable.needAutoRefresh()) { return true; } @@ -324,7 +329,7 @@ public class MTMVPartitionUtil { } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName); + .getPartitionSnapshot(relatedPartitionName, context); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -397,7 +402,8 @@ public class MTMVPartitionUtil { * @param excludedTriggerTables * @return */ - private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionName, Set<BaseTableInfo> tables, + private static boolean isSyncWithAllBaseTables(MTMVRefreshContext context, String mtmvPartitionName, + Set<BaseTableInfo> tables, Set<String> excludedTriggerTables) throws AnalysisException { for (BaseTableInfo baseTableInfo : tables) { TableIf table = null; @@ -410,7 +416,7 @@ public class MTMVPartitionUtil { if (excludedTriggerTables.contains(table.getName())) { continue; } - boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionName, baseTableInfo); + boolean syncWithBaseTable = isSyncWithBaseTable(context, mtmvPartitionName, baseTableInfo); if (!syncWithBaseTable) { return false; } @@ -418,8 +424,10 @@ public class MTMVPartitionUtil { return true; } - private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, BaseTableInfo baseTableInfo) + private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mtmvPartitionName, + BaseTableInfo baseTableInfo) throws AnalysisException { + MTMV mtmv = context.getMtmv(); TableIf table = null; try { table = MTMVUtil.getTable(baseTableInfo); @@ -437,7 +445,7 @@ public class MTMVPartitionUtil { if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); } @@ -445,35 +453,35 @@ public class MTMVPartitionUtil { /** * Generate updated snapshots of partitions to determine if they are synchronized * - * @param mtmv + * @param context * @param baseTables * @param partitionNames - * @param partitionMappings * @return * @throws AnalysisException */ - public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv, - Set<BaseTableInfo> baseTables, Set<String> partitionNames, - Map<String, Set<String>> partitionMappings) + public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMVRefreshContext context, + Set<BaseTableInfo> baseTables, Set<String> partitionNames) throws AnalysisException { Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap(); for (String partitionName : partitionNames) { res.put(partitionName, - generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionName))); + generatePartitionSnapshot(context, baseTables, + context.getPartitionMappings().get(partitionName))); } return res; } - private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, + private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefreshContext context, Set<BaseTableInfo> baseTables, Set<String> relatedPartitionNames) throws AnalysisException { + MTMV mtmv = context.getMtmv(); MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName); + .getPartitionSnapshot(relatedPartitionName, context); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -487,7 +495,8 @@ public class MTMVPartitionUtil { if (!(table instanceof MTMVRelatedTableIf)) { continue; } - refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); + refreshPartitionSnapshot.getTables() + .put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot(context)); } return refreshPartitionSnapshot; } @@ -501,4 +510,57 @@ public class MTMVPartitionUtil { } throw new AnalysisException("can not getPartitionColumnType by:" + col); } + + public static MTMVBaseVersions getBaseVersions(MTMV mtmv) throws AnalysisException { + return new MTMVBaseVersions(getTableVersions(mtmv), getPartitionVersions(mtmv)); + } + + private static Map<String, Long> getPartitionVersions(MTMV mtmv) throws AnalysisException { + Map<String, Long> res = Maps.newHashMap(); + if (mtmv.getMvPartitionInfo().getPartitionType().equals(MTMVPartitionType.SELF_MANAGE)) { + return res; + } + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + if (!(relatedTable instanceof OlapTable)) { + return res; + } + List<Partition> partitions = Lists.newArrayList(((OlapTable) relatedTable).getPartitions()); + List<Long> versions = null; + try { + versions = Partition.getVisibleVersions(partitions); + } catch (RpcException e) { + throw new AnalysisException("getVisibleVersions failed.", e); + } + Preconditions.checkState(partitions.size() == versions.size()); + for (int i = 0; i < partitions.size(); i++) { + res.put(partitions.get(i).getName(), versions.get(i)); + } + return res; + } + + private static Map<Long, Long> getTableVersions(MTMV mtmv) { + Map<Long, Long> res = Maps.newHashMap(); + if (mtmv.getRelation() == null || mtmv.getRelation().getBaseTablesOneLevel() == null) { + return res; + } + List<OlapTable> olapTables = Lists.newArrayList(); + for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) { + TableIf table = null; + try { + table = MTMVUtil.getTable(baseTableInfo); + } catch (AnalysisException e) { + LOG.info(e); + continue; + } + if (table instanceof OlapTable) { + olapTables.add((OlapTable) table); + } + } + List<Long> versions = OlapTable.getVisibleVersionInBatch(olapTables); + Preconditions.checkState(olapTables.size() == versions.size()); + for (int i = 0; i < olapTables.size(); i++) { + res.put(olapTables.get(i).getId(), versions.get(i)); + } + return res; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java new file mode 100644 index 00000000000..3d611b5e852 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.common.AnalysisException; + +import java.util.Map; +import java.util.Set; + +public class MTMVRefreshContext { + private MTMV mtmv; + private Map<String, Set<String>> partitionMappings; + private MTMVBaseVersions baseVersions; + + public MTMVRefreshContext(MTMV mtmv) { + this.mtmv = mtmv; + } + + public MTMV getMtmv() { + return mtmv; + } + + public Map<String, Set<String>> getPartitionMappings() { + return partitionMappings; + } + + public MTMVBaseVersions getBaseVersions() { + return baseVersions; + } + + public static MTMVRefreshContext buildContext(MTMV mtmv) throws AnalysisException { + MTMVRefreshContext context = new MTMVRefreshContext(mtmv); + context.partitionMappings = mtmv.calculatePartitionMappings(); + context.baseVersions = MTMVPartitionUtil.getBaseVersions(mtmv); + return context; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index c34df750de5..516eb904e58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -69,7 +69,7 @@ public interface MTMVRelatedTableIf extends TableIf { * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException; /** * getTableSnapshot @@ -77,7 +77,7 @@ public interface MTMVRelatedTableIf extends TableIf { * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot() throws AnalysisException; + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException; /** * Does the current type of table allow timed triggering diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java index 5392313ba62..5e17673a068 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java @@ -31,8 +31,6 @@ import org.apache.logging.log4j.Logger; import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Set; public class MTMVRewriteUtil { private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class); @@ -57,7 +55,7 @@ public class MTMVRewriteUtil { || mtmv.getStatus().getRefreshState() == MTMVRefreshState.INIT) { return res; } - Map<String, Set<String>> partitionMappings = null; + MTMVRefreshContext refreshContext = null; // check gracePeriod long gracePeriodMills = mtmv.getGracePeriod(); for (Partition partition : allPartitions) { @@ -67,11 +65,11 @@ public class MTMVRewriteUtil { continue; } try { - if (partitionMappings == null) { - partitionMappings = mtmv.calculatePartitionMappings(); + if (refreshContext == null) { + refreshContext = MTMVRefreshContext.buildContext(mtmv); } - if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(), - partitionMappings.get(partition.getName()), mtmvRelation.getBaseTablesOneLevel(), + if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext, partition.getName(), + mtmvRelation.getBaseTablesOneLevel(), Sets.newHashSet())) { res.add(partition); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index d6c4a87f224..63a75c72498 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mocked; @@ -55,6 +56,10 @@ public class MTMVPartitionUtilTest { private MTMVRefreshSnapshot refreshSnapshot; @Mocked private MTMVUtil mtmvUtil; + @Mocked + private MTMVRefreshContext context; + @Mocked + private MTMVBaseVersions versions; private Set<BaseTableInfo> baseTables = Sets.newHashSet(); @@ -67,6 +72,18 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = relation; + context.getMtmv(); + minTimes = 0; + result = mtmv; + + context.getPartitionMappings(); + minTimes = 0; + result = Maps.newHashMap(); + + context.getBaseVersions(); + minTimes = 0; + result = versions; + mtmv.getPartitions(); minTimes = 0; result = Lists.newArrayList(p1); @@ -95,7 +112,7 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = true; - baseOlapTable.getTableSnapshot(); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any); minTimes = 0; result = baseSnapshotIf; @@ -115,7 +132,7 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any); minTimes = 0; result = baseSnapshotIf; @@ -152,7 +169,7 @@ public class MTMVPartitionUtilTest { @Test public void testIsSyncWithPartition() throws AnalysisException { boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); + .isSyncWithPartitions(context, "name1", Sets.newHashSet("name2")); Assert.assertTrue(isSyncWithPartition); } @@ -166,7 +183,7 @@ public class MTMVPartitionUtilTest { } }; boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); + .isSyncWithPartitions(context, "name1", Sets.newHashSet("name2")); Assert.assertFalse(isSyncWithPartition); } @@ -180,7 +197,7 @@ public class MTMVPartitionUtilTest { } }; boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); + .isSyncWithPartitions(context, "name1", Sets.newHashSet("name2")); Assert.assertFalse(isSyncWithPartition); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java index 2b8c16509af..c2e402adb82 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java @@ -103,7 +103,7 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = true; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; @@ -124,7 +124,7 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 2L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; @@ -154,7 +154,7 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 2L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; @@ -175,7 +175,7 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 1L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; @@ -208,7 +208,7 @@ public class MTMVRewriteUtilTest { public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException { new Expectations() { { - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java index 512bd6099f0..0bcd2f05690 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java @@ -28,7 +28,6 @@ import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mocked; @@ -38,7 +37,6 @@ import org.junit.Before; import org.junit.Test; import java.util.List; -import java.util.Map; import java.util.Set; public class MTMVTaskTest { @@ -84,8 +82,7 @@ public class MTMVTaskTest { // minTimes = 0; // result = poneId; - mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, - (Map<String, Set<String>>) any); + mtmvPartitionUtil.isMTMVSync((MTMVRefreshContext) any, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = true; @@ -104,7 +101,7 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, null, true); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(allPartitionNames, result); } @@ -112,7 +109,7 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(Lists.newArrayList(poneName), result); } @@ -127,7 +124,7 @@ public class MTMVTaskTest { }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(null); Assert.assertTrue(CollectionUtils.isEmpty(result)); } @@ -135,7 +132,7 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsSystemComplete() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(allPartitionNames, result); } @@ -143,15 +140,14 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException { new Expectations() { { - mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, - (Map<String, Set<String>>) any); + mtmvPartitionUtil.isMTMVSync((MTMVRefreshContext) any, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = false; } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(allPartitionNames, result); } @@ -160,7 +156,7 @@ public class MTMVTaskTest { new Expectations() { { mtmvPartitionUtil - .isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, (Map<String, Set<String>>) any); + .isMTMVSync((MTMVRefreshContext) any, (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = false; @@ -169,14 +165,14 @@ public class MTMVTaskTest { result = RefreshMethod.AUTO; mtmvPartitionUtil - .getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any, (Map<String, Set<String>>) any); + .getMTMVNeedRefreshPartitions((MTMVRefreshContext) any, (Set<BaseTableInfo>) any); minTimes = 0; result = Lists.newArrayList(ptwoName); } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(Lists.newArrayList(ptwoName), result); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org