This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-hudi-mtmv in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-hudi-mtmv by this push: new 95b5b35be3f [fix](Hudi-mtmv)Dev hudi partition reresh0325 (#49462) 95b5b35be3f is described below commit 95b5b35be3f8d5f547802965ee82fd512f3c8bdb Author: Tiewei Fang <fangtie...@selectdb.com> AuthorDate: Tue Mar 25 16:42:37 2025 +0800 [fix](Hudi-mtmv)Dev hudi partition reresh0325 (#49462) --- .../doris/datasource/ExternalMetaCacheMgr.java | 3 +- .../org/apache/doris/datasource/ExternalTable.java | 2 +- .../doris/datasource/TablePartitionValues.java | 32 +-- .../apache/doris/datasource/hive/HMSDlaTable.java | 79 +++++++ .../doris/datasource/hive/HMSExternalTable.java | 160 ++++++------- .../apache/doris/datasource/hive/HiveDlaTable.java | 141 ++++++++++++ .../datasource/hive/HiveMetaStoreClientHelper.java | 5 +- .../apache/doris/datasource/hive/HudiDlaTable.java | 121 ++++++++++ .../doris/datasource/hive/source/HiveScanNode.java | 3 +- .../doris/datasource/hudi/HudiMvccSnapshot.java | 74 ++++++ .../doris/datasource/hudi/HudiSchemaCacheKey.java | 82 +++++++ .../apache/doris/datasource/hudi/HudiUtils.java | 14 +- .../hudi/source/HudiCachedPartitionProcessor.java | 30 ++- .../doris/datasource/hudi/source/HudiScanNode.java | 19 +- .../datasource/iceberg/IcebergExternalTable.java | 3 +- .../maxcompute/MaxComputeExternalTable.java | 2 +- .../{MvccTable.java => EmptyMvccSnapshot.java} | 14 +- .../apache/doris/datasource/mvcc/MvccTable.java | 5 +- .../datasource/paimon/PaimonExternalTable.java | 3 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 3 +- .../org/apache/doris/nereids/NereidsPlanner.java | 1 - .../org/apache/doris/nereids/StatementContext.java | 6 +- .../doris/nereids/rules/analysis/BindRelation.java | 3 + .../doris/tablefunction/MetadataGenerator.java | 3 +- .../doris/datasource/hudi/HudiUtilsTest.java | 4 +- .../apache/doris/external/hms/HmsCatalogTest.java | 5 +- .../org/apache/doris/qe/HmsQueryCacheTest.java | 12 +- .../hudi/hudi_mtmv/test_hudi_mtmv.out | Bin 0 -> 8110 
bytes .../hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out | Bin 0 -> 5610 bytes .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out | Bin 0 -> 250 bytes .../hudi/hudi_mtmv/test_hudi_mtmv.groovy | 256 +++++++++++++++++++++ .../hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy | 108 +++++++++ .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy | 91 ++++++++ 33 files changed, 1111 insertions(+), 173 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 9ad78816278..99226bc1ccf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -34,6 +34,7 @@ import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr; import org.apache.doris.datasource.metacache.MetaCache; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonMetadataCache; import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; @@ -277,7 +278,7 @@ public class ExternalMetaCacheMgr { if (metaCache != null) { List<Type> partitionColumnTypes; try { - partitionColumnTypes = table.getPartitionColumnTypes(); + partitionColumnTypes = table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table)); } catch (NotSupportedException e) { LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage()); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 30bf48c3d8b..60a1f172978 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -391,7 +391,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { throw new NotImplementedException("getChunkSized not implemented"); } - protected Optional<SchemaCacheValue> getSchemaCacheValue() { + public Optional<SchemaCacheValue> getSchemaCacheValue() { ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); return cache.getSchemaValue(dbName, name); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java index c7f2ce6f712..e928d3c739e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java @@ -51,6 +51,7 @@ public class TablePartitionValues { private long nextPartitionId; private final Map<Long, PartitionItem> idToPartitionItem; private final Map<String, Long> partitionNameToIdMap; + private Map<String, Long> partitionNameToLastModifiedMap; private final Map<Long, String> partitionIdToNameMap; private Map<Long, List<UniqueId>> idToUniqueIdsMap; @@ -68,15 +69,12 @@ public class TablePartitionValues { nextPartitionId = 0; idToPartitionItem = new HashMap<>(); partitionNameToIdMap = new HashMap<>(); + partitionNameToLastModifiedMap = new HashMap<>(); partitionIdToNameMap = new HashMap<>(); } - public TablePartitionValues(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) { - this(); - addPartitions(partitionNames, partitionValues, types); - } - - public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) { + public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types, + List<Long> partitionLastUpdateTimestamp) { Preconditions.checkState(partitionNames.size() 
== partitionValues.size()); List<String> addPartitionNames = new ArrayList<>(); List<PartitionItem> addPartitionItems = new ArrayList<>(); @@ -90,6 +88,7 @@ public class TablePartitionValues { addPartitionNames.add(partitionNames.get(i)); addPartitionItems.add(toListPartitionItem(partitionValues.get(i), types)); } + partitionNameToLastModifiedMap.put(partitionNames.get(i), partitionLastUpdateTimestamp.get(i)); } cleanPartitions(); @@ -123,23 +122,6 @@ public class TablePartitionValues { partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem); } - public void dropPartitions(List<String> partitionNames, List<Type> types) { - partitionNames.forEach(p -> { - Long removedPartition = partitionNameToIdMap.get(p); - if (removedPartition != null) { - idToPartitionItem.remove(removedPartition); - } - }); - List<String> remainingPartitionNames = new ArrayList<>(); - List<PartitionItem> remainingPartitionItems = new ArrayList<>(); - partitionNameToIdMap.forEach((partitionName, partitionId) -> { - remainingPartitionNames.add(partitionName); - remainingPartitionItems.add(idToPartitionItem.get(partitionId)); - }); - cleanPartitions(); - addPartitionItems(remainingPartitionNames, remainingPartitionItems, types); - } - public long getLastUpdateTimestamp() { return lastUpdateTimestamp; } @@ -148,6 +130,10 @@ public class TablePartitionValues { this.lastUpdateTimestamp = lastUpdateTimestamp; } + public Map<String, Long> getPartitionNameToLastModifiedMap() { + return partitionNameToLastModifiedMap; + } + public Lock readLock() { return readWriteLock.readLock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java new file mode 100644 index 00000000000..1710646ce3d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.mtmv.MTMVBaseTableIf; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVSnapshotIf; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * This abstract class represents a Hive Metastore (HMS) Dla Table and provides a blueprint for + * various operations related to metastore tables in Doris. + * + * Purpose: + * - To encapsulate common functionalities that HMS Dla tables should have for implementing other interfaces + * + * Why needed: + * - To provide a unified way to manage and interact with different kinds of Dla Table + * - To facilitate the implementation of multi-table materialized views (MTMV) by providing necessary + * methods for snapshot and partition management. 
+ * - To abstract out the specific details of HMS table operations, making the code more modular and maintainable. + */ +public abstract class HMSDlaTable implements MTMVBaseTableIf { + protected HMSExternalTable hmsTable; + + public HMSDlaTable(HMSExternalTable table) { + this.hmsTable = table; + } + + abstract Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) + throws AnalysisException; + + abstract PartitionType getPartitionType(Optional<MvccSnapshot> snapshot); + + abstract Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot); + + abstract List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot); + + abstract MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException; + + abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException; + + abstract boolean isPartitionColumnAllowNull(); + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), true); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index dfd7d6fe4b8..492fe76da56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -31,21 +31,25 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import 
org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.hudi.HudiMvccSnapshot; +import org.apache.doris.datasource.hudi.HudiSchemaCacheKey; import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot; import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.fs.FileSystemDirectoryLister; import org.apache.doris.mtmv.MTMVBaseTableIf; -import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; -import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.qe.GlobalVariable; @@ -105,7 +109,7 @@ import java.util.stream.Collectors; /** * Hive metastore external table. 
*/ -public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { +public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable { private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class); public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS; @@ -167,6 +171,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private DLAType dlaType = DLAType.UNKNOWN; + private HMSDlaTable dlaTable; + // record the event update time when enable hms event listener protected volatile long eventUpdateTime; @@ -204,10 +210,13 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } else { if (supportedIcebergTable()) { dlaType = DLAType.ICEBERG; + dlaTable = new HiveDlaTable(this); } else if (supportedHoodieTable()) { dlaType = DLAType.HUDI; + dlaTable = new HudiDlaTable(this); } else if (supportedHiveTable()) { dlaType = DLAType.HIVE; + dlaTable = new HiveDlaTable(this); } else { // Should not reach here. Because `supportedHiveTable` will throw exception if not return true. 
throw new NotSupportedException("Unsupported dlaType for table: " + getNameWithFullQualifiers()); @@ -298,23 +307,45 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return remoteTable; } - public List<Type> getPartitionColumnTypes() { + @Override + public List<Column> getFullSchema() { makeSureInitialized(); + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + if (getDlaType() == DLAType.HUDI) { + return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)) + .getSchema(); + } + Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name); + return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null); + } + + public List<Type> getPartitionColumnTypes(Optional<MvccSnapshot> snapshot) { + makeSureInitialized(); + if (getDlaType() == DLAType.HUDI) { + return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(snapshot).getPartitionColTypes(); + } Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColTypes()) .orElse(Collections.emptyList()); } - public List<Column> getPartitionColumns() { + public List<Type> getHudiPartitionColumnTypes(long timestamp) { makeSureInitialized(); - Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColumns()) + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue( + new HudiSchemaCacheKey(dbName, name, timestamp)); + return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColTypes()) .orElse(Collections.emptyList()); } + public List<Column> getPartitionColumns() { + return getPartitionColumns(MvccUtil.getSnapshotFromContext(this)); + } + @Override public List<Column> 
getPartitionColumns(Optional<MvccSnapshot> snapshot) { - return getPartitionColumns(); + makeSureInitialized(); + return dlaTable.getPartitionColumns(snapshot); } @Override @@ -354,7 +385,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) this.getCatalog()); - List<Type> partitionColumnTypes = this.getPartitionColumnTypes(); + List<Type> partitionColumnTypes = this.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(this)); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( this.getDbName(), this.getName(), partitionColumnTypes); Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); @@ -563,10 +594,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI @Override public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) { - return initSchemaAndUpdateTime(); - } - - public Optional<SchemaCacheValue> initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); // try to use transient_lastDdlTime from hms client @@ -575,7 +602,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI ? 
Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000 // use current timestamp if lastDdlTime does not exist (hive views don't have this prop) : System.currentTimeMillis(); - return initSchema(); + return initSchema(key); } public long getLastDdlTime() { @@ -588,12 +615,12 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public Optional<SchemaCacheValue> initSchema() { + public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) { makeSureInitialized(); if (dlaType.equals(DLAType.ICEBERG)) { return getIcebergSchema(); } else if (dlaType.equals(DLAType.HUDI)) { - return getHudiSchema(); + return getHudiSchema(key); } else { return getHiveSchema(); } @@ -605,11 +632,12 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } - private Optional<SchemaCacheValue> getHudiSchema() { + private Optional<SchemaCacheValue> getHudiSchema(SchemaCacheKey key) { boolean[] enableSchemaEvolution = {false}; - InternalSchema hudiInternalSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this, enableSchemaEvolution); + HudiSchemaCacheKey hudiSchemaCacheKey = (HudiSchemaCacheKey) key; + InternalSchema hudiInternalSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this, enableSchemaEvolution, + Long.toString(hudiSchemaCacheKey.getTimestamp())); org.apache.avro.Schema hudiSchema = AvroInternalSchemaConverter.convert(hudiInternalSchema, name); - List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size()); List<String> colTypes = Lists.newArrayList(); for (int i = 0; i < hudiSchema.getFields().size(); i++) { @@ -857,96 +885,46 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI @Override public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { - return getPartitionType(); + makeSureInitialized(); + return 
dlaTable.getPartitionType(snapshot); } - public PartitionType getPartitionType() { - return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; + public Set<String> getPartitionColumnNames() { + return getPartitionColumnNames(MvccUtil.getSnapshotFromContext(this)); } @Override public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) { - return getPartitionColumnNames(); - } - - public Set<String> getPartitionColumnNames() { - return getPartitionColumns().stream() - .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); + makeSureInitialized(); + return dlaTable.getPartitionColumnNames(snapshot); } @Override - public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { - return getNameToPartitionItems(); + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) + throws AnalysisException { + makeSureInitialized(); + return dlaTable.getAndCopyPartitionItems(snapshot); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) throws AnalysisException { - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - getDbName(), getName(), getPartitionColumnTypes()); - Long partitionId = getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues); - HivePartition hivePartition = getHivePartitionByIdOrAnalysisException(partitionId, - hivePartitionValues, cache); - return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime()); + makeSureInitialized(); + return dlaTable.getPartitionSnapshot(partitionName, context, snapshot); } @Override public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) throws AnalysisException { - if (getPartitionType() == 
PartitionType.UNPARTITIONED) { - return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); - } - HivePartition maxPartition = null; - long maxVersionTime = 0L; - long visibleVersionTime; - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - getDbName(), getName(), getPartitionColumnTypes()); - List<HivePartition> partitionList = cache.getAllPartitionsWithCache(getDbName(), getName(), - Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values())); - if (CollectionUtils.isEmpty(partitionList)) { - throw new AnalysisException("partitionList is empty, table name: " + getName()); - } - for (HivePartition hivePartition : partitionList) { - visibleVersionTime = hivePartition.getLastModifiedTime(); - if (visibleVersionTime > maxVersionTime) { - maxVersionTime = visibleVersionTime; - maxPartition = hivePartition; - } - } - return new MTMVMaxTimestampSnapshot(maxPartition.getPartitionName(getPartitionColumns()), maxVersionTime); - } - - private Long getPartitionIdByNameOrAnalysisException(String partitionName, - HiveMetaStoreCache.HivePartitionValues hivePartitionValues) - throws AnalysisException { - Long partitionId = hivePartitionValues.getPartitionNameToIdMap().get(partitionName); - if (partitionId == null) { - throw new AnalysisException("can not find partition: " + partitionName); - } - return partitionId; + makeSureInitialized(); + return dlaTable.getTableSnapshot(context, snapshot); } - private HivePartition getHivePartitionByIdOrAnalysisException(Long partitionId, - HiveMetaStoreCache.HivePartitionValues hivePartitionValues, - HiveMetaStoreCache cache) throws AnalysisException { - List<String> partitionValues = hivePartitionValues.getPartitionValuesMap().get(partitionId); - if (CollectionUtils.isEmpty(partitionValues)) { - throw new AnalysisException("can not find partitionValues: " + 
partitionId); - } - HivePartition partition = cache.getHivePartition(getDbName(), getName(), partitionValues); - if (partition == null) { - throw new AnalysisException("can not find partition: " + partitionId); - } - return partition; - } @Override public boolean isPartitionColumnAllowNull() { - return true; + makeSureInitialized(); + return dlaTable.isPartitionColumnAllowNull(); } /** @@ -1008,7 +986,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) catalog); - List<Type> partitionColumnTypes = getPartitionColumnTypes(); + List<Type> partitionColumnTypes = getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(this)); HiveMetaStoreCache.HivePartitionValues partitionValues = null; // Get table partitions from cache. if (!partitionColumnTypes.isEmpty()) { @@ -1082,6 +1060,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI @Override public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + makeSureInitialized(); + dlaTable.beforeMTMVRefresh(mtmv); + } + + @Override + public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) { + if (getDlaType() == DLAType.HUDI) { + return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this)); + } + return new EmptyMvccSnapshot(); } public HoodieTableMetaClient getHudiClient() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java new file mode 100644 index 00000000000..296b2f3667a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; + +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class HiveDlaTable extends HMSDlaTable { + + public HiveDlaTable(HMSExternalTable table) { + super(table); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return getPartitionColumns(snapshot).size() > 0 ? 
PartitionType.LIST : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) { + return getPartitionColumns(snapshot).stream() + .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + Optional<SchemaCacheValue> schemaCacheValue = hmsTable.getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColumns()) + .orElse(Collections.emptyList()); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return hmsTable.getNameToPartitionItems(); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); + HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( + hmsTable.getDbName(), hmsTable.getName(), hmsTable.getPartitionColumnTypes(snapshot)); + Long partitionId = getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues); + HivePartition hivePartition = getHivePartitionByIdOrAnalysisException(partitionId, + hivePartitionValues, cache); + return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + if (hmsTable.getPartitionType(snapshot) == PartitionType.UNPARTITIONED) { + return new MTMVMaxTimestampSnapshot(hmsTable.getName(), hmsTable.getLastDdlTime()); + } + HivePartition maxPartition = null; + long maxVersionTime = 0L; + long visibleVersionTime; + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + 
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); + HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( + hmsTable.getDbName(), hmsTable.getName(), hmsTable.getPartitionColumnTypes(snapshot)); + List<HivePartition> partitionList = cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(), + Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values())); + if (CollectionUtils.isEmpty(partitionList)) { + throw new AnalysisException("partitionList is empty, table name: " + hmsTable.getName()); + } + for (HivePartition hivePartition : partitionList) { + visibleVersionTime = hivePartition.getLastModifiedTime(); + if (visibleVersionTime > maxVersionTime) { + maxVersionTime = visibleVersionTime; + maxPartition = hivePartition; + } + } + return new MTMVMaxTimestampSnapshot(maxPartition.getPartitionName( + hmsTable.getPartitionColumns()), maxVersionTime); + } + + private Long getPartitionIdByNameOrAnalysisException(String partitionName, + HiveMetaStoreCache.HivePartitionValues hivePartitionValues) + throws AnalysisException { + Long partitionId = hivePartitionValues.getPartitionNameToIdMap().get(partitionName); + if (partitionId == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return partitionId; + } + + private HivePartition getHivePartitionByIdOrAnalysisException(Long partitionId, + HiveMetaStoreCache.HivePartitionValues hivePartitionValues, + HiveMetaStoreCache cache) throws AnalysisException { + List<String> partitionValues = hivePartitionValues.getPartitionValuesMap().get(partitionId); + if (CollectionUtils.isEmpty(partitionValues)) { + throw new AnalysisException("can not find partitionValues: " + partitionId); + } + HivePartition partition = cache.getHivePartition(hmsTable.getDbName(), hmsTable.getName(), partitionValues); + if (partition == null) { + throw new AnalysisException("can not find partition: " + partitionId); + } + return partition; + } + 
+ @Override + public boolean isPartitionColumnAllowNull() { + return true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index 9bb09225607..07f133992d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -802,7 +802,8 @@ public class HiveMetaStoreClientHelper { return output.toString(); } - public static InternalSchema getHudiTableSchema(HMSExternalTable table, boolean[] enableSchemaEvolution) { + public static InternalSchema getHudiTableSchema(HMSExternalTable table, boolean[] enableSchemaEvolution, + String timestamp) { HoodieTableMetaClient metaClient = table.getHudiClient(); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); @@ -814,7 +815,7 @@ public class HiveMetaStoreClientHelper { // So, we should reload timeline so that we can read the latest commit files. metaClient.reloadActiveTimeline(); - Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(timestamp); if (internalSchemaOption.isPresent()) { enableSchemaEvolution[0] = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java new file mode 100644 index 00000000000..952d8e3311e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.hudi.HudiMvccSnapshot; +import org.apache.doris.datasource.hudi.HudiSchemaCacheKey; +import org.apache.doris.datasource.hudi.HudiUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; + +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class HudiDlaTable extends HMSDlaTable { + + public HudiDlaTable(HMSExternalTable table) { + super(table); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return getPartitionColumns(snapshot).size() > 0 ? 
PartitionType.LIST : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) { + return getPartitionColumns(snapshot).stream() + .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getHudiSchemaCacheValue(snapshot).getPartitionColumns(); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + TablePartitionValues tablePartitionValues = getOrFetchHudiSnapshotCacheValue(snapshot); + Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem(); + Map<Long, String> partitionIdToNameMap = tablePartitionValues.getPartitionIdToNameMap(); + Map<String, PartitionItem> copiedPartitionItems = Maps.newHashMap(); + for (Long key : partitionIdToNameMap.keySet()) { + copiedPartitionItems.put(partitionIdToNameMap.get(key), idToPartitionItem.get(key)); + } + return copiedPartitionItems; + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + Map<String, Long> partitionNameToLastModifiedMap = getOrFetchHudiSnapshotCacheValue( + snapshot).getPartitionNameToLastModifiedMap(); + return new MTMVTimestampSnapshot(partitionNameToLastModifiedMap.get(partitionName)); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVTimestampSnapshot(getOrFetchHudiSnapshotCacheValue(snapshot).getLastUpdateTimestamp()); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } + + public HMSSchemaCacheValue getHudiSchemaCacheValue(Optional<MvccSnapshot> snapshot) { + TablePartitionValues snapshotCacheValue = getOrFetchHudiSnapshotCacheValue(snapshot); + return 
getHudiSchemaCacheValue(snapshotCacheValue.getLastUpdateTimestamp()); + } + + private TablePartitionValues getOrFetchHudiSnapshotCacheValue(Optional<MvccSnapshot> snapshot) { + if (snapshot.isPresent()) { + return ((HudiMvccSnapshot) snapshot.get()).getTablePartitionValues(); + } else { + return HudiUtils.getPartitionValues(Optional.empty(), hmsTable); + } + } + + private HMSSchemaCacheValue getHudiSchemaCacheValue(long timestamp) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog()); + Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue( + new HudiSchemaCacheKey(hmsTable.getDbName(), hmsTable.getName(), timestamp)); + if (!schemaCacheValue.isPresent()) { + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), timestamp); + } + return (HMSSchemaCacheValue) schemaCacheValue.get(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 890f6147f33..b90c390406d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.HiveProperties; import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.fs.DirectoryLister; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; @@ -133,7 +134,7 @@ public class HiveScanNode extends FileQueryScanNode { List<HivePartition> resPartitions = Lists.newArrayList(); HiveMetaStoreCache 
cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); - List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(); + List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable)); if (!partitionColumnTypes.isEmpty()) { // partitioned table Collection<PartitionItem> partitionItems; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java new file mode 100644 index 00000000000..0f01291e54c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hudi; + +import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +/** + * Implementation of MvccSnapshot for Hudi tables that maintains partition values + * for MVCC (Multiversion Concurrency Control) operations. + * The held reference is final, but the TablePartitionValues it refers to is mutable and is not copied.
+ */ +public class HudiMvccSnapshot implements MvccSnapshot { + private final TablePartitionValues tablePartitionValues; + + /** + * Creates a new HudiMvccSnapshot with the specified partition values. + * + * @param tablePartitionValues The partition values for the snapshot + * @throws IllegalArgumentException if tablePartitionValues is null + */ + public HudiMvccSnapshot(TablePartitionValues tablePartitionValues) { + if (tablePartitionValues == null) { + throw new IllegalArgumentException("TablePartitionValues cannot be null"); + } + this.tablePartitionValues = tablePartitionValues; + } + + /** + * Gets the table partition values associated with this snapshot. + * + * @return The TablePartitionValues object held by this snapshot (the same instance, not a copy) + */ + public TablePartitionValues getTablePartitionValues() { + return tablePartitionValues; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HudiMvccSnapshot that = (HudiMvccSnapshot) o; + return tablePartitionValues.equals(that.tablePartitionValues); + } + + @Override + public int hashCode() { + return tablePartitionValues.hashCode(); + } + + @Override + public String toString() { + return String.format("HudiMvccSnapshot{tablePartitionValues=%s}", tablePartitionValues); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java new file mode 100644 index 00000000000..5a5b0dc044e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hudi; + +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; + +import com.google.common.base.Objects; + +/** + * Cache key for Hudi table schemas that includes timestamp information. + * This allows for time-travel queries and ensures proper schema versioning. + */ +public class HudiSchemaCacheKey extends SchemaCacheKey { + private final long timestamp; + + /** + * Creates a new cache key for Hudi table schemas. + * + * @param dbName The database name + * @param tableName The table name + * @param timestamp The timestamp for schema version + * @throws IllegalArgumentException if timestamp is negative + */ + public HudiSchemaCacheKey(String dbName, String tableName, long timestamp) { + super(dbName, tableName); + if (timestamp < 0) { + throw new IllegalArgumentException("Timestamp cannot be negative"); + } + this.timestamp = timestamp; + } + + /** + * Gets the timestamp associated with this schema version.
+ * + * @return the timestamp value + */ + public long getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + HudiSchemaCacheKey that = (HudiSchemaCacheKey) o; + return timestamp == that.timestamp; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), timestamp); + } + + @Override + public String toString() { + return String.format("HudiSchemaCacheKey{dbName='%s', tableName='%s', timestamp=%d}", + getDbName(), getTblName(), timestamp); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index 894103afe2c..c0e2a783ac3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.datasource.hive.HMSExternalTable; @@ -257,10 +258,6 @@ public class HudiUtils { public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> tableSnapshot, HMSExternalTable hmsTable) { TablePartitionValues partitionValues = new TablePartitionValues(); - if (hmsTable.getPartitionColumns().isEmpty()) { - //isn't partition table. 
- return partitionValues; - } HoodieTableMetaClient hudiClient = hmsTable.getHudiClient(); HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() @@ -312,10 +309,11 @@ public class HudiUtils { return schemaInfo; } - public static HudiSchemaCacheValue getSchemaCacheValue(HMSExternalTable hmsTable) { - ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getSchemaCache(hmsTable.getCatalog()); - Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(hmsTable.getDbName(), hmsTable.getName()); + public static HudiSchemaCacheValue getSchemaCacheValue(HMSExternalTable hmsTable, String queryInstant) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog()); + SchemaCacheKey key = new HudiSchemaCacheKey(hmsTable.getDbName(), hmsTable.getName(), + Long.parseLong(queryInstant)); + Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(key); return (HudiSchemaCacheValue) schemaCacheValue.get(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index 62094b21c2b..ef921cbfa47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -85,14 +86,15 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) { Preconditions.checkState(catalogId == 
table.getCatalog().getId()); + TablePartitionValues partitionValues = new TablePartitionValues(); Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); - if (!partitionColumns.isPresent()) { - return null; + if (!partitionColumns.isPresent() || partitionColumns.get().length == 0) { + return partitionValues; } HoodieTimeline timeline = tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); Option<HoodieInstant> lastInstant = timeline.lastInstant(); if (!lastInstant.isPresent()) { - return null; + return partitionValues; } long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); if (Long.parseLong(timestamp) == lastTimestamp) { @@ -100,10 +102,13 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { } List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp); List<String> partitionNames = Arrays.asList(partitionColumns.get()); - TablePartitionValues partitionValues = new TablePartitionValues(); + // we don't support auto refresh hudi mtmv currently, + // so the list `partitionLastUpdateTimestamp` is full of 0L. 
partitionValues.addPartitions(partitionNameAndValues, partitionNameAndValues.stream().map(p -> parsePartitionValues(partitionNames, p)) - .collect(Collectors.toList()), table.getPartitionColumnTypes()); + .collect(Collectors.toList()), table.getHudiPartitionColumnTypes(Long.parseLong(timestamp)), + Collections.nCopies(partitionNameAndValues.size(), 0L)); + partitionValues.setLastUpdateTimestamp(Long.parseLong(timestamp)); return partitionValues; } @@ -111,19 +116,21 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { boolean useHiveSyncPartition) throws CacheException { Preconditions.checkState(catalogId == table.getCatalog().getId()); + TablePartitionValues partitionValues = new TablePartitionValues(); Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); - if (!partitionColumns.isPresent()) { - return null; + if (!partitionColumns.isPresent() || partitionColumns.get().length == 0) { + return partitionValues; } HoodieTimeline timeline = tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); Option<HoodieInstant> lastInstant = timeline.lastInstant(); if (!lastInstant.isPresent()) { - return null; + return partitionValues; } try { long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); - TablePartitionValues partitionValues = partitionCache.get( - new TablePartitionKey(table.getDbName(), table.getName(), table.getPartitionColumnTypes())); + partitionValues = partitionCache.get( + new TablePartitionKey(table.getDbName(), table.getName(), + table.getHudiPartitionColumnTypes(lastTimestamp))); partitionValues.readLock().lock(); try { long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); @@ -159,7 +166,8 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { partitionValues.cleanPartitions(); partitionValues.addPartitions(partitionNames, partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) - 
.collect(Collectors.toList()), table.getPartitionColumnTypes()); + .collect(Collectors.toList()), table.getHudiPartitionColumnTypes(lastTimestamp), + Collections.nCopies(partitionNames.size(), 0L)); partitionValues.setLastUpdateTimestamp(lastTimestamp); return partitionValues; } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 17543ec3949..00697a5469f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -29,14 +29,13 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; -import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.fs.DirectoryLister; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.SessionVariable; @@ -166,11 +165,6 @@ public class HudiScanNode extends HiveScanNode { basePath = hmsTable.getRemoteTable().getSd().getLocation(); inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib(); - ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog()); - Optional<SchemaCacheValue> schemaCacheValue = 
cache.getSchemaValue(table.getDbName(), table.getName()); - HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue) schemaCacheValue.get(); - columnNames = hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList()); - columnTypes = hudiSchemaCacheValue.getColTypes(); if (scanParams != null && !scanParams.incrementalRead()) { // Only support incremental read @@ -211,11 +205,16 @@ public class HudiScanNode extends HiveScanNode { } queryInstant = snapshotInstant.get().getTimestamp(); } + + HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable, queryInstant); + columnNames = hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList()); + columnTypes = hudiSchemaCacheValue.getColTypes(); + fsView = Env.getCurrentEnv() .getExtMetaCacheMgr() .getFsViewProcessor(hmsTable.getCatalog()) .getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient); - if (HudiUtils.getSchemaCacheValue(hmsTable).isEnableSchemaEvolution()) { + if (hudiSchemaCacheValue.isEnableSchemaEvolution()) { params.setHistorySchemaInfo(new ConcurrentHashMap<>()); } } @@ -270,7 +269,7 @@ public class HudiScanNode extends HiveScanNode { // fileDesc.setNestedFields(hudiSplit.getNestedFields()); fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner()); } else { - HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable); + HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable, queryInstant); if (hudiSchemaCacheValue.isEnableSchemaEvolution()) { long commitInstantTime = Long.parseLong(FSUtils.getCommitTime( new File(hudiSplit.getPath().get()).getName())); @@ -291,7 +290,7 @@ public class HudiScanNode extends HiveScanNode { } private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient metaClient) { - List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(); + List<Type> partitionColumnTypes = 
hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable)); if (!partitionColumnTypes.isEmpty()) { this.totalPartitionNum = selectedPartitions.totalPartitionNum; Map<String, PartitionItem> prunedPartitions = selectedPartitions.selectedPartitions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index fefd6b76b0f..16964ead1d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; @@ -278,7 +279,7 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa } @Override - public MvccSnapshot loadSnapshot() { + public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) { return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 1136fb079f3..a598c68703f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -134,7 +134,7 @@ public class MaxComputeExternalTable extends ExternalTable { partitionSpecs.stream() .map(p -> parsePartitionValues(partitionColumnNames, p)) .collect(Collectors.toList()), - partitionTypes); + partitionTypes, Collections.nCopies(partitionSpecs.size(), 0L)); return partitionValues; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java similarity index 67% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java index d69e0f3114d..35f63291a25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java @@ -17,17 +17,5 @@ package org.apache.doris.datasource.mvcc; -import org.apache.doris.catalog.TableIf; - -/** - * The table that needs to query data based on the version needs to implement this interface. - */ -public interface MvccTable extends TableIf { - /** - * Retrieve the current snapshot information of the table, - * and the returned result will be used for the entire process of this query - * - * @return MvccSnapshot - */ - MvccSnapshot loadSnapshot(); +public class EmptyMvccSnapshot implements MvccSnapshot { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java index d69e0f3114d..89b1d6e9b07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java @@ -17,8 +17,11 @@ package org.apache.doris.datasource.mvcc; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.TableIf; +import java.util.Optional; + /** * The table that needs to query data based on the version needs to implement this interface. 
*/ @@ -29,5 +32,5 @@ public interface MvccTable extends TableIf { * * @return MvccSnapshot */ - MvccSnapshot loadSnapshot(); + MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 5d3e7dc3f5b..8a97aa856a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.paimon; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; @@ -212,7 +213,7 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab } @Override - public MvccSnapshot loadSnapshot() { + public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) { return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 016d0a1d495..105ef0d9d69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -80,6 +80,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -381,7 +382,7 @@ public class MTMVTask extends AbstractTask { } if (tableIf instanceof MvccTable) { MvccTable mvccTable = (MvccTable) tableIf; - MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(); + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty()); snapshots.put(new MvccTableInfo(mvccTable), 
mvccSnapshot); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 56adf5f2f82..258bab38d48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -224,7 +224,6 @@ public class NereidsPlanner extends Planner { // collect table and lock them in the order of table id collectAndLockTable(showAnalyzeProcess(explainLevel, showPlanProcess)); // after table collector, we should use a new context. - statementContext.loadSnapshots(); Plan resultPlan = planWithoutLock(plan, requireProperties, explainLevel, showPlanProcess); lockCallback.accept(resultPlan); if (statementContext.getConnectContext().getExecutor() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 4f0e7c27efd..ab7c9f77fc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; @@ -658,13 +659,12 @@ public class StatementContext implements Closeable { /** * Load snapshot information of mvcc */ - public void loadSnapshots() { + public void loadSnapshots(Optional<TableSnapshot> tableSnapshot, MvccTableInfo mvccTableInfo) { for (TableIf tableIf : tables.values()) { if (tableIf instanceof MvccTable) { - MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); // may be set by MTMV, we can not load again if (!snapshots.containsKey(mvccTableInfo)) { - snapshots.put(mvccTableInfo, ((MvccTable) 
tableIf).loadSnapshot()); + snapshots.put(mvccTableInfo, ((MvccTable) tableIf).loadSnapshot(tableSnapshot)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 82dddc9575a..0f161088535 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -35,6 +35,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.nereids.CTEContext; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.SqlCacheContext; @@ -378,6 +379,8 @@ public class BindRelation extends OneAnalysisRuleFactory { List<String> qualifierWithoutTableName = Lists.newArrayList(); qualifierWithoutTableName.addAll(qualifiedTableName.subList(0, qualifiedTableName.size() - 1)); + MvccTableInfo mvccTableInfo = new MvccTableInfo(table); + cascadesContext.getStatementContext().loadSnapshots(unboundRelation.getTableSnapshot(), mvccTableInfo); boolean isView = false; try { switch (table.getType()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 373e3d415bb..4e7cd6b5116 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -60,6 +60,7 @@ import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import 
org.apache.doris.datasource.iceberg.IcebergMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; @@ -1610,7 +1611,7 @@ public class MetadataGenerator { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) tbl.getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - tbl.getDbName(), tbl.getName(), tbl.getPartitionColumnTypes()); + tbl.getDbName(), tbl.getName(), tbl.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(tbl))); Map<Long, List<String>> valuesMap = hivePartitionValues.getPartitionValuesMap(); List<TRow> dataBatch = Lists.newArrayList(); for (Map.Entry<Long, List<String>> entry : valuesMap.entrySet()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java index 016b6616f0b..409fc1daf72 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java @@ -175,7 +175,7 @@ public class HudiUtilsTest { HMSExternalCatalog catalog = new HMSExternalCatalog(); HMSExternalDatabase db = new HMSExternalDatabase(catalog, 1, "db", "db"); HMSExternalTable hmsExternalTable = new HMSExternalTable(2, "tb", "tb", catalog, db); - HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new boolean[] {false}); + HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new boolean[] {false}, "20241219214518880"); // 4. 
delete the commit file, // this operation is used to imitate the clean operation in hudi @@ -189,7 +189,7 @@ public class HudiUtilsTest { // because we will refresh timeline in this `getHudiTableSchema` method, // and we can get the latest commit. // so that this error: `Could not read commit details from file <table_path>/.hoodie/1.commit` will be not reported. - HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new boolean[] {false}); + HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new boolean[] {false}, "20241219214518880"); // 7. clean up Assert.assertTrue(commit2.delete()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java index 30a233dd1a9..4abed2ee708 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java @@ -29,11 +29,13 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.CatalogMgr; +import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; +import org.apache.doris.datasource.hive.HiveDlaTable; import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase; import org.apache.doris.qe.SessionVariable; @@ -107,6 +109,7 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase { Deencapsulation.setField(tbl, "catalog", hmsCatalog); Deencapsulation.setField(tbl, "dbName", "hms_db"); Deencapsulation.setField(tbl, "name", "hms_tbl"); + Deencapsulation.setField(tbl, "dlaTable", new 
HiveDlaTable(tbl)); new Expectations(tbl) { { tbl.getId(); @@ -138,7 +141,7 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase { result = TableIf.TableType.HMS_EXTERNAL_TABLE; // mock initSchemaAndUpdateTime and do nothing - tbl.initSchemaAndUpdateTime(); + tbl.initSchemaAndUpdateTime(new ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl")); minTimes = 0; tbl.getDatabase(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java index 0a981dab8a9..376f8cba4e8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -31,10 +31,12 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.CatalogMgr; +import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; +import org.apache.doris.datasource.hive.HiveDlaTable; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase; import org.apache.doris.planner.OlapScanNode; @@ -123,6 +125,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { Deencapsulation.setField(tbl, "catalog", hmsCatalog); Deencapsulation.setField(tbl, "dbName", "hms_db"); Deencapsulation.setField(tbl, "name", "hms_tbl"); + Deencapsulation.setField(tbl, "dlaTable", new HiveDlaTable(tbl)); new Expectations(tbl) { { tbl.getId(); @@ -158,7 +161,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { result = DLAType.HIVE; // mock initSchemaAndUpdateTime and do nothing - tbl.initSchemaAndUpdateTime(); + 
tbl.initSchemaAndUpdateTime(new ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl")); minTimes = 0; tbl.getDatabase(); @@ -173,6 +176,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { Deencapsulation.setField(tbl2, "catalog", hmsCatalog); Deencapsulation.setField(tbl2, "dbName", "hms_db"); Deencapsulation.setField(tbl2, "name", "hms_tbl2"); + Deencapsulation.setField(tbl2, "dlaTable", new HiveDlaTable(tbl2)); new Expectations(tbl2) { { tbl2.getId(); @@ -208,7 +212,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { result = DLAType.HIVE; // mock initSchemaAndUpdateTime and do nothing - tbl2.initSchemaAndUpdateTime(); + tbl2.initSchemaAndUpdateTime(new ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl2")); minTimes = 0; tbl2.getDatabase(); @@ -386,7 +390,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { List<ScanNode> scanNodes = Arrays.asList(hiveScanNode4); // invoke initSchemaAndUpdateTime first and init schemaUpdateTime - tbl2.initSchemaAndUpdateTime(); + tbl2.initSchemaAndUpdateTime(new ExternalSchemaCache.SchemaCacheKey(tbl2.getDbName(), tbl2.getName())); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); @@ -434,7 +438,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { List<ScanNode> scanNodes = Arrays.asList(hiveScanNode4); // invoke initSchemaAndUpdateTime first and init schemaUpdateTime - tbl2.initSchemaAndUpdateTime(); + tbl2.initSchemaAndUpdateTime(new ExternalSchemaCache.SchemaCacheKey(tbl2.getDbName(), tbl2.getName())); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); diff --git a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out new file mode 100644 index 00000000000..f8870d0b287 Binary files /dev/null and b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out differ diff --git a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out new file mode 100644 index 00000000000..30b64a98ad8 Binary files /dev/null and b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out differ diff --git a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out new file mode 100644 index 00000000000..77597631587 Binary files /dev/null and b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out differ diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy new file mode 100644 index 00000000000..5bfcea11d67 --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hudi_mtmv", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled hudi test") + return + } + String suiteName = "test_hudi_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + String otherDbName = "${suiteName}_otherdb" + String tableName = "${suiteName}_table" + + sql """drop database if exists ${otherDbName}""" + sql """create database ${otherDbName}""" + sql """ + CREATE TABLE ${otherDbName}.${tableName} ( + `user_id` INT, + `num` INT + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + sql """ + insert into ${otherDbName}.${tableName} values(1,2); + """ + + String props = context.config.otherConfigs.get("hudiEmrCatalog") + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES ( + ${props} + );""" + + order_qt_base_table """ select * from ${catalogName}.hudi_mtmv_regression_test.hudi_table_1; """ + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH 
MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + + // rebuild catalog, should not Affects MTMV + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES ( + ${props} + );""" + order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + + // should refresh normal after catalog rebuild + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} " + + sql """drop materialized view if exists ${mvName};""" + + // not have partition + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + KEY(`id`) + COMMENT "comment1" + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1',"grace_period"="333") + AS + SELECT id,age,par FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1; + """ + order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + //should can refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_not_partition "SELECT * FROM ${mvName} " + order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + sql """drop materialized view if exists ${mvName};""" + + // refresh on schedule + // sql """ + // CREATE MATERIALIZED VIEW ${mvName} + // BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 SECOND STARTS 
"9999-12-13 21:07:09" + // KEY(`id`) + // COMMENT "comment1" + // DISTRIBUTED BY HASH(`id`) BUCKETS 2 + // PROPERTIES ('replication_num' = '1',"grace_period"="333") + // AS + // SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1; + // """ + // waitingMTMVTaskFinishedByMvName(mvName) + // sql """drop materialized view if exists ${mvName};""" + + // refresh on schedule + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD IMMEDIATE REFRESH AUTO ON commit + KEY(`id`) + COMMENT "comment1" + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1',"grace_period"="333") + AS + SELECT id,age,par FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1; + """ + waitingMTMVTaskFinishedByMvName(mvName) + sql """drop materialized view if exists ${mvName};""" + + // cross db and join internal table + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join internal.${otherDbName}.${tableName} b on a.id=b.user_id; + """ + def showJoinPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showJoinPartitionsResult: " + showJoinPartitionsResult.toString()) + assertTrue(showJoinPartitionsResult.toString().contains("p_a")) + assertTrue(showJoinPartitionsResult.toString().contains("p_b")) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_join_one_partition "SELECT * FROM ${mvName} " + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`create_date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions; + """ + def 
showTwoPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showTwoPartitionsResult: " + showTwoPartitionsResult.toString()) + assertTrue(showTwoPartitionsResult.toString().contains("p_20200101")) + assertTrue(showTwoPartitionsResult.toString().contains("p_20380101")) + assertTrue(showTwoPartitionsResult.toString().contains("p_20380102")) + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto; + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_two_partition "SELECT * FROM ${mvName} " + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`create_date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1','partition_sync_limit'='2','partition_date_format'='%Y-%m-%d', + 'partition_sync_time_unit'='MONTH') + AS + SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions; + """ + def showLimitPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showLimitPartitionsResult: " + showLimitPartitionsResult.toString()) + assertFalse(showLimitPartitionsResult.toString().contains("p_20200101")) + assertTrue(showLimitPartitionsResult.toString().contains("p_20380101")) + assertTrue(showLimitPartitionsResult.toString().contains("p_20380102")) + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto; + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_limit_partition "SELECT * FROM ${mvName} " + sql """drop materialized view if exists ${mvName};""" + + // not allow date trunc + test { + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`create_date`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1','partition_sync_limit'='2','partition_date_format'='%Y-%m-%d', + 'partition_sync_time_unit'='MONTH') + AS + SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions; + 
""" + exception "only support" + } + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`region`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_null_partition; + """ + def showNullPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showNullPartitionsResult: " + showNullPartitionsResult.toString()) + // assertTrue(showNullPartitionsResult.toString().contains("p_null")) + assertTrue(showNullPartitionsResult.toString().contains("p_NULL")) + assertTrue(showNullPartitionsResult.toString().contains("p_bj")) + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto; + """ + waitingMTMVTaskFinishedByMvName(mvName) + // Will lose null data + order_qt_null_partition "SELECT * FROM ${mvName} " + sql """drop materialized view if exists ${mvName};""" + + sql """drop catalog if exists ${catalogName}""" + +} diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy new file mode 100644 index 00000000000..a0ac9b0783f --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hudi_olap_rewrite_mtmv", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled hudi test") + return + } + String suiteName = "test_hudi_olap_rewrite_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + String tableName = "${suiteName}_table" + sql """drop table if exists ${tableName}""" + sql """ + CREATE TABLE ${tableName} ( + `user_id` INT, + `num` INT + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,2); + """ + + sql """analyze table internal.`${dbName}`. ${tableName} with sync""" + sql """alter table internal.`${dbName}`. 
${tableName} modify column user_id set stats ('row_count'='1');""" + + String props = context.config.otherConfigs.get("hudiEmrCatalog") + + sql """set materialized_view_rewrite_enable_contain_external_table=true;""" + String mvSql = "SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join ${tableName} b on a.id=b.user_id;"; + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES ( + ${props} + );""" + + sql """analyze table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 with sync""" + sql """alter table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 modify column par set stats ('row_count'='10');""" + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mvSql} + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + def explainOnePartition = sql """ explain ${mvSql} """ + logger.info("explainOnePartition: " + explainOnePartition.toString()) + assertTrue(explainOnePartition.toString().contains("VUNION")) + order_qt_refresh_one_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + // select p_b should not rewrite + mv_rewrite_fail("SELECT * FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join ${tableName} b on a.id=b.user_id where a.par='b';", "${mvName}") + + //refresh auto + sql """ + REFRESH MATERIALIZED 
VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" +} diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy new file mode 100644 index 00000000000..95c71c48475 --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled hudi test") + return + } + String suiteName = "test_hudi_rewrite_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String props = context.config.otherConfigs.get("hudiEmrCatalog") + + sql """set materialized_view_rewrite_enable_contain_external_table=true;""" + String mvSql = "SELECT par,count(*) as num FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 group by par;"; + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES ( + ${props} + );""" + + sql """analyze table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 with sync""" + sql """alter table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 modify column par set stats ('row_count'='10');""" + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mvSql} + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + def explainOnePartition = sql """ explain ${mvSql} """ + logger.info("explainOnePartition: " + explainOnePartition.toString()) + 
assertTrue(explainOnePartition.toString().contains("VUNION")) + order_qt_refresh_one_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + // select p_b should not rewrite + mv_rewrite_fail("SELECT par,count(*) as num FROM ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 where par='b' group by par;", "${mvName}") + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org