This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new ca0c1a59b62 [enhance](mtmv)MTMV supports Hive multi-level partitioning (#31060)
ca0c1a59b62 is described below

commit ca0c1a59b629b83e0316f83213d5e3c5e3e2204b
Author: zhangdong <493738...@qq.com>
AuthorDate: Sun Feb 25 17:47:19 2024 +0800

    [enhance](mtmv)MTMV supports Hive multi-level partitioning (#31060)

    Issue Number: close #xxx

    For example, the hive table is partitioned by `date` and `region`, with the following 6 partitions

    ```
    20200101
        beijing
        shanghai
    20200102
        beijing
        shanghai
    20200103
        beijing
        shanghai
    ```
    If the MTMV is partitioned by `date`, then the MTMV will have three partitions: 20200101, 20200102, 20200103

    If the MTMV is partitioned by `region`, then the MTMV will have two partitions: beijing, shanghai
---
 .../Create/CREATE-ASYNC-MATERIALIZED-VIEW.md       |   2 +-
 .../Create/CREATE-ASYNC-MATERIALIZED-VIEW.md       |   2 +-
 .../apache/doris/analysis/PartitionKeyDesc.java    |  23 ++
 .../org/apache/doris/analysis/PartitionValue.java  |  20 ++
 .../apache/doris/catalog/ListPartitionItem.java    |  19 ++
 .../main/java/org/apache/doris/catalog/MTMV.java   |  68 ++++++
 .../org/apache/doris/catalog/PartitionItem.java    |  10 +
 .../apache/doris/catalog/RangePartitionItem.java   |   7 +
 .../doris/common/proc/PartitionsProcDir.java       |  20 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  15 +-
 .../org/apache/doris/mtmv/MTMVPartitionInfo.java   |  13 ++
 .../org/apache/doris/mtmv/MTMVPartitionUtil.java   | 198 ++++++++--------
 .../org/apache/doris/mtmv/MTMVRewriteUtil.java     |   9 +-
 .../trees/plans/commands/info/CreateMTMVInfo.java  |   8 +-
 .../apache/doris/mtmv/MTMVPartitionUtilTest.java   |   6 +-
 .../org/apache/doris/mtmv/MTMVRewriteUtilTest.java |  12 +-
 .../java/org/apache/doris/mtmv/MTMVTaskTest.java   |  26 ++-
 .../mtmv_p0/test_hive_multi_partition_mtmv.out     |  43 ++++
 .../mtmv_p0/test_hive_multi_partition_mtmv.groovy  | 253 +++++++++++++++++++++
 .../mtmv_p0/test_partition_refresh_mtmv.groovy     |  90 ++++++--
 20 files changed, 698 insertions(+), 146 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
index 37156bc7011..91263a6ff51 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
@@ -160,7 +160,7 @@ KEY(k1,k2)
 ```
 
 ##### partition
-There are two types of partitioning methods for materialized views. If no partitioning is specified, there will be a default single partition. If a partitioning field is specified, the system will automatically deduce the source base table of that field and synchronize all partitions of the base table (currently supporting `OlapTable` and `hive`). (Limitation: the current base table can only have one partitioning field.)
+There are two types of partitioning methods for materialized views. If no partitioning is specified, there will be a default single partition. If a partitioning field is specified, the system will automatically deduce the source base table of that field and synchronize all partitions of the base table (currently supporting `OlapTable` and `hive`). (Limitation: If the base table is an `OlapTable`, it can only have one partition field)
 
 For example, if the base table is a range partition with a partition field of `create_time` and partitioning by day, and `partition by(ct) as select create_time as ct from t1` is specified when creating a materialized view, then the materialized view will also be a range partition with a partition field of 'ct' and partitioning by day
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
index 8add74079bf..52050763c21 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-ASYNC-MATERIALIZED-VIEW.md
@@ -160,7 +160,7 @@ KEY(k1,k2)
 ```
 
 ##### partition
-物化视图有两种分区方式,如果不指定分区,默认只有一个分区,如果指定分区字段,会自动推导出字段来自哪个基表并同步基表(当前支持`OlapTable`和`hive`)的所有分区(限制条件:当前基表只能有一个分区字段)
+物化视图有两种分区方式,如果不指定分区,默认只有一个分区,如果指定分区字段,会自动推导出字段来自哪个基表并同步基表(当前支持`OlapTable`和`hive`)的所有分区(限制条件:基表如果是`OlapTable`,那么只能有一个分区字段)
 
 例如:基表是range分区,分区字段为`create_time`并按天分区,创建物化视图时指定`partition by(ct) as select create_time as ct from t1` 那么物化视图也会是range分区,分区字段为`ct`,并且按天分区
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
index f7f63490ee8..4cd87953572 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionKeyDesc.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 import java.util.List;
@@ -223,4 +224,26 @@ public class
PartitionKeyDesc { })).append(")"); return sb.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionKeyDesc that = (PartitionKeyDesc) o; + return Objects.equal(lowerValues, that.lowerValues) + && Objects.equal(upperValues, that.upperValues) + && Objects.equal(inValues, that.inValues) + && partitionKeyValueType == that.partitionKeyValueType + && Objects.equal(timeInterval, that.timeInterval) + && Objects.equal(timeType, that.timeType); + } + + @Override + public int hashCode() { + return Objects.hashCode(lowerValues, upperValues, inValues, partitionKeyValueType, timeInterval, timeType); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java index 6d19c52c800..5dea5c767b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionValue.java @@ -20,6 +20,8 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import com.google.common.base.Objects; + public class PartitionValue { public static final PartitionValue MAX_VALUE = new PartitionValue(); @@ -69,4 +71,22 @@ public class PartitionValue { public boolean isHiveDefaultPartition() { return isHiveDefaultPartition; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionValue that = (PartitionValue) o; + return isHiveDefaultPartition == that.isHiveDefaultPartition + && Objects.equal(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hashCode(value, isHiveDefaultPartition); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index ef23a444965..47302c21d1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -19,14 +19,17 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.common.AnalysisException; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class ListPartitionItem extends PartitionItem { @@ -80,6 +83,22 @@ public class ListPartitionItem extends PartitionItem { return PartitionKeyDesc.createIn(inValues); } + @Override + public PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException { + List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue) + .collect(Collectors.toList()); + Set<List<PartitionValue>> res = Sets.newHashSet(); + for (List<PartitionValue> values : inValues) { + if (values.size() <= pos) { + throw new AnalysisException( + String.format("toPartitionKeyDesc IndexOutOfBounds, values: %s, pos: %d", values.toString(), + pos)); + } + res.add(Lists.newArrayList(values.get(pos))); + } + return PartitionKeyDesc.createIn(Lists.newArrayList(res)); + } + @Override public void write(DataOutput out) throws IOException { out.writeInt(partitionKeys.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index e7b0a79dd53..10739be8538 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; 
+import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.catalog.OlapTableFactory.MTMVParams; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.io.Text; @@ -28,6 +29,7 @@ import org.apache.doris.mtmv.MTMVCache; import org.apache.doris.mtmv.MTMVJobInfo; import org.apache.doris.mtmv.MTMVJobManager; import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; @@ -38,6 +40,7 @@ import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVStatus; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; @@ -47,7 +50,9 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -243,6 +248,69 @@ public class MTMV extends OlapTable { return refreshSnapshot; } + /** + * generateMvPartitionDescs + * + * @return mvPartitionId ==> mvPartitionKeyDesc + */ + public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() { + Map<Long, PartitionItem> mtmvItems = getPartitionItems(); + Map<Long, PartitionKeyDesc> result = Maps.newHashMap(); + for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { + result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc()); + } + return result; + } + + /** + * generateRelatedPartitionDescs + * <p> + * Different partitions may generate the same PartitionKeyDesc through logical calculations + * (such as selecting only one column, or rolling up 
partitions), so it is a one to many relationship + * + * @return related PartitionKeyDesc ==> relatedPartitionIds + * @throws AnalysisException + */ + public Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs() throws AnalysisException { + if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { + return Maps.newHashMap(); + } + Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>(); + Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getPartitionItems(); + int relatedColPos = mvPartitionInfo.getRelatedColPos(); + for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { + PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); + if (res.containsKey(partitionKeyDesc)) { + res.get(partitionKeyDesc).add(entry.getKey()); + } else { + res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey())); + } + } + return res; + } + + /** + * Calculate the partition and associated partition mapping relationship of the MTMV + * It is the result of real-time comparison calculation, so there may be some costs, + * so it should be called with caution + * + * @return mvPartitionId ==> relationPartitionIds + * @throws AnalysisException + */ + public Map<Long, Set<Long>> calculatePartitionMappings() throws AnalysisException { + if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { + return Maps.newHashMap(); + } + Map<Long, Set<Long>> res = Maps.newHashMap(); + Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = generateRelatedPartitionDescs(); + Map<Long, PartitionItem> mvPartitionItems = getPartitionInfo().getIdToItem(false); + for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) { + res.put(entry.getKey(), + relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); + } + return res; + } + public void readMvLock() { this.mvRwLock.readLock().lock(); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java index 8ea754abff4..3c29aa2f48b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.io.Writable; import java.util.Comparator; @@ -36,4 +37,13 @@ public abstract class PartitionItem implements Comparable<PartitionItem>, Writab } public abstract PartitionKeyDesc toPartitionKeyDesc(); + + /** + * Generate PartitionKeyDesc using only the posth PartitionValue + * + * @param pos + * @return + * @throws AnalysisException + */ + public abstract PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index cadb95ed3d9..7a9162c874c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -53,6 +53,13 @@ public class RangePartitionItem extends PartitionItem { PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint())); } + @Override + public PartitionKeyDesc toPartitionKeyDesc(int pos) { + // MTMV do not allow base tables with partition type range to have multiple partition columns, + // so pos is ignored here + return toPartitionKeyDesc(); + } + @Override public void write(DataOutput out) throws IOException { RangeUtils.writeRange(out, partitionKeyRange); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 4703429fa18..6b8f59e6508 100644 
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -220,7 +220,7 @@ public class PartitionsProcDir implements ProcDirInterface { return result; } - private List<List<Comparable>> getPartitionInfos() { + private List<List<Comparable>> getPartitionInfos() throws AnalysisException { Preconditions.checkNotNull(db); Preconditions.checkNotNull(olapTable); Preconditions.checkState(olapTable.isManagedTable()); @@ -244,6 +244,12 @@ public class PartitionsProcDir implements ProcDirInterface { } Joiner joiner = Joiner.on(", "); + Map<Long, List<String>> partitionsUnSyncTables = null; + if (olapTable instanceof MTMV) { + partitionsUnSyncTables = MTMVPartitionUtil + .getPartitionsUnSyncTables((MTMV) olapTable, partitionIds); + + } for (Long partitionId : partitionIds) { Partition partition = olapTable.getPartition(partitionId); @@ -308,15 +314,9 @@ public class PartitionsProcDir implements ProcDirInterface { partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId)); if (olapTable instanceof MTMV) { - try { - List<String> partitionUnSyncTables = MTMVPartitionUtil - .getPartitionUnSyncTables((MTMV) olapTable, partitionId); - partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables)); - partitionInfo.add(partitionUnSyncTables.toString()); - } catch (AnalysisException e) { - partitionInfo.add(false); - partitionInfo.add(e.getMessage()); - } + List<String> partitionUnSyncTables = partitionsUnSyncTables.get(partitionId); + partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables)); + partitionInfo.add(partitionUnSyncTables.toString()); } else { partitionInfo.add(true); partitionInfo.add(FeConstants.null_string); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index a859a8ca1b7..fa0a88fa93d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -169,9 +169,10 @@ public class MTMVTask extends AbstractTask { // To be completely consistent with hive, you need to manually refresh the cache // refreshHmsTable(); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); + MTMVPartitionUtil.alignMvPartition(mtmv); } - List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(); + Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings(); + List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(partitionMappings); this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); this.refreshMode = generateRefreshMode(needRefreshPartitionIds); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { @@ -191,7 +192,8 @@ public class MTMVTask extends AbstractTask { // need get names before exec List<String> execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds); Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil - .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds); + .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds, + partitionMappings); exec(ctx, execPartitionIds, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); @@ -389,7 +391,7 @@ public class MTMVTask extends AbstractTask { } } - public List<Long> calculateNeedRefreshPartitions() throws AnalysisException { + public List<Long> calculateNeedRefreshPartitions(Map<Long, Set<Long>> partitionMappings) throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == 
MTMVTaskTriggerMode.MANUAL) { if (taskContext.isComplete()) { @@ -402,7 +404,8 @@ public class MTMVTask extends AbstractTask { // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables()); + boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(), + partitionMappings); if (fresh) { return Lists.newArrayList(); } @@ -416,7 +419,7 @@ public class MTMVTask extends AbstractTask { } // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables()); + return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables(), partitionMappings); } public MTMVTaskContext getTaskContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index c48594847f3..3a364e0749d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -89,6 +89,19 @@ public class MTMVPartitionInfo { this.partitionCol = partitionCol; } + /** + * Get the position of relatedCol in the relatedTable partition column + * + * @return + * @throws AnalysisException + */ + public int getRelatedColPos() throws AnalysisException { + if (partitionType == MTMVPartitionType.SELF_MANAGE) { + throw new AnalysisException("partitionType is: " + partitionType); + } + return MTMVPartitionUtil.getPos(getRelatedTable(), relatedCol); + } + @Override public String toString() { return "MTMVPartitionInfo{" diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 6657b3d243f..88fe02a8b4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.AllPartitionDesc; import org.apache.doris.analysis.DropPartitionClause; import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.analysis.SinglePartitionDesc; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; @@ -35,11 +36,13 @@ import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -58,27 +61,26 @@ public class MTMVPartitionUtil { * * @param mtmv * @param partitionId + * @param relatedPartitionIds * @param tables * @param excludedTriggerTables * @return * @throws AnalysisException */ - public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables, + public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds, + Set<BaseTableInfo> tables, Set<String> excludedTriggerTables) throws AnalysisException { boolean isSyncWithPartition = true; if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); // if follow base table, not need compare with related table, only 
should compare with related partition excludedTriggerTables.add(relatedTable.getName()); - PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); - Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems(); - long relatedPartitionId = getExistPartitionId(item, - relatedPartitionItems); - if (relatedPartitionId == -1L) { - LOG.warn("can not found related partition: " + partitionId); + if (CollectionUtils.isEmpty(relatedPartitionIds)) { + LOG.warn("can not found related partition, partitionId: {}, mtmvName: {}, relatedTableName: {}", + partitionId, mtmv.getName(), relatedTable.getName()); return false; } - isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId); + isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, relatedTable, relatedPartitionIds); } return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables); @@ -88,26 +90,24 @@ public class MTMVPartitionUtil { * Align the partitions of mtmv and related tables, delete more and add less * * @param mtmv - * @param relatedTable * @throws DdlException * @throws AnalysisException */ - public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable) + public static void alignMvPartition(MTMV mtmv) throws DdlException, AnalysisException { - Map<Long, PartitionItem> relatedTableItems = Maps.newHashMap(relatedTable.getPartitionItems()); - Map<Long, PartitionItem> mtmvItems = Maps.newHashMap(mtmv.getPartitionItems()); + Map<Long, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs(); + Set<PartitionKeyDesc> relatedPartitionDescs = mtmv.generateRelatedPartitionDescs().keySet(); // drop partition of mtmv - for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { - long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems); - if (partitionId == -1L) { + for (Entry<Long, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) 
{ + if (!relatedPartitionDescs.contains(entry.getValue())) { dropPartition(mtmv, entry.getKey()); } } // add partition for mtmv - for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) { - long partitionId = getExistPartitionId(entry.getValue(), mtmvItems); - if (partitionId == -1L) { - addPartition(mtmv, entry.getValue()); + HashSet<PartitionKeyDesc> mtmvPartitionDescsSet = Sets.newHashSet(mtmvPartitionDescs.values()); + for (PartitionKeyDesc desc : relatedPartitionDescs) { + if (!mtmvPartitionDescsSet.contains(desc)) { + addPartition(mtmv, desc); } } } @@ -117,19 +117,19 @@ public class MTMVPartitionUtil { * * @param relatedTable * @param tableProperties + * @param relatedCol * @return * @throws AnalysisException */ public static List<AllPartitionDesc> getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable, - Map<String, String> tableProperties) throws AnalysisException { + Map<String, String> tableProperties, String relatedCol) throws AnalysisException { HashMap<String, String> partitionProperties = Maps.newHashMap(); List<AllPartitionDesc> res = Lists.newArrayList(); - Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionItems(); - for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) { - PartitionKeyDesc oldPartitionKeyDesc = entry.getValue().toPartitionKeyDesc(); + Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol); + for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) { SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, - generatePartitionName(oldPartitionKeyDesc), - oldPartitionKeyDesc, partitionProperties); + generatePartitionName(partitionKeyDesc), + partitionKeyDesc, partitionProperties); // mtmv can only has one partition col singlePartitionDesc.analyze(1, tableProperties); res.add(singlePartitionDesc); @@ -137,6 +137,29 @@ public class MTMVPartitionUtil { return res; } + private static Set<PartitionKeyDesc> 
getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol) + throws AnalysisException { + int pos = getPos(relatedTable, relatedCol); + Set<PartitionKeyDesc> res = Sets.newHashSet(); + for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItems().entrySet()) { + PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos); + res.add(partitionKeyDesc); + } + return res; + } + + public static int getPos(MTMVRelatedTableIf relatedTable, String relatedCol) throws AnalysisException { + List<Column> partitionColumns = relatedTable.getPartitionColumns(); + for (int i = 0; i < partitionColumns.size(); i++) { + if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { + return i; + } + } + throw new AnalysisException( + String.format("getRelatedColPos error, relatedCol: %s, partitionColumns: %s", relatedCol, + partitionColumns)); + } + public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException { List<String> res = Lists.newArrayList(); for (Long partitionId : ids) { @@ -166,7 +189,7 @@ public class MTMVPartitionUtil { return false; } try { - return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet()); + return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), mtmv.calculatePartitionMappings()); } catch (AnalysisException e) { LOG.warn("isMTMVSync failed: ", e); return false; @@ -179,14 +202,17 @@ public class MTMVPartitionUtil { * @param mtmv * @param tables * @param excludeTables + * @param partitionMappings * @return * @throws AnalysisException */ - public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables) + public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, + Map<Long, Set<Long>> partitionMappings) throws AnalysisException { Collection<Partition> partitions = mtmv.getPartitions(); for (Partition partition : partitions) { - if (!isMTMVPartitionSync(mtmv, 
partition.getId(), tables, excludeTables)) { + if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), tables, + excludeTables)) { return false; } } @@ -194,14 +220,25 @@ public class MTMVPartitionUtil { } /** - * get not sync tables + * getPartitionsUnSyncTables * * @param mtmv - * @param partitionId - * @return + * @param partitionIds + * @return partitionId ==> UnSyncTableNames * @throws AnalysisException */ - public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { + public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds) + throws AnalysisException { + Map<Long, List<String>> res = Maps.newHashMap(); + Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings(); + for (Long partitionId : partitionIds) { + res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionId, partitionMappings.get(partitionId))); + } + return res; + } + + private static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds) + throws AnalysisException { List<String> res = Lists.newArrayList(); for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { TableIf table = MTMVUtil.getTable(baseTableInfo); @@ -214,15 +251,11 @@ public class MTMVPartitionUtil { } if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { - PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); - Map<Long, PartitionItem> relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems(); - long relatedPartitionId = getExistPartitionId(item, - relatedPartitionItems); - if (relatedPartitionId == -1L) { + if (CollectionUtils.isEmpty(relatedPartitionIds)) { throw new AnalysisException("can not found related partition"); } - boolean isSyncWithPartition = isSyncWithPartition(mtmv, 
partitionId, mtmvRelatedTableIf, - relatedPartitionId); + boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, mtmvRelatedTableIf, + relatedPartitionIds); if (!isSyncWithPartition) { res.add(mtmvRelatedTableIf.getName()); } @@ -242,12 +275,13 @@ public class MTMVPartitionUtil { * @param baseTables * @return */ - public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables) { + public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables, + Map<Long, Set<Long>> partitionMappings) { Collection<Partition> allPartitions = mtmv.getPartitions(); List<Long> res = Lists.newArrayList(); for (Partition partition : allPartitions) { try { - if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables, + if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), baseTables, mtmv.getExcludedTriggerTables())) { res.add(partition.getId()); } @@ -260,27 +294,33 @@ public class MTMVPartitionUtil { } /** - * compare last update time of mtmvPartition and tablePartition + * Compare the current and last updated partition (or table) snapshot of the associated partition (or table) * * @param mtmv * @param mtmvPartitionId * @param relatedTable - * @param relatedPartitionId + * @param relatedPartitionIds * @return * @throws AnalysisException */ - public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, + public static boolean isSyncWithPartitions(MTMV mtmv, Long mtmvPartitionId, MTMVRelatedTableIf relatedTable, - Long relatedPartitionId) throws AnalysisException { + Set<Long> relatedPartitionIds) throws AnalysisException { if (!relatedTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); - String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); - return 
mtmv.getRefreshSnapshot() - .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot); + for (Long relatedPartitionId : relatedPartitionIds) { + MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable + .getPartitionSnapshot(relatedPartitionId); + String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); + if (!mtmv.getRefreshSnapshot() + .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, + relatedPartitionCurrentSnapshot)) { + return false; + } + } + return true; } /** @@ -323,12 +363,11 @@ public class MTMVPartitionUtil { * add partition for mtmv like relatedPartitionId of relatedTable * * @param mtmv - * @param partitionItem + * @param oldPartitionKeyDesc * @throws DdlException */ - private static void addPartition(MTMV mtmv, PartitionItem partitionItem) + private static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc) throws DdlException { - PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc(); Map<String, String> partitionProperties = Maps.newHashMap(); SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, generatePartitionName(oldPartitionKeyDesc), @@ -339,23 +378,6 @@ public class MTMVPartitionUtil { Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); } - /** - * compare PartitionItem and return equals partitionId - * if not found, return -1L - * - * @param target - * @param sources - * @return - */ - private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) { - for (Entry<Long, PartitionItem> entry : sources.entrySet()) { - if (target.equals(entry.getValue())) { - return entry.getKey(); - } - } - return -1L; - } - /** * Determine is sync, ignoring excludedTriggerTables and non OlapTanle * @@ -410,27 +432,35 @@ public class MTMVPartitionUtil { .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); 
} + /** + * Generate updated snapshots of partitions to determine if they are synchronized + * + * @param mtmv + * @param baseTables + * @param partitionIds + * @param partitionMappings + * @return + * @throws AnalysisException + */ public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv, - Set<BaseTableInfo> baseTables, Set<Long> partitionIds) + Set<BaseTableInfo> baseTables, Set<Long> partitionIds, + Map<Long, Set<Long>> partitionMappings) throws AnalysisException { Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap(); for (Long partitionId : partitionIds) { - res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId)); + res.put(mtmv.getPartition(partitionId).getName(), + generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId))); } return res; } private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, - Set<BaseTableInfo> baseTables, Long partitionId) + Set<BaseTableInfo> baseTables, Set<Long> relatedPartitionIds) throws AnalysisException { MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); - List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions( - mtmv.getPartitionItems().get(partitionId), - relatedTable); - for (Long relatedPartitionId : relatedPartitionIds) { MTMVSnapshotIf partitionSnapshot = relatedTable .getPartitionSnapshot(relatedPartitionId); @@ -451,18 +481,4 @@ public class MTMVPartitionUtil { } return refreshPartitionSnapshot; } - - private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem, - MTMVRelatedTableIf relatedTable) { - List<Long> res = Lists.newArrayList(); - Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems(); - 
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { - if (mtmvPartitionItem.equals(entry.getValue())) { - res.add(entry.getKey()); - // current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table - return res; - } - } - return res; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java index 666a79eba97..f0199169859 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java @@ -31,6 +31,8 @@ import org.apache.logging.log4j.Logger; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; public class MTMVRewriteUtil { private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class); @@ -64,6 +66,7 @@ public class MTMVRewriteUtil { && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { return res; } + Map<Long, Set<Long>> partitionMappings = null; // check gracePeriod long gracePeriodMills = mtmv.getGracePeriod(); for (Partition partition : allPartitions) { @@ -73,7 +76,11 @@ public class MTMVRewriteUtil { continue; } try { - if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), + if (partitionMappings == null) { + partitionMappings = mtmv.calculatePartitionMappings(); + } + if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), + partitionMappings.get(partition.getId()), mtmvRelation.getBaseTables(), Sets.newHashSet())) { res.add(partition); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index 2b2fe8ab9f2..90c98a44294 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -35,6 +35,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.mtmv.EnvInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; @@ -304,8 +305,9 @@ public class CreateMTMVInfo { if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) { throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn()); } - if (partitionColumnNames.size() != 1) { - throw new AnalysisException("base table for partitioning only support single column."); + if (!(mtmvBaseRealtedTable instanceof HMSExternalTable) + && partitionColumnNames.size() != 1) { + throw new AnalysisException("only hms table support multi column partition."); } mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo()); mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn()); @@ -322,7 +324,7 @@ public class CreateMTMVInfo { List<AllPartitionDesc> allPartitionDescs = null; try { allPartitionDescs = MTMVPartitionUtil - .getPartitionDescsByRelatedTable(relatedTable, properties); + .getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol()); } catch (org.apache.doris.common.AnalysisException e) { throw new AnalysisException("getPartitionDescsByRelatedTable failed", e); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 62f2fd5ff58..df38ce18720 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -151,7 +151,8 @@ 
public class MTMVPartitionUtilTest { @Test public void testIsSyncWithPartition() throws AnalysisException { - boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L); + boolean isSyncWithPartition = MTMVPartitionUtil + .isSyncWithPartitions(mtmv, 1L, baseOlapTable, Sets.newHashSet(2L)); Assert.assertTrue(isSyncWithPartition); } @@ -164,7 +165,8 @@ public class MTMVPartitionUtilTest { result = false; } }; - boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L); + boolean isSyncWithPartition = MTMVPartitionUtil + .isSyncWithPartitions(mtmv, 1L, baseOlapTable, Sets.newHashSet(2L)); Assert.assertFalse(isSyncWithPartition); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java index 55394897e42..8de7ed75ccd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java @@ -103,7 +103,8 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = true; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + (Set<String>) any); minTimes = 0; result = true; @@ -129,7 +130,8 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 2L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + (Set<String>) any); minTimes = 0; result = false; } @@ -148,7 +150,8 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 1L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, 
anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + (Set<String>) any); minTimes = 0; result = false; } @@ -177,7 +180,8 @@ public class MTMVRewriteUtilTest { public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException { new Expectations() { { - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any); + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + (Set<String>) any); minTimes = 0; result = false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java index b1fc52dad46..1d1c952bb3c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java @@ -28,6 +28,7 @@ import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mocked; @@ -37,6 +38,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Map; import java.util.Set; public class MTMVTaskTest { @@ -84,7 +86,8 @@ public class MTMVTaskTest { minTimes = 0; result = poneId; - mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any); + mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, + (Map<Long, Set<Long>>) any); minTimes = 0; result = true; @@ -101,9 +104,9 @@ public class MTMVTaskTest { @Test public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException { - MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), true); + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, null, true); MTMVTask task = new 
MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(); + List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); Assert.assertEquals(allPartitionIds, result); } @@ -111,7 +114,7 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(); + List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); Assert.assertEquals(Lists.newArrayList(poneId), result); } @@ -119,7 +122,7 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(); + List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); Assert.assertTrue(CollectionUtils.isEmpty(result)); } @@ -127,14 +130,15 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException { new Expectations() { { - mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any); + mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, + (Map<Long, Set<Long>>) any); minTimes = 0; result = false; } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(); + List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); Assert.assertEquals(allPartitionIds, result); } @@ -142,7 +146,8 @@ public class MTMVTaskTest { public void 
testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws AnalysisException { new Expectations() { { - mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any); + mtmvPartitionUtil + .isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, (Map<Long, Set<Long>>) any); minTimes = 0; result = false; @@ -150,14 +155,15 @@ public class MTMVTaskTest { minTimes = 0; result = RefreshMethod.AUTO; - mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any); + mtmvPartitionUtil + .getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any, (Map<Long, Set<Long>>) any); minTimes = 0; result = Lists.newArrayList(ptwoId); } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(); + List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); Assert.assertEquals(Lists.newArrayList(ptwoId), result); } } diff --git a/regression-test/data/mtmv_p0/test_hive_multi_partition_mtmv.out b/regression-test/data/mtmv_p0/test_hive_multi_partition_mtmv.out new file mode 100644 index 00000000000..6f00353bf59 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_hive_multi_partition_mtmv.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_base_table -- +1 2020 bj +2 2020 sh +3 2021 bj +4 2021 sh +5 2022 bj +6 2022 sh + +-- !mtmv_year_2020 -- +1 2020 bj +2 2020 sh + +-- !mtmv_year_complete -- +1 2020 bj +2 2020 sh +3 2021 bj +4 2021 sh +5 2022 bj +6 2022 sh + +-- !mtmv_region_bj -- +1 2020 bj +3 2021 bj +5 2022 bj + +-- !mtmv_region_complete -- +1 2020 bj +2 2020 sh +3 2021 bj +4 2021 sh +5 2022 bj +6 2022 sh + +-- !mtmv_data_change -- +1 2020 bj +2 2020 sh +3 2021 bj +4 2021 sh +5 2022 bj +6 2022 sh +7 2020 bj + diff --git a/regression-test/suites/mtmv_p0/test_hive_multi_partition_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_multi_partition_mtmv.groovy new file mode 100644 index 00000000000..c8e5830dcac --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_hive_multi_partition_mtmv.groovy @@ -0,0 +1,253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_hive_multi_partition_mtmv", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + // prepare data in hive + def hive_database = "test_hive_multi_partition_mtmv_db" + def hive_table = "partition2" + + def drop_table_str = """ drop table if exists ${hive_database}.${hive_table} """ + def drop_database_str = """ drop database if exists ${hive_database}""" + def create_database_str = """ create database ${hive_database}""" + def create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} ( + `k1` int) + PARTITIONED BY ( + `year` int, + `region` string) + STORED AS ORC; + """ + def add_partition_str = """ + alter table ${hive_database}.${hive_table} add if not exists + partition(year=2020,region="bj") + partition(year=2020,region="sh") + partition(year=2021,region="bj") + partition(year=2021,region="sh") + partition(year=2022,region="bj") + partition(year=2022,region="sh") + """ + def insert_str1 = """insert into ${hive_database}.${hive_table} PARTITION(year=2020,region="bj") values(1)""" + def insert_str2 = """insert into ${hive_database}.${hive_table} PARTITION(year=2020,region="sh") values(2)""" + def insert_str3 = """insert into ${hive_database}.${hive_table} PARTITION(year=2021,region="bj") values(3)""" + def insert_str4 = """insert into ${hive_database}.${hive_table} PARTITION(year=2021,region="sh") values(4)""" + def insert_str5 = """insert into ${hive_database}.${hive_table} PARTITION(year=2022,region="bj") values(5)""" + def insert_str6 = """insert into ${hive_database}.${hive_table} PARTITION(year=2022,region="sh") values(6)""" + + logger.info("hive sql: " + drop_table_str) + hive_docker """ ${drop_table_str} """ + logger.info("hive sql: " + drop_database_str) + hive_docker """ ${drop_database_str} """ + logger.info("hive sql: " + create_database_str) + 
hive_docker """ ${create_database_str}""" + logger.info("hive sql: " + create_table_str) + hive_docker """ ${create_table_str} """ + logger.info("hive sql: " + add_partition_str) + hive_docker """ ${add_partition_str} """ + logger.info("hive sql: " + insert_str1) + hive_docker """ ${insert_str1} """ + logger.info("hive sql: " + insert_str2) + hive_docker """ ${insert_str2} """ + logger.info("hive sql: " + insert_str3) + hive_docker """ ${insert_str3} """ + logger.info("hive sql: " + insert_str4) + hive_docker """ ${insert_str4} """ + logger.info("hive sql: " + insert_str5) + hive_docker """ ${insert_str5} """ + logger.info("hive sql: " + insert_str6) + hive_docker """ ${insert_str6} """ + + + // prepare catalog + String hms_port = context.config.otherConfigs.get("hms_port") + String catalog_name = "test_hive_multi_partition_mtmv_catalog" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + + order_qt_select_base_table "SELECT * FROM ${catalog_name}.${hive_database}.${hive_table}" + + + // prepare mtmv + def mvName = "test_hive_multi_partition_mtmv" + def dbName = "regression_test_mtmv_p0" + sql """drop materialized view if exists ${mvName};""" + + // partition by year + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`year`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT k1,year,region FROM ${catalog_name}.${hive_database}.${hive_table}; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_2020")) + assertTrue(showPartitionsResult.toString().contains("p_2021")) + 
assertTrue(showPartitionsResult.toString().contains("p_2022")) + assertEquals(showPartitionsResult.size(),3) + // refresh p_2020 + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_2020); + """ + def jobName = getJobName(dbName, mvName); + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_year_2020 "SELECT * FROM ${mvName} order by k1,year,region" + + // refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_year_complete "SELECT * FROM ${mvName} order by k1,year,region" + + sql """drop materialized view if exists ${mvName};""" + // partition by region + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`region`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT k1,year,region FROM ${catalog_name}.${hive_database}.${hive_table}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_bj")) + assertTrue(showPartitionsResult.toString().contains("p_sh")) + assertEquals(showPartitionsResult.size(),2) + // refresh p_bj + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_bj); + """ + jobName = getJobName(dbName, mvName); + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_region_bj "SELECT * FROM ${mvName} order by k1,year,region" + + // refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_region_complete "SELECT * FROM ${mvName} order by k1,year,region" + + // hive data change + def insert_str7 = """ + insert into ${hive_database}.${hive_table} PARTITION(year=2020,region="bj") values(7); + """ + logger.info("hive sql: " + insert_str7) + hive_docker """ ${insert_str7} """ + sql """ + REFRESH catalog ${catalog_name} + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} 
partitions(p_bj); + """ + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_data_change "SELECT * FROM ${mvName} order by k1,year,region" + + // hive add partition year + def add_partition2023_bj_str = """ + alter table ${hive_database}.${hive_table} add if not exists + partition(year=2023,region="bj"); + """ + logger.info("hive sql: " + add_partition2023_bj_str) + hive_docker """ ${add_partition2023_bj_str} """ + sql """ + REFRESH catalog ${catalog_name} + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(showPartitionsResult.size(),2) + + // hive add partition region + def add_partition2023_tj_str = """ + alter table ${hive_database}.${hive_table} add if not exists + partition(year=2023,region="tj"); + """ + logger.info("hive sql: " + add_partition2023_tj_str) + hive_docker """ ${add_partition2023_tj_str} """ + sql """ + REFRESH catalog ${catalog_name} + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(showPartitionsResult.size(),3) + assertTrue(showPartitionsResult.toString().contains("p_tj")) + + // hive drop partition + def drop_partition2023_bj_str = """ + alter table ${hive_database}.${hive_table} drop if exists + partition(year=2023,region="bj"); + """ + logger.info("hive sql: " + drop_partition2023_bj_str) + hive_docker """ ${drop_partition2023_bj_str} """ + sql """ + REFRESH catalog ${catalog_name} + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + 
assertTrue(showPartitionsResult.toString().contains("p_bj")) + assertTrue(showPartitionsResult.toString().contains("p_sh")) + assertTrue(showPartitionsResult.toString().contains("p_tj")) + + def drop_partition2023_tj_str = """ + alter table ${hive_database}.${hive_table} drop if exists + partition(year=2023,region="tj"); + """ + logger.info("hive sql: " + drop_partition2023_tj_str) + hive_docker """ ${drop_partition2023_tj_str} """ + sql """ + REFRESH catalog ${catalog_name} + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_bj")) + assertTrue(showPartitionsResult.toString().contains("p_sh")) + assertFalse(showPartitionsResult.toString().contains("p_tj")) + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalog_name}""" +} + diff --git a/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy b/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy index c9a3fd27128..82ca125104d 100644 --- a/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy @@ -61,7 +61,7 @@ suite("test_partition_refresh_mtmv") { sql """drop table if exists `${tableNameNum}`""" sql """drop materialized view if exists ${mvName};""" - // base table has two partition col + // base table has two partition col(range) sql """ CREATE TABLE `${tableNameNum}` ( `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', @@ -79,21 +79,56 @@ suite("test_partition_refresh_mtmv") { """ try { - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - partition by(`date`) - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${tableNameNum}; - """ - 
Assert.fail(); - } catch (Exception e) { - log.info(e.getMessage()) - } - sql """drop table if exists `${tableNameNum}`""" - sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableNameNum}; + """ + Assert.fail(); + } catch (Exception e) { + log.info(e.getMessage()) + } + sql """drop table if exists `${tableNameNum}`""" + sql """drop materialized view if exists ${mvName};""" + + // base table has two partition col(list) + sql """ + CREATE TABLE `${tableNameNum}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY LIST(`date`,`num`) + ( + PARTITION p201701_1000 VALUES IN (('2017-01-01',1), ('2017-01-01',2)), + PARTITION p201702_2000 VALUES IN (('2017-02-01',3), ('2017-02-01',4)) + ) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + try { + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableNameNum}; + """ + Assert.fail(); + } catch (Exception e) { + log.info(e.getMessage()) + } + sql """drop table if exists `${tableNameNum}`""" + sql """drop materialized view if exists ${mvName};""" // range date partition sql """ @@ -220,7 +255,8 @@ suite("test_partition_refresh_mtmv") { showPartitionsResult = sql """show partitions from ${mvName}""" logger.info("showPartitionsResult: " + showPartitionsResult.toString()) assertTrue(showPartitionsResult.toString().contains("p_1")) - assertTrue(showPartitionsResult.toString().contains("p_2_3")) + assertTrue(showPartitionsResult.toString().contains("_2")) 
+ assertTrue(showPartitionsResult.toString().contains("_3")) sql """ REFRESH MATERIALIZED VIEW ${mvName} @@ -330,6 +366,26 @@ suite("test_partition_refresh_mtmv") { waitingMTMVTaskFinished(jobName) order_qt_refresh_other_table_change_other "SELECT * FROM ${mvName} order by user_id,age,date,num" + //test base table add partition + sql """alter table ${tableNameNum} ADD PARTITION p201704 VALUES [('2017-04-01'), ('2017-05-01'))""" + sql """ + REFRESH MATERIALIZED VIEW ${mvName}; + """ + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_20170401_20170501")) + + //test base table drop partition + sql """alter table ${tableNameNum} drop PARTITION p201704""" + sql """ + REFRESH MATERIALIZED VIEW ${mvName}; + """ + waitingMTMVTaskFinished(jobName) + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertFalse(showPartitionsResult.toString().contains("p_20170401_20170501")) + // test exclude table sql """drop materialized view if exists ${mvName};""" sql """drop table if exists `${tableNameNum}`"""
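
Note on the partition mapping idea: the diff above passes a `Map<Long, Set<Long>>` of MTMV partition id to related base-table partition ids through the sync checks, and `isSyncWithPartitions` returns false as soon as one related partition differs from the recorded snapshot. The following is a minimal, standalone sketch of that idea using the six-partition example from the commit message. It is not Doris code: `PartitionMappingSketch`, `HivePartition`, `groupByColumn` and `isSync` are hypothetical names, the mapping is keyed by partition value rather than by MTMV partition id, and snapshots are modeled as plain `long` values for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; none of these names are Doris classes.
public class PartitionMappingSketch {

    /** Hypothetical stand-in for one Hive partition: an id plus its partition column values. */
    static final class HivePartition {
        final long id;
        final Map<String, String> values = new HashMap<>();

        HivePartition(long id, String date, String region) {
            this.id = id;
            values.put("date", date);
            values.put("region", region);
        }
    }

    /** The six (date, region) partitions from the commit message, with made-up ids 1..6. */
    static List<HivePartition> samplePartitions() {
        List<HivePartition> res = new ArrayList<>();
        long id = 1L;
        for (String date : new String[] {"20200101", "20200102", "20200103"}) {
            for (String region : new String[] {"beijing", "shanghai"}) {
                res.add(new HivePartition(id++, date, region));
            }
        }
        return res;
    }

    /** Group base partitions by the value of the single column the MTMV is partitioned on. */
    static Map<String, Set<Long>> groupByColumn(List<HivePartition> partitions, String relatedCol) {
        Map<String, Set<Long>> res = new LinkedHashMap<>();
        for (HivePartition p : partitions) {
            res.computeIfAbsent(p.values.get(relatedCol), k -> new LinkedHashSet<>()).add(p.id);
        }
        return res;
    }

    /**
     * An MTMV partition counts as in sync only if every related base partition still has
     * the snapshot value recorded at the last refresh.
     */
    static boolean isSync(Set<Long> relatedIds, Map<Long, Long> refreshed, Map<Long, Long> current) {
        for (Long id : relatedIds) {
            Long now = current.get(id);
            if (now == null || !now.equals(refreshed.get(id))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<HivePartition> parts = samplePartitions();
        // Partitioning by date yields three groups of two base partitions each.
        System.out.println(groupByColumn(parts, "date"));
        // Partitioning by region yields two groups of three base partitions each.
        System.out.println(groupByColumn(parts, "region"));
    }
}
```

Under these assumptions, grouping by `date` gives three groups and grouping by `region` gives two, matching the partition counts described in the commit message. A change to any one base partition in a group marks the whole corresponding MTMV partition as out of sync.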