This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-hudi-mtmv
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-hudi-mtmv by this push:
     new 95b5b35be3f [fix](Hudi-mtmv)Dev hudi partition reresh0325 (#49462)
95b5b35be3f is described below

commit 95b5b35be3f8d5f547802965ee82fd512f3c8bdb
Author: Tiewei Fang <fangtie...@selectdb.com>
AuthorDate: Tue Mar 25 16:42:37 2025 +0800

    [fix](Hudi-mtmv)Dev hudi partition reresh0325 (#49462)
---
 .../doris/datasource/ExternalMetaCacheMgr.java     |   3 +-
 .../org/apache/doris/datasource/ExternalTable.java |   2 +-
 .../doris/datasource/TablePartitionValues.java     |  32 +--
 .../apache/doris/datasource/hive/HMSDlaTable.java  |  79 +++++++
 .../doris/datasource/hive/HMSExternalTable.java    | 160 ++++++-------
 .../apache/doris/datasource/hive/HiveDlaTable.java | 141 ++++++++++++
 .../datasource/hive/HiveMetaStoreClientHelper.java |   5 +-
 .../apache/doris/datasource/hive/HudiDlaTable.java | 121 ++++++++++
 .../doris/datasource/hive/source/HiveScanNode.java |   3 +-
 .../doris/datasource/hudi/HudiMvccSnapshot.java    |  74 ++++++
 .../doris/datasource/hudi/HudiSchemaCacheKey.java  |  82 +++++++
 .../apache/doris/datasource/hudi/HudiUtils.java    |  14 +-
 .../hudi/source/HudiCachedPartitionProcessor.java  |  30 ++-
 .../doris/datasource/hudi/source/HudiScanNode.java |  19 +-
 .../datasource/iceberg/IcebergExternalTable.java   |   3 +-
 .../maxcompute/MaxComputeExternalTable.java        |   2 +-
 .../{MvccTable.java => EmptyMvccSnapshot.java}     |  14 +-
 .../apache/doris/datasource/mvcc/MvccTable.java    |   5 +-
 .../datasource/paimon/PaimonExternalTable.java     |   3 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |   3 +-
 .../org/apache/doris/nereids/NereidsPlanner.java   |   1 -
 .../org/apache/doris/nereids/StatementContext.java |   6 +-
 .../doris/nereids/rules/analysis/BindRelation.java |   3 +
 .../doris/tablefunction/MetadataGenerator.java     |   3 +-
 .../doris/datasource/hudi/HudiUtilsTest.java       |   4 +-
 .../apache/doris/external/hms/HmsCatalogTest.java  |   5 +-
 .../org/apache/doris/qe/HmsQueryCacheTest.java     |  12 +-
 .../hudi/hudi_mtmv/test_hudi_mtmv.out              | Bin 0 -> 8110 bytes
 .../hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out | Bin 0 -> 5610 bytes
 .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out      | Bin 0 -> 250 bytes
 .../hudi/hudi_mtmv/test_hudi_mtmv.groovy           | 256 +++++++++++++++++++++
 .../hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy   | 108 +++++++++
 .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy   |  91 ++++++++
 33 files changed, 1111 insertions(+), 173 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 9ad78816278..99226bc1ccf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -34,6 +34,7 @@ import 
org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
 import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
 import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
 import org.apache.doris.datasource.metacache.MetaCache;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.paimon.PaimonMetadataCache;
 import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
 import org.apache.doris.fs.FileSystemCache;
@@ -277,7 +278,7 @@ public class ExternalMetaCacheMgr {
         if (metaCache != null) {
             List<Type> partitionColumnTypes;
             try {
-                partitionColumnTypes = table.getPartitionColumnTypes();
+                partitionColumnTypes = 
table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
             } catch (NotSupportedException e) {
                 LOG.warn("Ignore not supported hms table, message: {} ", 
e.getMessage());
                 return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 30bf48c3d8b..60a1f172978 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -391,7 +391,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
         throw new NotImplementedException("getChunkSized not implemented");
     }
 
-    protected Optional<SchemaCacheValue> getSchemaCacheValue() {
+    public Optional<SchemaCacheValue> getSchemaCacheValue() {
         ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
         return cache.getSchemaValue(dbName, name);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
index c7f2ce6f712..e928d3c739e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
@@ -51,6 +51,7 @@ public class TablePartitionValues {
     private long nextPartitionId;
     private final Map<Long, PartitionItem> idToPartitionItem;
     private final Map<String, Long> partitionNameToIdMap;
+    private Map<String, Long> partitionNameToLastModifiedMap;
     private final Map<Long, String> partitionIdToNameMap;
 
     private Map<Long, List<UniqueId>> idToUniqueIdsMap;
@@ -68,15 +69,12 @@ public class TablePartitionValues {
         nextPartitionId = 0;
         idToPartitionItem = new HashMap<>();
         partitionNameToIdMap = new HashMap<>();
+        partitionNameToLastModifiedMap = new HashMap<>();
         partitionIdToNameMap = new HashMap<>();
     }
 
-    public TablePartitionValues(List<String> partitionNames, 
List<List<String>> partitionValues, List<Type> types) {
-        this();
-        addPartitions(partitionNames, partitionValues, types);
-    }
-
-    public void addPartitions(List<String> partitionNames, List<List<String>> 
partitionValues, List<Type> types) {
+    public void addPartitions(List<String> partitionNames, List<List<String>> 
partitionValues, List<Type> types,
+            List<Long> partitionLastUpdateTimestamp) {
         Preconditions.checkState(partitionNames.size() == 
partitionValues.size());
         List<String> addPartitionNames = new ArrayList<>();
         List<PartitionItem> addPartitionItems = new ArrayList<>();
@@ -90,6 +88,7 @@ public class TablePartitionValues {
                 addPartitionNames.add(partitionNames.get(i));
                 
addPartitionItems.add(toListPartitionItem(partitionValues.get(i), types));
             }
+            partitionNameToLastModifiedMap.put(partitionNames.get(i), 
partitionLastUpdateTimestamp.get(i));
         }
         cleanPartitions();
 
@@ -123,23 +122,6 @@ public class TablePartitionValues {
         partitionValuesMap = 
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
     }
 
-    public void dropPartitions(List<String> partitionNames, List<Type> types) {
-        partitionNames.forEach(p -> {
-            Long removedPartition = partitionNameToIdMap.get(p);
-            if (removedPartition != null) {
-                idToPartitionItem.remove(removedPartition);
-            }
-        });
-        List<String> remainingPartitionNames = new ArrayList<>();
-        List<PartitionItem> remainingPartitionItems = new ArrayList<>();
-        partitionNameToIdMap.forEach((partitionName, partitionId) -> {
-            remainingPartitionNames.add(partitionName);
-            remainingPartitionItems.add(idToPartitionItem.get(partitionId));
-        });
-        cleanPartitions();
-        addPartitionItems(remainingPartitionNames, remainingPartitionItems, 
types);
-    }
-
     public long getLastUpdateTimestamp() {
         return lastUpdateTimestamp;
     }
@@ -148,6 +130,10 @@ public class TablePartitionValues {
         this.lastUpdateTimestamp = lastUpdateTimestamp;
     }
 
+    public Map<String, Long> getPartitionNameToLastModifiedMap() {
+        return partitionNameToLastModifiedMap;
+    }
+
     public Lock readLock() {
         return readWriteLock.readLock();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
new file mode 100644
index 00000000000..1710646ce3d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVBaseTableIf;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This abstract class represents a Hive Metastore (HMS) Dla Table and 
provides a blueprint for
+ * various operations related to metastore tables in Doris.
+ *
+ * Purpose:
+ * - To encapsulate common functionalities that HMS Dla tables should have for 
implementing other interfaces
+ *
+ * Why needed:
+ * - To provide a unified way to manage and interact with different kinds of 
Dla tables
+ * - To facilitate the implementation of multi-table materialized views (MTMV) 
by providing necessary
+ *   methods for snapshot and partition management.
+ * - To abstract out the specific details of HMS table operations, making the 
code more modular and maintainable.
+ */
+public abstract class HMSDlaTable implements MTMVBaseTableIf {
+    protected HMSExternalTable hmsTable;
+
+    public HMSDlaTable(HMSExternalTable table) {
+        this.hmsTable = table;
+    }
+
+    abstract Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
+            throws AnalysisException;
+
+    abstract PartitionType getPartitionType(Optional<MvccSnapshot> snapshot);
+
+    abstract Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot);
+
+    abstract List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot);
+
+    abstract MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
+            Optional<MvccSnapshot> snapshot) throws AnalysisException;
+
+    abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
+            throws AnalysisException;
+
+    abstract boolean isPartitionColumnAllowNull();
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+                .refreshTable(hmsTable.getCatalog().getName(), 
hmsTable.getDbName(), hmsTable.getName(), true);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index dfd7d6fe4b8..492fe76da56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -31,21 +31,25 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.fs.FileSystemDirectoryLister;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
-import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.mtmv.MTMVSnapshotIf;
-import org.apache.doris.mtmv.MTMVTimestampSnapshot;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.qe.GlobalVariable;
@@ -105,7 +109,7 @@ import java.util.stream.Collectors;
 /**
  * Hive metastore external table.
  */
-public class HMSExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf {
+public class HMSExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
     private static final Logger LOG = 
LogManager.getLogger(HMSExternalTable.class);
 
     public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
@@ -167,6 +171,8 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
 
     private DLAType dlaType = DLAType.UNKNOWN;
 
+    private HMSDlaTable dlaTable;
+
     // record the event update time when enable hms event listener
     protected volatile long eventUpdateTime;
 
@@ -204,10 +210,13 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
             } else {
                 if (supportedIcebergTable()) {
                     dlaType = DLAType.ICEBERG;
+                    dlaTable = new HiveDlaTable(this);
                 } else if (supportedHoodieTable()) {
                     dlaType = DLAType.HUDI;
+                    dlaTable = new HudiDlaTable(this);
                 } else if (supportedHiveTable()) {
                     dlaType = DLAType.HIVE;
+                    dlaTable = new HiveDlaTable(this);
                 } else {
                     // Should not reach here. Because `supportedHiveTable` 
will throw exception if not return true.
                     throw new NotSupportedException("Unsupported dlaType for 
table: " + getNameWithFullQualifiers());
@@ -298,23 +307,45 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return remoteTable;
     }
 
-    public List<Type> getPartitionColumnTypes() {
+    @Override
+    public List<Column> getFullSchema() {
         makeSureInitialized();
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        if (getDlaType() == DLAType.HUDI) {
+            return ((HudiDlaTable) 
dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
+                    .getSchema();
+        }
+        Optional<SchemaCacheValue> schemaCacheValue = 
cache.getSchemaValue(dbName, name);
+        return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
+    }
+
+    public List<Type> getPartitionColumnTypes(Optional<MvccSnapshot> snapshot) 
{
+        makeSureInitialized();
+        if (getDlaType() == DLAType.HUDI) {
+            return ((HudiDlaTable) 
dlaTable).getHudiSchemaCacheValue(snapshot).getPartitionColTypes();
+        }
         Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
         return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) 
value).getPartitionColTypes())
                 .orElse(Collections.emptyList());
     }
 
-    public List<Column> getPartitionColumns() {
+    public List<Type> getHudiPartitionColumnTypes(long timestamp) {
         makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) 
value).getPartitionColumns())
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new HudiSchemaCacheKey(dbName, name, timestamp));
+        return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) 
value).getPartitionColTypes())
                 .orElse(Collections.emptyList());
     }
 
+    public List<Column> getPartitionColumns() {
+        return getPartitionColumns(MvccUtil.getSnapshotFromContext(this));
+    }
+
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        return getPartitionColumns();
+        makeSureInitialized();
+        return dlaTable.getPartitionColumns(snapshot);
     }
 
     @Override
@@ -354,7 +385,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         }
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) this.getCatalog());
-        List<Type> partitionColumnTypes = this.getPartitionColumnTypes();
+        List<Type> partitionColumnTypes = 
this.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(this));
         HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
                 this.getDbName(), this.getName(), partitionColumnTypes);
         Map<Long, PartitionItem> idToPartitionItem = 
hivePartitionValues.getIdToPartitionItem();
@@ -563,10 +594,6 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
 
     @Override
     public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey 
key) {
-        return initSchemaAndUpdateTime();
-    }
-
-    public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
         org.apache.hadoop.hive.metastore.api.Table table = 
((HMSExternalCatalog) catalog).getClient()
                 .getTable(dbName, name);
         // try to use transient_lastDdlTime from hms client
@@ -575,7 +602,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
                 ? 
Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 
1000
                 // use current timestamp if lastDdlTime does not exist (hive 
views don't have this prop)
                 : System.currentTimeMillis();
-        return initSchema();
+        return initSchema(key);
     }
 
     public long getLastDdlTime() {
@@ -588,12 +615,12 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
-    public Optional<SchemaCacheValue> initSchema() {
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
         makeSureInitialized();
         if (dlaType.equals(DLAType.ICEBERG)) {
             return getIcebergSchema();
         } else if (dlaType.equals(DLAType.HUDI)) {
-            return getHudiSchema();
+            return getHudiSchema(key);
         } else {
             return getHiveSchema();
         }
@@ -605,11 +632,12 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
     }
 
-    private Optional<SchemaCacheValue> getHudiSchema() {
+    private Optional<SchemaCacheValue> getHudiSchema(SchemaCacheKey key) {
         boolean[] enableSchemaEvolution = {false};
-        InternalSchema hudiInternalSchema = 
HiveMetaStoreClientHelper.getHudiTableSchema(this, enableSchemaEvolution);
+        HudiSchemaCacheKey hudiSchemaCacheKey = (HudiSchemaCacheKey) key;
+        InternalSchema hudiInternalSchema = 
HiveMetaStoreClientHelper.getHudiTableSchema(this, enableSchemaEvolution,
+                Long.toString(hudiSchemaCacheKey.getTimestamp()));
         org.apache.avro.Schema hudiSchema = 
AvroInternalSchemaConverter.convert(hudiInternalSchema, name);
-
         List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
         List<String> colTypes = Lists.newArrayList();
         for (int i = 0; i < hudiSchema.getFields().size(); i++) {
@@ -857,96 +885,46 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
 
     @Override
     public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
-        return getPartitionType();
+        makeSureInitialized();
+        return dlaTable.getPartitionType(snapshot);
     }
 
-    public PartitionType getPartitionType() {
-        return getPartitionColumns().size() > 0 ? PartitionType.LIST : 
PartitionType.UNPARTITIONED;
+    public Set<String> getPartitionColumnNames() {
+        return getPartitionColumnNames(MvccUtil.getSnapshotFromContext(this));
     }
 
     @Override
     public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot) {
-        return getPartitionColumnNames();
-    }
-
-    public Set<String> getPartitionColumnNames() {
-        return getPartitionColumns().stream()
-                .map(c -> 
c.getName().toLowerCase()).collect(Collectors.toSet());
+        makeSureInitialized();
+        return dlaTable.getPartitionColumnNames(snapshot);
     }
 
     @Override
-    public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return getNameToPartitionItems();
+    public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        makeSureInitialized();
+        return dlaTable.getAndCopyPartitionItems(snapshot);
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
             Optional<MvccSnapshot> snapshot) throws AnalysisException {
-        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                .getMetaStoreCache((HMSExternalCatalog) getCatalog());
-        HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
-                getDbName(), getName(), getPartitionColumnTypes());
-        Long partitionId = 
getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues);
-        HivePartition hivePartition = 
getHivePartitionByIdOrAnalysisException(partitionId,
-                hivePartitionValues, cache);
-        return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
+        makeSureInitialized();
+        return dlaTable.getPartitionSnapshot(partitionName, context, snapshot);
     }
 
     @Override
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
-        if (getPartitionType() == PartitionType.UNPARTITIONED) {
-            return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
-        }
-        HivePartition maxPartition = null;
-        long maxVersionTime = 0L;
-        long visibleVersionTime;
-        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                .getMetaStoreCache((HMSExternalCatalog) getCatalog());
-        HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
-                getDbName(), getName(), getPartitionColumnTypes());
-        List<HivePartition> partitionList = 
cache.getAllPartitionsWithCache(getDbName(), getName(),
-                
Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values()));
-        if (CollectionUtils.isEmpty(partitionList)) {
-            throw new AnalysisException("partitionList is empty, table name: " 
+ getName());
-        }
-        for (HivePartition hivePartition : partitionList) {
-            visibleVersionTime = hivePartition.getLastModifiedTime();
-            if (visibleVersionTime > maxVersionTime) {
-                maxVersionTime = visibleVersionTime;
-                maxPartition = hivePartition;
-            }
-        }
-        return new 
MTMVMaxTimestampSnapshot(maxPartition.getPartitionName(getPartitionColumns()), 
maxVersionTime);
-    }
-
-    private Long getPartitionIdByNameOrAnalysisException(String partitionName,
-            HiveMetaStoreCache.HivePartitionValues hivePartitionValues)
-            throws AnalysisException {
-        Long partitionId = 
hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
-        if (partitionId == null) {
-            throw new AnalysisException("can not find partition: " + 
partitionName);
-        }
-        return partitionId;
+        makeSureInitialized();
+        return dlaTable.getTableSnapshot(context, snapshot);
     }
 
-    private HivePartition getHivePartitionByIdOrAnalysisException(Long 
partitionId,
-            HiveMetaStoreCache.HivePartitionValues hivePartitionValues,
-            HiveMetaStoreCache cache) throws AnalysisException {
-        List<String> partitionValues = 
hivePartitionValues.getPartitionValuesMap().get(partitionId);
-        if (CollectionUtils.isEmpty(partitionValues)) {
-            throw new AnalysisException("can not find partitionValues: " + 
partitionId);
-        }
-        HivePartition partition = cache.getHivePartition(getDbName(), 
getName(), partitionValues);
-        if (partition == null) {
-            throw new AnalysisException("can not find partition: " + 
partitionId);
-        }
-        return partition;
-    }
 
     @Override
     public boolean isPartitionColumnAllowNull() {
-        return true;
+        makeSureInitialized();
+        return dlaTable.isPartitionColumnAllowNull();
     }
 
     /**
@@ -1008,7 +986,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         }
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) catalog);
-        List<Type> partitionColumnTypes = getPartitionColumnTypes();
+        List<Type> partitionColumnTypes = 
getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(this));
         HiveMetaStoreCache.HivePartitionValues partitionValues = null;
         // Get table partitions from cache.
         if (!partitionColumnTypes.isEmpty()) {
@@ -1082,6 +1060,16 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
 
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        makeSureInitialized();
+        dlaTable.beforeMTMVRefresh(mtmv);
+    }
+
+    @Override
+    public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
+        if (getDlaType() == DLAType.HUDI) {
+            return new 
HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+        }
+        return new EmptyMvccSnapshot();
     }
 
     public HoodieTableMetaClient getHudiClient() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java
new file mode 100644
index 00000000000..296b2f3667a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HiveDlaTable extends HMSDlaTable {
+
+    public HiveDlaTable(HMSExternalTable table) {
+        super(table);
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns(snapshot).size() > 0 ? PartitionType.LIST : 
PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot) {
+        return getPartitionColumns(snapshot).stream()
+                .map(c -> 
c.getName().toLowerCase()).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        Optional<SchemaCacheValue> schemaCacheValue = 
hmsTable.getSchemaCacheValue();
+        return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) 
value).getPartitionColumns())
+                .orElse(Collections.emptyList());
+    }
+
+    @Override
+    public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return hmsTable.getNameToPartitionItems();
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
+            Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+        HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
+                hmsTable.getDbName(), hmsTable.getName(), 
hmsTable.getPartitionColumnTypes(snapshot));
+        Long partitionId = 
getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues);
+        HivePartition hivePartition = 
getHivePartitionByIdOrAnalysisException(partitionId,
+                hivePartitionValues, cache);
+        return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        if (hmsTable.getPartitionType(snapshot) == 
PartitionType.UNPARTITIONED) {
+            return new MTMVMaxTimestampSnapshot(hmsTable.getName(), 
hmsTable.getLastDdlTime());
+        }
+        HivePartition maxPartition = null;
+        long maxVersionTime = 0L;
+        long visibleVersionTime;
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+        HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
+                hmsTable.getDbName(), hmsTable.getName(), 
hmsTable.getPartitionColumnTypes(snapshot));
+        List<HivePartition> partitionList = 
cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(),
+                
Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values()));
+        if (CollectionUtils.isEmpty(partitionList)) {
+            throw new AnalysisException("partitionList is empty, table name: " 
+ hmsTable.getName());
+        }
+        for (HivePartition hivePartition : partitionList) {
+            visibleVersionTime = hivePartition.getLastModifiedTime();
+            if (visibleVersionTime > maxVersionTime) {
+                maxVersionTime = visibleVersionTime;
+                maxPartition = hivePartition;
+            }
+        }
+        return new MTMVMaxTimestampSnapshot(maxPartition.getPartitionName(
+                hmsTable.getPartitionColumns()), maxVersionTime);
+    }
+
+    private Long getPartitionIdByNameOrAnalysisException(String partitionName,
+            HiveMetaStoreCache.HivePartitionValues hivePartitionValues)
+            throws AnalysisException {
+        Long partitionId = 
hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
+        if (partitionId == null) {
+            throw new AnalysisException("can not find partition: " + 
partitionName);
+        }
+        return partitionId;
+    }
+
+    private HivePartition getHivePartitionByIdOrAnalysisException(Long 
partitionId,
+            HiveMetaStoreCache.HivePartitionValues hivePartitionValues,
+            HiveMetaStoreCache cache) throws AnalysisException {
+        List<String> partitionValues = 
hivePartitionValues.getPartitionValuesMap().get(partitionId);
+        if (CollectionUtils.isEmpty(partitionValues)) {
+            throw new AnalysisException("can not find partitionValues: " + 
partitionId);
+        }
+        HivePartition partition = cache.getHivePartition(hmsTable.getDbName(), 
hmsTable.getName(), partitionValues);
+        if (partition == null) {
+            throw new AnalysisException("can not find partition: " + 
partitionId);
+        }
+        return partition;
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 9bb09225607..07f133992d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -802,7 +802,8 @@ public class HiveMetaStoreClientHelper {
         return output.toString();
     }
 
-    public static InternalSchema getHudiTableSchema(HMSExternalTable table, 
boolean[] enableSchemaEvolution) {
+    public static InternalSchema getHudiTableSchema(HMSExternalTable table, 
boolean[] enableSchemaEvolution,
+            String timestamp) {
         HoodieTableMetaClient metaClient = table.getHudiClient();
         TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
 
@@ -814,7 +815,7 @@ public class HiveMetaStoreClientHelper {
         // So, we should reload timeline so that we can read the latest commit 
files.
         metaClient.reloadActiveTimeline();
 
-        Option<InternalSchema> internalSchemaOption = 
schemaUtil.getTableInternalSchemaFromCommitMetadata();
+        Option<InternalSchema> internalSchemaOption = 
schemaUtil.getTableInternalSchemaFromCommitMetadata(timestamp);
 
         if (internalSchemaOption.isPresent()) {
             enableSchemaEvolution[0] = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
new file mode 100644
index 00000000000..952d8e3311e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
+import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link HMSDlaTable} implementation for Hudi tables backed by an {@link HMSExternalTable}.
+ *
+ * <p>Partition and schema information is taken from the {@link TablePartitionValues}
+ * carried by the supplied {@link MvccSnapshot}; when no snapshot is supplied it is
+ * fetched through {@link HudiUtils#getPartitionValues}.
+ */
+public class HudiDlaTable extends HMSDlaTable {
+
+    public HudiDlaTable(HMSExternalTable table) {
+        super(table);
+    }
+
+    /**
+     * @return {@link PartitionType#UNPARTITIONED} when the table has no partition columns
+     *         for the given snapshot, otherwise {@link PartitionType#LIST}
+     */
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns(snapshot).isEmpty() ? PartitionType.UNPARTITIONED : PartitionType.LIST;
+    }
+
+    /**
+     * @return the lower-cased names of the partition columns for the given snapshot
+     */
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns(snapshot).stream()
+                .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the partition columns stored in the schema cache value of the given snapshot
+     */
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getHudiSchemaCacheValue(snapshot).getPartitionColumns();
+    }
+
+    /**
+     * Builds a new map from partition name to partition item for the given snapshot.
+     *
+     * @return a freshly allocated map; the partition items themselves are not copied
+     */
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        TablePartitionValues tablePartitionValues = getOrFetchHudiSnapshotCacheValue(snapshot);
+        Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
+        Map<Long, String> partitionIdToNameMap = tablePartitionValues.getPartitionIdToNameMap();
+        Map<String, PartitionItem> copiedPartitionItems = Maps.newHashMap();
+        // Iterate entries so each id is looked up only once in the id-to-name map.
+        for (Map.Entry<Long, String> idAndName : partitionIdToNameMap.entrySet()) {
+            copiedPartitionItems.put(idAndName.getValue(), idToPartitionItem.get(idAndName.getKey()));
+        }
+        return copiedPartitionItems;
+    }
+
+    /**
+     * Returns a timestamp snapshot built from the last-modified value recorded for a partition.
+     *
+     * @param partitionName name of the partition to look up
+     * @param context refresh context (not used by this implementation)
+     * @param snapshot snapshot to read partition values from, if any
+     * @return a {@link MTMVTimestampSnapshot} holding the partition's last-modified value
+     * @throws AnalysisException if no last-modified value is recorded for {@code partitionName}
+     */
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+            Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        Long lastModified = getOrFetchHudiSnapshotCacheValue(snapshot)
+                .getPartitionNameToLastModifiedMap().get(partitionName);
+        // A missing entry must not reach the snapshot constructor as null:
+        // report it the same way the Hive implementation reports an unknown partition.
+        if (lastModified == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVTimestampSnapshot(lastModified);
+    }
+
+    /**
+     * @return a {@link MTMVTimestampSnapshot} holding the last update timestamp of the
+     *         partition values resolved for the given snapshot
+     */
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVTimestampSnapshot(getOrFetchHudiSnapshotCacheValue(snapshot).getLastUpdateTimestamp());
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    /**
+     * Returns the schema cache value keyed by the last update timestamp of the
+     * partition values resolved for the given snapshot.
+     *
+     * @param snapshot snapshot to read partition values from, if any
+     * @return the cached schema value for that timestamp
+     * @throws CacheException if the schema cache yields no value
+     */
+    public HMSSchemaCacheValue getHudiSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
+        TablePartitionValues snapshotCacheValue = getOrFetchHudiSnapshotCacheValue(snapshot);
+        return getHudiSchemaCacheValue(snapshotCacheValue.getLastUpdateTimestamp());
+    }
+
+    // Uses the partition values carried by the snapshot when one is present,
+    // otherwise asks HudiUtils for them without a table snapshot.
+    // NOTE(review): a present snapshot is cast to HudiMvccSnapshot unchecked; confirm
+    // callers never pass another MvccSnapshot implementation here.
+    private TablePartitionValues getOrFetchHudiSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+        if (snapshot.isPresent()) {
+            return ((HudiMvccSnapshot) snapshot.get()).getTablePartitionValues();
+        } else {
+            return HudiUtils.getPartitionValues(Optional.empty(), hmsTable);
+        }
+    }
+
+    // Looks up the schema cached for this table under a (db, table, timestamp) key.
+    private HMSSchemaCacheValue getHudiSchemaCacheValue(long timestamp) {
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog());
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new HudiSchemaCacheKey(hmsTable.getDbName(), hmsTable.getName(), timestamp));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), timestamp);
+        }
+        return (HMSSchemaCacheValue) schemaCacheValue.get();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 890f6147f33..b90c390406d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.fs.DirectoryLister;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.planner.PlanNodeId;
@@ -133,7 +134,7 @@ public class HiveScanNode extends FileQueryScanNode {
         List<HivePartition> resPartitions = Lists.newArrayList();
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
-        List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
+        List<Type> partitionColumnTypes = 
hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable));
         if (!partitionColumnTypes.isEmpty()) {
             // partitioned table
             Collection<PartitionItem> partitionItems;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
new file mode 100644
index 00000000000..0f01291e54c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+
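+/**
+ * Implementation of MvccSnapshot for Hudi tables that carries the partition
+ * values used for MVCC (Multiversion Concurrency Control) operations.
+ * The reference held by this class never changes after construction, but the
+ * wrapped TablePartitionValues is stored without copying and is itself mutable.
+ */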
+public class HudiMvccSnapshot implements MvccSnapshot {
+    private final TablePartitionValues tablePartitionValues;
+
+    /**
+     * Wraps the given partition values in a snapshot.
+     *
+     * @param tablePartitionValues partition values to expose; stored by reference
+     * @throws IllegalArgumentException if {@code tablePartitionValues} is null
+     */
+    public HudiMvccSnapshot(TablePartitionValues tablePartitionValues) {
+        if (tablePartitionValues == null) {
+            throw new IllegalArgumentException("TablePartitionValues cannot be null");
+        }
+        this.tablePartitionValues = tablePartitionValues;
+    }
+
+    /**
+     * Returns the partition values this snapshot was created with.
+     *
+     * @return the same TablePartitionValues instance passed to the constructor
+     */
+    public TablePartitionValues getTablePartitionValues() {
+        return tablePartitionValues;
+    }
+
+    /**
+     * Two snapshots are equal when they are of the same class and their
+     * partition values are equal according to {@code TablePartitionValues.equals}.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+        return tablePartitionValues.equals(((HudiMvccSnapshot) other).tablePartitionValues);
+    }
+
+    /** Delegates to the hash code of the wrapped partition values. */
+    @Override
+    public int hashCode() {
+        return tablePartitionValues.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("HudiMvccSnapshot{tablePartitionValues=%s}", tablePartitionValues);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java
new file mode 100644
index 00000000000..5a5b0dc044e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+/**
+ * Cache key for Hudi table schemas that includes timestamp information.
+ * This allows for time-travel queries and ensures proper schema versioning.
+ */
+public class HudiSchemaCacheKey extends SchemaCacheKey {
+    private final long timestamp;
+
+    /**
+     * Creates a schema cache key for a Hudi table at a given timestamp.
+     *
+     * @param dbName The database name, passed to the parent key
+     * @param tableName The table name, passed to the parent key
+     * @param timestamp The timestamp identifying the schema version
+     * @throws IllegalArgumentException if {@code timestamp} is negative
+     */
+    public HudiSchemaCacheKey(String dbName, String tableName, long timestamp) {
+        super(dbName, tableName);
+        if (timestamp < 0) {
+            throw new IllegalArgumentException("Timestamp cannot be negative");
+        }
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Returns the timestamp this key was created with.
+     *
+     * @return the timestamp value
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Keys are equal when they are of the same class, the parent key
+     * considers them equal, and their timestamps match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass() || !super.equals(o)) {
+            return false;
+        }
+        return ((HudiSchemaCacheKey) o).timestamp == timestamp;
+    }
+
+    /** Combines the parent key's hash code with the timestamp. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.hashCode(), timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("HudiSchemaCacheKey{dbName='%s', tableName='%s', timestamp=%d}",
+                getDbName(), getTblName(), timestamp);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index 894103afe2c..c0e2a783ac3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TablePartitionValues;
 import org.apache.doris.datasource.hive.HMSExternalTable;
@@ -257,10 +258,6 @@ public class HudiUtils {
     public static TablePartitionValues 
getPartitionValues(Optional<TableSnapshot> tableSnapshot,
             HMSExternalTable hmsTable) {
         TablePartitionValues partitionValues = new TablePartitionValues();
-        if (hmsTable.getPartitionColumns().isEmpty()) {
-            //isn't partition table.
-            return partitionValues;
-        }
 
         HoodieTableMetaClient hudiClient = hmsTable.getHudiClient();
         HudiCachedPartitionProcessor processor = 
(HudiCachedPartitionProcessor) Env.getCurrentEnv()
@@ -312,10 +309,11 @@ public class HudiUtils {
         return schemaInfo;
     }
 
-    public static HudiSchemaCacheValue getSchemaCacheValue(HMSExternalTable 
hmsTable) {
-        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                .getSchemaCache(hmsTable.getCatalog());
-        Optional<SchemaCacheValue> schemaCacheValue = 
cache.getSchemaValue(hmsTable.getDbName(), hmsTable.getName());
+    public static HudiSchemaCacheValue getSchemaCacheValue(HMSExternalTable 
hmsTable, String queryInstant) {
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog());
+        SchemaCacheKey key = new HudiSchemaCacheKey(hmsTable.getDbName(), 
hmsTable.getName(),
+                Long.parseLong(queryInstant));
+        Optional<SchemaCacheValue> schemaCacheValue = 
cache.getSchemaValue(key);
         return (HudiSchemaCacheValue) schemaCacheValue.get();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index 62094b21c2b..ef921cbfa47 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -85,14 +86,15 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
     public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable 
table,
             HoodieTableMetaClient tableMetaClient, String timestamp, boolean 
useHiveSyncPartition) {
         Preconditions.checkState(catalogId == table.getCatalog().getId());
+        TablePartitionValues partitionValues = new TablePartitionValues();
         Option<String[]> partitionColumns = 
tableMetaClient.getTableConfig().getPartitionFields();
-        if (!partitionColumns.isPresent()) {
-            return null;
+        if (!partitionColumns.isPresent() || partitionColumns.get().length == 
0) {
+            return partitionValues;
         }
         HoodieTimeline timeline = 
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
         Option<HoodieInstant> lastInstant = timeline.lastInstant();
         if (!lastInstant.isPresent()) {
-            return null;
+            return partitionValues;
         }
         long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
         if (Long.parseLong(timestamp) == lastTimestamp) {
@@ -100,10 +102,13 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
         }
         List<String> partitionNameAndValues = 
getPartitionNamesBeforeOrEquals(timeline, timestamp);
         List<String> partitionNames = Arrays.asList(partitionColumns.get());
-        TablePartitionValues partitionValues = new TablePartitionValues();
+        // Auto refresh of Hudi MTMVs is not supported yet, so every entry of
+        // the `partitionLastUpdateTimestamp` list is filled with 0L.
         partitionValues.addPartitions(partitionNameAndValues,
                 partitionNameAndValues.stream().map(p -> 
parsePartitionValues(partitionNames, p))
-                        .collect(Collectors.toList()), 
table.getPartitionColumnTypes());
+                        .collect(Collectors.toList()), 
table.getHudiPartitionColumnTypes(Long.parseLong(timestamp)),
+                Collections.nCopies(partitionNameAndValues.size(), 0L));
+        partitionValues.setLastUpdateTimestamp(Long.parseLong(timestamp));
         return partitionValues;
     }
 
@@ -111,19 +116,21 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
                                                    boolean 
useHiveSyncPartition)
             throws CacheException {
         Preconditions.checkState(catalogId == table.getCatalog().getId());
+        TablePartitionValues partitionValues = new TablePartitionValues();
         Option<String[]> partitionColumns = 
tableMetaClient.getTableConfig().getPartitionFields();
-        if (!partitionColumns.isPresent()) {
-            return null;
+        if (!partitionColumns.isPresent() || partitionColumns.get().length == 
0) {
+            return partitionValues;
         }
         HoodieTimeline timeline = 
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
         Option<HoodieInstant> lastInstant = timeline.lastInstant();
         if (!lastInstant.isPresent()) {
-            return null;
+            return partitionValues;
         }
         try {
             long lastTimestamp = 
Long.parseLong(lastInstant.get().getTimestamp());
-            TablePartitionValues partitionValues = partitionCache.get(
-                    new TablePartitionKey(table.getDbName(), table.getName(), 
table.getPartitionColumnTypes()));
+            partitionValues = partitionCache.get(
+                    new TablePartitionKey(table.getDbName(), table.getName(),
+                            table.getHudiPartitionColumnTypes(lastTimestamp)));
             partitionValues.readLock().lock();
             try {
                 long lastUpdateTimestamp = 
partitionValues.getLastUpdateTimestamp();
@@ -159,7 +166,8 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
                 partitionValues.cleanPartitions();
                 partitionValues.addPartitions(partitionNames,
                         partitionNames.stream().map(p -> 
parsePartitionValues(partitionColumnsList, p))
-                                .collect(Collectors.toList()), 
table.getPartitionColumnTypes());
+                                .collect(Collectors.toList()), 
table.getHudiPartitionColumnTypes(lastTimestamp),
+                        Collections.nCopies(partitionNames.size(), 0L));
                 partitionValues.setLastUpdateTimestamp(lastTimestamp);
                 return partitionValues;
             } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 17543ec3949..00697a5469f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -29,14 +29,13 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
@@ -166,11 +165,6 @@ public class HudiScanNode extends HiveScanNode {
         basePath = hmsTable.getRemoteTable().getSd().getLocation();
         inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
         serdeLib = 
hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
-        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog());
-        Optional<SchemaCacheValue> schemaCacheValue = 
cache.getSchemaValue(table.getDbName(), table.getName());
-        HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue) 
schemaCacheValue.get();
-        columnNames = 
hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
-        columnTypes = hudiSchemaCacheValue.getColTypes();
 
         if (scanParams != null && !scanParams.incrementalRead()) {
             // Only support incremental read
@@ -211,11 +205,16 @@ public class HudiScanNode extends HiveScanNode {
             }
             queryInstant = snapshotInstant.get().getTimestamp();
         }
+
+        HudiSchemaCacheValue hudiSchemaCacheValue = 
HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
+        columnNames = 
hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
+        columnTypes = hudiSchemaCacheValue.getColTypes();
+
         fsView = Env.getCurrentEnv()
             .getExtMetaCacheMgr()
             .getFsViewProcessor(hmsTable.getCatalog())
             .getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient);
-        if (HudiUtils.getSchemaCacheValue(hmsTable).isEnableSchemaEvolution()) 
{
+        if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
             params.setHistorySchemaInfo(new ConcurrentHashMap<>());
         }
     }
@@ -270,7 +269,7 @@ public class HudiScanNode extends HiveScanNode {
             // fileDesc.setNestedFields(hudiSplit.getNestedFields());
             fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
         } else {
-            HudiSchemaCacheValue hudiSchemaCacheValue = 
HudiUtils.getSchemaCacheValue(hmsTable);
+            HudiSchemaCacheValue hudiSchemaCacheValue = 
HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
             if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
                 long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(
                         new File(hudiSplit.getPath().get()).getName()));
@@ -291,7 +290,7 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient 
metaClient) {
-        List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
+        List<Type> partitionColumnTypes = 
hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable));
         if (!partitionColumnTypes.isEmpty()) {
             this.totalPartitionNum = selectedPartitions.totalPartitionNum;
             Map<String, PartitionItem> prunedPartitions = 
selectedPartitions.selectedPartitions;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index fefd6b76b0f..16964ead1d2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
@@ -278,7 +279,7 @@ public class IcebergExternalTable extends ExternalTable 
implements MTMVRelatedTa
     }
 
     @Override
-    public MvccSnapshot loadSnapshot() {
+    public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
         return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
index 1136fb079f3..a598c68703f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
@@ -134,7 +134,7 @@ public class MaxComputeExternalTable extends ExternalTable {
                 partitionSpecs.stream()
                         .map(p -> parsePartitionValues(partitionColumnNames, 
p))
                         .collect(Collectors.toList()),
-                partitionTypes);
+                partitionTypes, Collections.nCopies(partitionSpecs.size(), 
0L));
         return partitionValues;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java
similarity index 67%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java
index d69e0f3114d..35f63291a25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java
@@ -17,17 +17,5 @@
 
 package org.apache.doris.datasource.mvcc;
 
-import org.apache.doris.catalog.TableIf;
-
-/**
- * The table that needs to query data based on the version needs to implement 
this interface.
- */
-public interface MvccTable extends TableIf {
-    /**
-     * Retrieve the current snapshot information of the table,
-     * and the returned result will be used for the entire process of this 
query
-     *
-     * @return MvccSnapshot
-     */
-    MvccSnapshot loadSnapshot();
+public class EmptyMvccSnapshot implements MvccSnapshot {
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
index d69e0f3114d..89b1d6e9b07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
@@ -17,8 +17,11 @@
 
 package org.apache.doris.datasource.mvcc;
 
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.TableIf;
 
+import java.util.Optional;
+
 /**
  * The table that needs to query data based on the version needs to implement 
this interface.
  */
@@ -29,5 +32,5 @@ public interface MvccTable extends TableIf {
      *
      * @return MvccSnapshot
      */
-    MvccSnapshot loadSnapshot();
+    MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 5d3e7dc3f5b..8a97aa856a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.paimon;
 
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
@@ -212,7 +213,7 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
     }
 
     @Override
-    public MvccSnapshot loadSnapshot() {
+    public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
         return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 016d0a1d495..105ef0d9d69 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -80,6 +80,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
@@ -381,7 +382,7 @@ public class MTMVTask extends AbstractTask {
             }
             if (tableIf instanceof MvccTable) {
                 MvccTable mvccTable = (MvccTable) tableIf;
-                MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot();
+                MvccSnapshot mvccSnapshot = 
mvccTable.loadSnapshot(Optional.empty());
                 snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 56adf5f2f82..258bab38d48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -224,7 +224,6 @@ public class NereidsPlanner extends Planner {
             // collect table and lock them in the order of table id
             collectAndLockTable(showAnalyzeProcess(explainLevel, 
showPlanProcess));
             // after table collector, we should use a new context.
-            statementContext.loadSnapshots();
             Plan resultPlan = planWithoutLock(plan, requireProperties, 
explainLevel, showPlanProcess);
             lockCallback.accept(resultPlan);
             if (statementContext.getConnectContext().getExecutor() != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 4f0e7c27efd..ab7c9f77fc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids;
 
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
@@ -658,13 +659,12 @@ public class StatementContext implements Closeable {
     /**
      * Load snapshot information of mvcc
      */
-    public void loadSnapshots() {
+    public void loadSnapshots(Optional<TableSnapshot> tableSnapshot, 
MvccTableInfo mvccTableInfo) {
         for (TableIf tableIf : tables.values()) {
             if (tableIf instanceof MvccTable) {
-                MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
                 // may be set by MTMV, we can not load again
                 if (!snapshots.containsKey(mvccTableInfo)) {
-                    snapshots.put(mvccTableInfo, ((MvccTable) 
tableIf).loadSnapshot());
+                    snapshots.put(mvccTableInfo, ((MvccTable) 
tableIf).loadSnapshot(tableSnapshot));
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 82dddc9575a..0f161088535 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
+import org.apache.doris.datasource.mvcc.MvccTableInfo;
 import org.apache.doris.nereids.CTEContext;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.SqlCacheContext;
@@ -378,6 +379,8 @@ public class BindRelation extends OneAnalysisRuleFactory {
 
         List<String> qualifierWithoutTableName = Lists.newArrayList();
         qualifierWithoutTableName.addAll(qualifiedTableName.subList(0, 
qualifiedTableName.size() - 1));
+        MvccTableInfo mvccTableInfo = new MvccTableInfo(table);
+        
cascadesContext.getStatementContext().loadSnapshots(unboundRelation.getTableSnapshot(),
 mvccTableInfo);
         boolean isView = false;
         try {
             switch (table.getType()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 373e3d415bb..4e7cd6b5116 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -60,6 +60,7 @@ import 
org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.task.AbstractTask;
@@ -1610,7 +1611,7 @@ public class MetadataGenerator {
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) tbl.getCatalog());
         HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
-                tbl.getDbName(), tbl.getName(), tbl.getPartitionColumnTypes());
+                tbl.getDbName(), tbl.getName(), 
tbl.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(tbl)));
         Map<Long, List<String>> valuesMap = 
hivePartitionValues.getPartitionValuesMap();
         List<TRow> dataBatch = Lists.newArrayList();
         for (Map.Entry<Long, List<String>> entry : valuesMap.entrySet()) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
index 016b6616f0b..409fc1daf72 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
@@ -175,7 +175,7 @@ public class HudiUtilsTest {
         HMSExternalCatalog catalog = new HMSExternalCatalog();
         HMSExternalDatabase db = new HMSExternalDatabase(catalog, 1, "db", 
"db");
         HMSExternalTable hmsExternalTable = new HMSExternalTable(2, "tb", 
"tb", catalog, db);
-        HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new 
boolean[] {false});
+        HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new 
boolean[] {false}, "20241219214518880");
 
         // 4. delete the commit file,
         //    this operation is used to imitate the clean operation in hudi
@@ -189,7 +189,7 @@ public class HudiUtilsTest {
         //    because we will refresh timeline in this `getHudiTableSchema` 
method,
         //    and we can get the latest commit.
         //    so that this error: `Could not read commit details from file 
<table_path>/.hoodie/1.commit` will be not reported.
-        HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new 
boolean[] {false});
+        HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new 
boolean[] {false}, "20241219214518880");
 
         // 7. clean up
         Assert.assertTrue(commit2.delete());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
index 30a233dd1a9..4abed2ee708 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
@@ -29,11 +29,13 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.CatalogMgr;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
+import org.apache.doris.datasource.hive.HiveDlaTable;
 import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
 import org.apache.doris.qe.SessionVariable;
 
@@ -107,6 +109,7 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase {
         Deencapsulation.setField(tbl, "catalog", hmsCatalog);
         Deencapsulation.setField(tbl, "dbName", "hms_db");
         Deencapsulation.setField(tbl, "name", "hms_tbl");
+        Deencapsulation.setField(tbl, "dlaTable", new HiveDlaTable(tbl));
         new Expectations(tbl) {
             {
                 tbl.getId();
@@ -138,7 +141,7 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase {
                 result = TableIf.TableType.HMS_EXTERNAL_TABLE;
 
                 // mock initSchemaAndUpdateTime and do nothing
-                tbl.initSchemaAndUpdateTime();
+                tbl.initSchemaAndUpdateTime(new 
ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl"));
                 minTimes = 0;
 
                 tbl.getDatabase();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 0a981dab8a9..376f8cba4e8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -31,10 +31,12 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.CatalogMgr;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
+import org.apache.doris.datasource.hive.HiveDlaTable;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
 import org.apache.doris.planner.OlapScanNode;
@@ -123,6 +125,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         Deencapsulation.setField(tbl, "catalog", hmsCatalog);
         Deencapsulation.setField(tbl, "dbName", "hms_db");
         Deencapsulation.setField(tbl, "name", "hms_tbl");
+        Deencapsulation.setField(tbl, "dlaTable", new HiveDlaTable(tbl));
         new Expectations(tbl) {
             {
                 tbl.getId();
@@ -158,7 +161,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
                 result = DLAType.HIVE;
 
                 // mock initSchemaAndUpdateTime and do nothing
-                tbl.initSchemaAndUpdateTime();
+                tbl.initSchemaAndUpdateTime(new 
ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl"));
                 minTimes = 0;
 
                 tbl.getDatabase();
@@ -173,6 +176,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         Deencapsulation.setField(tbl2, "catalog", hmsCatalog);
         Deencapsulation.setField(tbl2, "dbName", "hms_db");
         Deencapsulation.setField(tbl2, "name", "hms_tbl2");
+        Deencapsulation.setField(tbl2, "dlaTable", new HiveDlaTable(tbl2));
         new Expectations(tbl2) {
             {
                 tbl2.getId();
@@ -208,7 +212,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
                 result = DLAType.HIVE;
 
                 // mock initSchemaAndUpdateTime and do nothing
-                tbl2.initSchemaAndUpdateTime();
+                tbl2.initSchemaAndUpdateTime(new 
ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl2"));
                 minTimes = 0;
 
                 tbl2.getDatabase();
@@ -386,7 +390,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode4);
 
         // invoke initSchemaAndUpdateTime first and init schemaUpdateTime
-        tbl2.initSchemaAndUpdateTime();
+        tbl2.initSchemaAndUpdateTime(new 
ExternalSchemaCache.SchemaCacheKey(tbl2.getDbName(), tbl2.getName()));
 
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, 
scanNodes);
         ca.checkCacheMode(System.currentTimeMillis() + 
Config.cache_last_version_interval_second * 1000L * 2);
@@ -434,7 +438,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode4);
 
         // invoke initSchemaAndUpdateTime first and init schemaUpdateTime
-        tbl2.initSchemaAndUpdateTime();
+        tbl2.initSchemaAndUpdateTime(new 
ExternalSchemaCache.SchemaCacheKey(tbl2.getDbName(), tbl2.getName()));
 
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, 
scanNodes);
         ca.checkCacheModeForNereids(System.currentTimeMillis() + 
Config.cache_last_version_interval_second * 1000L * 2);
diff --git 
a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out
new file mode 100644
index 00000000000..f8870d0b287
Binary files /dev/null and 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out 
differ
diff --git 
a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out
 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out
new file mode 100644
index 00000000000..30b64a98ad8
Binary files /dev/null and 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out
 differ
diff --git 
a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out
 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out
new file mode 100644
index 00000000000..77597631587
Binary files /dev/null and 
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out
 differ
diff --git 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy
new file mode 100644
index 00000000000..5bfcea11d67
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_mtmv", 
"p2,external,hudi,external_remote,external_remote_hudi") {
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disabled hudi test")
+        return
+    }
+    String suiteName = "test_hudi_mtmv"
+    String catalogName = "${suiteName}_catalog"
+    String mvName = "${suiteName}_mv"
+    String dbName = context.config.getDbNameByFile(context.file)
+    String otherDbName = "${suiteName}_otherdb"
+    String tableName = "${suiteName}_table"
+
+    sql """drop database if exists ${otherDbName}"""
+    sql """create database ${otherDbName}"""
+     sql """
+        CREATE TABLE  ${otherDbName}.${tableName} (
+          `user_id` INT,
+          `num` INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+        PROPERTIES ('replication_num' = '1') ;
+       """
+
+    sql """
+        insert into ${otherDbName}.${tableName} values(1,2);
+        """
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+            ${props}
+        );"""
+
+    order_qt_base_table """ select * from 
${catalogName}.hudi_mtmv_regression_test.hudi_table_1; """
+
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+            SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+        """
+    def showPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+    assertTrue(showPartitionsResult.toString().contains("p_a"))
+    assertTrue(showPartitionsResult.toString().contains("p_b"))
+
+    // refresh one partition
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+    //refresh auto
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_auto "SELECT * FROM ${mvName} "
+    order_qt_is_sync_before_rebuild "select SyncWithBaseTables from 
mv_infos('database'='${dbName}') where Name='${mvName}'"
+
+   // rebuild catalog, should not affect MTMV
+    sql """drop catalog if exists ${catalogName}"""
+    sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+            ${props}
+        );"""
+    order_qt_is_sync_after_rebuild "select SyncWithBaseTables from 
mv_infos('database'='${dbName}') where Name='${mvName}'"
+
+    // should refresh normally after catalog rebuild
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} complete
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} "
+
+    sql """drop materialized view if exists ${mvName};"""
+
+     // MTMV without partition
+     sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            KEY(`id`)
+            COMMENT "comment1"
+            DISTRIBUTED BY HASH(`id`) BUCKETS 2
+            PROPERTIES ('replication_num' = '1',"grace_period"="333")
+            AS
+            SELECT id,age,par FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+        """
+    order_qt_not_partition_before "select SyncWithBaseTables from 
mv_infos('database'='${dbName}') where Name='${mvName}'"
+    // should be able to refresh auto
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_not_partition "SELECT * FROM ${mvName} "
+    order_qt_not_partition_after "select SyncWithBaseTables from 
mv_infos('database'='${dbName}') where Name='${mvName}'"
+    sql """drop materialized view if exists ${mvName};"""
+
+    // refresh on schedule
+    //  sql """
+    //  CREATE MATERIALIZED VIEW ${mvName}
+    //     BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 SECOND STARTS 
"9999-12-13 21:07:09"
+    //     KEY(`id`)
+    //     COMMENT "comment1"
+    //     DISTRIBUTED BY HASH(`id`) BUCKETS 2
+    //     PROPERTIES ('replication_num' = '1',"grace_period"="333")
+    //     AS
+    //     SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+    // """
+    //  waitingMTMVTaskFinishedByMvName(mvName)
+    //  sql """drop materialized view if exists ${mvName};"""
+
+    // refresh on commit
+     sql """
+     CREATE MATERIALIZED VIEW ${mvName}
+        BUILD IMMEDIATE REFRESH AUTO ON commit
+        KEY(`id`)
+        COMMENT "comment1"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 2
+        PROPERTIES ('replication_num' = '1',"grace_period"="333")
+        AS
+        SELECT id,age,par FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+    """
+    waitingMTMVTaskFinishedByMvName(mvName)
+     sql """drop materialized view if exists ${mvName};"""
+
+    // cross db and join internal table
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+            SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join 
internal.${otherDbName}.${tableName} b on a.id=b.user_id;
+        """
+    def showJoinPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showJoinPartitionsResult: " + 
showJoinPartitionsResult.toString())
+    assertTrue(showJoinPartitionsResult.toString().contains("p_a"))
+    assertTrue(showJoinPartitionsResult.toString().contains("p_b"))
+
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_join_one_partition "SELECT * FROM ${mvName} "
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`create_date`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+            SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions;
+        """
+    def showTwoPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showTwoPartitionsResult: " + 
showTwoPartitionsResult.toString())
+    assertTrue(showTwoPartitionsResult.toString().contains("p_20200101"))
+    assertTrue(showTwoPartitionsResult.toString().contains("p_20380101"))
+    assertTrue(showTwoPartitionsResult.toString().contains("p_20380102"))
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto;
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_two_partition "SELECT * FROM ${mvName} "
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`create_date`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = 
'1','partition_sync_limit'='2','partition_date_format'='%Y-%m-%d',
+                        'partition_sync_time_unit'='MONTH')
+            AS
+            SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions;
+        """
+    def showLimitPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showLimitPartitionsResult: " + 
showLimitPartitionsResult.toString())
+    assertFalse(showLimitPartitionsResult.toString().contains("p_20200101"))
+    assertTrue(showLimitPartitionsResult.toString().contains("p_20380101"))
+    assertTrue(showLimitPartitionsResult.toString().contains("p_20380102"))
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto;
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_limit_partition "SELECT * FROM ${mvName} "
+    sql """drop materialized view if exists ${mvName};"""
+
+    // date_trunc is not allowed in partition by
+    test {
+         sql """
+            CREATE MATERIALIZED VIEW ${mvName}
+                BUILD DEFERRED REFRESH AUTO ON MANUAL
+                partition by (date_trunc(`create_date`,'month'))
+                DISTRIBUTED BY RANDOM BUCKETS 2
+                PROPERTIES ('replication_num' = 
'1','partition_sync_limit'='2','partition_date_format'='%Y-%m-%d',
+                            'partition_sync_time_unit'='MONTH')
+                AS
+                SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions;
+            """
+          exception "only support"
+      }
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`region`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+            SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_null_partition;
+        """
+    def showNullPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showNullPartitionsResult: " + 
showNullPartitionsResult.toString())
+    // assertTrue(showNullPartitionsResult.toString().contains("p_null"))
+    assertTrue(showNullPartitionsResult.toString().contains("p_NULL"))
+    assertTrue(showNullPartitionsResult.toString().contains("p_bj"))
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto;
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    // Will lose null data
+    order_qt_null_partition "SELECT * FROM ${mvName} "
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """drop catalog if exists ${catalogName}"""
+
+}
diff --git 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy
 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy
new file mode 100644
index 00000000000..a0ac9b0783f
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_olap_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remote_hudi") {
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disabled hudi test")
+        return
+    }
+    String suiteName = "test_hudi_olap_rewrite_mtmv"
+    String catalogName = "${suiteName}_catalog"
+    String mvName = "${suiteName}_mv"
+    String dbName = context.config.getDbNameByFile(context.file)
+    String tableName = "${suiteName}_table"
+    sql """drop table if exists ${tableName}"""
+    sql """
+        CREATE TABLE ${tableName} (
+          `user_id` INT,
+          `num` INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+        PROPERTIES ('replication_num' = '1') ;
+       """
+    sql """
+        insert into ${tableName} values(1,2);
+        """
+
+    sql """analyze table internal.`${dbName}`. ${tableName} with sync"""
+    sql """alter table internal.`${dbName}`. ${tableName} modify column 
user_id set stats ('row_count'='1');"""
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+
+    sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+    String mvSql = "SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join 
${tableName} b on a.id=b.user_id;";
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+            ${props}
+        );"""
+
+    sql """analyze table 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 with sync"""
+    sql """alter table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 
modify column par set stats ('row_count'='10');"""
+
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+             ${mvSql}
+        """
+    def showPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+    assertTrue(showPartitionsResult.toString().contains("p_a"))
+    assertTrue(showPartitionsResult.toString().contains("p_b"))
+
+    // refresh one partition
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+    def explainOnePartition = sql """ explain  ${mvSql} """
+    logger.info("explainOnePartition: " + explainOnePartition.toString())
+    assertTrue(explainOnePartition.toString().contains("VUNION"))
+    order_qt_refresh_one_partition_rewrite "${mvSql}"
+
+    mv_rewrite_success("${mvSql}", "${mvName}")
+
+    // selecting p_b should not be rewritten
+    mv_rewrite_fail("SELECT * FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join 
${tableName} b on a.id=b.user_id where a.par='b';", "${mvName}")
+
+    //refresh auto
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_auto "SELECT * FROM ${mvName} "
+
+    def explainAllPartition = sql """ explain  ${mvSql}; """
+    logger.info("explainAllPartition: " + explainAllPartition.toString())
+    assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+    order_qt_refresh_all_partition_rewrite "${mvSql}"
+
+    mv_rewrite_success("${mvSql}", "${mvName}")
+
+    sql """drop materialized view if exists ${mvName};"""
+    sql """drop catalog if exists ${catalogName}"""
+}
diff --git 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
new file mode 100644
index 00000000000..95c71c48475
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remote_hudi") { // Checks partition refresh and query rewrite for a materialized view built over hudi_table_1 in an external catalog.
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) { // skip unless enableExternalHudiTest is "true" (case-insensitive)
+        logger.info("disabled hudi test")
+        return
+    }
+    String suiteName = "test_hudi_rewrite_mtmv"
+    String catalogName = "${suiteName}_catalog"
+    String mvName = "${suiteName}_mv"
+    String dbName = context.config.getDbNameByFile(context.file)
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog") // catalog properties are taken from the hudiEmrCatalog config entry
+
+    sql """set materialized_view_rewrite_enable_contain_external_table=true;""" // judging by its name, lets MV rewrite consider external tables - TODO confirm
+    String mvSql = "SELECT par,count(*) as num FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 group by par;";
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+            ${props}
+        );"""
+
+    sql """analyze table 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 with sync"""
+    sql """alter table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 
modify column par set stats ('row_count'='10');""" // overrides the row_count stat of column par with 10; presumably to steer the rewrite decision - TODO confirm
+
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+             ${mvSql}
+        """
+    def showPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+    assertTrue(showPartitionsResult.toString().contains("p_a")) // the MV is expected to expose partitions named p_a ...
+    assertTrue(showPartitionsResult.toString().contains("p_b")) // ... and p_b
+
+    // refresh only partition p_a
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+        """
+    waitingMTMVTaskFinishedByMvName(mvName) // helper defined outside this file; presumably blocks until the refresh task finishes
+    order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+    def explainOnePartition = sql """ explain  ${mvSql} """
+    logger.info("explainOnePartition: " + explainOnePartition.toString())
+    assertTrue(explainOnePartition.toString().contains("VUNION")) // with only p_a refreshed, the plan text must contain VUNION
+    order_qt_refresh_one_partition_rewrite "${mvSql}"
+
+    mv_rewrite_success("${mvSql}", "${mvName}")
+
+    // a query restricted to par='b' (p_b has not been refreshed yet) must not be rewritten to the MV
+    mv_rewrite_fail("SELECT par,count(*) as num FROM 
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 where par='b' group by 
par;", "${mvName}")
+
+    // refresh the MV in auto mode
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_auto "SELECT * FROM ${mvName} "
+
+    def explainAllPartition = sql """ explain  ${mvSql}; """
+    logger.info("explainAllPartition: " + explainAllPartition.toString())
+    assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) // after the auto refresh, the plan text must contain VOlapScanNode
+    order_qt_refresh_all_partition_rewrite "${mvSql}"
+
+    mv_rewrite_success("${mvSql}", "${mvName}")
+
+    sql """drop materialized view if exists ${mvName};""" // clean up the objects this suite created
+    sql """drop catalog if exists ${catalogName}"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to