This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5275202c176 [enhance](mtmv) mtmv disable hive auto refresh (#30775)
5275202c176 is described below

commit 5275202c1760e6587dde5ff27e0faf24123388f9
Author: zhangdong <493738...@qq.com>
AuthorDate: Mon Feb 5 11:40:10 2024 +0800

    [enhance](mtmv) mtmv disable hive auto refresh (#30775)
    
    - If the `related table` is a `hive` table, do not refresh automatically
    - If the `related table` is a `hive` table, the partition column is
      allowed to be `null`. Otherwise, it must be `not null`
    - Add more unit tests (`ut`)
---
 .../java/org/apache/doris/catalog/OlapTable.java   |   9 +
 .../doris/catalog/external/HMSExternalTable.java   |  10 +
 .../doris/common/proc/PartitionsProcDir.java       |   4 +-
 .../apache/doris/common/proc/TablesProcDir.java    |   3 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  36 +-
 .../mtmv/{MTMVUtil.java => MTMVPartitionUtil.java} | 105 +----
 .../org/apache/doris/mtmv/MTMVRelatedTableIf.java  |  15 +
 .../org/apache/doris/mtmv/MTMVRelationManager.java |   3 +-
 .../org/apache/doris/mtmv/MTMVRewriteUtil.java     |  87 ++++
 .../java/org/apache/doris/mtmv/MTMVService.java    |   4 +-
 .../main/java/org/apache/doris/mtmv/MTMVUtil.java  | 452 +--------------------
 .../mv/AbstractMaterializedViewRule.java           |   6 +-
 .../exploration/mv/MaterializedViewUtils.java      |   2 +-
 .../trees/plans/commands/info/RefreshMTMVInfo.java |   4 +-
 .../doris/tablefunction/MetadataGenerator.java     |   4 +-
 .../apache/doris/mtmv/MTMVPartitionUtilTest.java   | 187 +++++++++
 .../apache/doris/mtmv/MTMVRefreshSnapshotTest.java |  96 +++++
 .../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 254 ++++++++++++
 .../java/org/apache/doris/mtmv/MTMVTaskTest.java   | 163 ++++++++
 .../exploration/mv/MaterializedViewUtilsTest.java  |   2 -
 regression-test/data/mtmv_p0/test_hive_mtmv.out    |   5 +
 .../suites/mtmv_p0/test_hive_mtmv.groovy           |   8 +
 22 files changed, 892 insertions(+), 567 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 2e01f37639b..0c0ce320f11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2632,4 +2632,13 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         return getPartitionOrAnalysisException(partitionId).getName();
     }
 
+    @Override
+    public boolean needAutoRefresh() {
+        return true;
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index aa4258baadb..c31ba11a0fd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -885,6 +885,16 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
                 ((ListPartitionItem) 
item).getItems().get(0).getPartitionValuesAsStringListForHive());
         return partitionValuesList;
     }
+
+    @Override
+    public boolean needAutoRefresh() {
+        return false;
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
 }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 01feaf23683..4703429fa18 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -45,7 +45,7 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -309,7 +309,7 @@ public class PartitionsProcDir implements ProcDirInterface {
                 partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
                 if (olapTable instanceof MTMV) {
                     try {
-                        List<String> partitionUnSyncTables = MTMVUtil
+                        List<String> partitionUnSyncTables = MTMVPartitionUtil
                                 .getPartitionUnSyncTables((MTMV) olapTable, 
partitionId);
                         
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
                         partitionInfo.add(partitionUnSyncTables.toString());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
index b3ce9be35c9..c2926c829ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.ListComparator;
@@ -91,7 +90,7 @@ public class TablesProcDir implements ProcDirInterface {
             String partitionKey = FeConstants.null_string;
             table.readLock();
             try {
-                if (table.getType() == TableType.OLAP) {
+                if (table instanceof OlapTable) {
                     OlapTable olapTable = (OlapTable) table;
                     if (olapTable.getPartitionInfo().getType() == 
PartitionType.RANGE) {
                         partitionNum = olapTable.getPartitions().size();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 194172a6732..6a861200e69 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -18,17 +18,14 @@
 package org.apache.doris.job.extensions.mtmv;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
@@ -36,6 +33,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
 import org.apache.doris.mtmv.MTMVPlanUtil;
 import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
 import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
@@ -145,6 +143,13 @@ public class MTMVTask extends AbstractTask {
         this.taskContext = Objects.requireNonNull(taskContext);
     }
 
+    // only for test
+    public MTMVTask(MTMV mtmv, MTMVRelation relation, MTMVTaskContext 
taskContext) {
+        this.mtmv = mtmv;
+        this.relation = relation;
+        this.taskContext = taskContext;
+    }
+
     @Override
     public void run() throws JobException {
         LOG.info("mtmv task run, taskId: {}", super.getTaskId());
@@ -161,10 +166,10 @@ public class MTMVTask extends AbstractTask {
             // To be completely consistent with hive, you need to manually 
refresh the cache
             // refreshHmsTable();
             if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
-                MTMVUtil.alignMvPartition(mtmv, 
mtmv.getMvPartitionInfo().getRelatedTable());
+                MTMVPartitionUtil.alignMvPartition(mtmv, 
mtmv.getMvPartitionInfo().getRelatedTable());
             }
             List<Long> needRefreshPartitionIds = 
calculateNeedRefreshPartitions();
-            this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, 
needRefreshPartitionIds);
+            this.needRefreshPartitions = 
MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
             this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
             if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
                 return;
@@ -181,8 +186,8 @@ public class MTMVTask extends AbstractTask {
                 Set<Long> execPartitionIds = 
Sets.newHashSet(needRefreshPartitionIds
                         .subList(start, end > needRefreshPartitionIds.size() ? 
needRefreshPartitionIds.size() : end));
                 // need get names before exec
-                List<String> execPartitionNames = 
MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
-                Map<String, MTMVRefreshPartitionSnapshot> 
execPartitionSnapshots = MTMVUtil
+                List<String> execPartitionNames = 
MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
+                Map<String, MTMVRefreshPartitionSnapshot> 
execPartitionSnapshots = MTMVPartitionUtil
                         .generatePartitionSnapshots(mtmv, 
relation.getBaseTables(), execPartitionIds);
                 exec(ctx, execPartitionIds, tableWithPartKey);
                 completedPartitions.addAll(execPartitionNames);
@@ -241,7 +246,7 @@ public class MTMVTask extends AbstractTask {
         LOG.info("mtmv task before, taskId: {}", super.getTaskId());
         super.before();
         try {
-            mtmv = getMTMV();
+            mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
         } catch (UserException e) {
             LOG.warn("before task failed:", e);
             throw new JobException(e);
@@ -267,11 +272,6 @@ public class MTMVTask extends AbstractTask {
         }
     }
 
-    private MTMV getMTMV() throws DdlException, MetaNotFoundException {
-        Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
-        return (MTMV) db.getTableOrMetaException(mtmvId, 
TableType.MATERIALIZED_VIEW);
-    }
-
     @Override
     public void runTask() throws JobException {
         LOG.info("mtmv task runTask, taskId: {}", super.getTaskId());
@@ -296,7 +296,7 @@ public class MTMVTask extends AbstractTask {
         String dbName = "";
         String mvName = "";
         try {
-            MTMV mtmv = getMTMV();
+            MTMV mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
             dbName = mtmv.getQualifiedDbName();
             mvName = mtmv.getName();
         } catch (UserException e) {
@@ -386,20 +386,20 @@ public class MTMVTask extends AbstractTask {
         }
     }
 
-    private List<Long> calculateNeedRefreshPartitions() throws 
AnalysisException {
+    public List<Long> calculateNeedRefreshPartitions() throws 
AnalysisException {
         // check whether the user manually triggers it
         if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
             if (taskContext.isComplete()) {
                 return mtmv.getPartitionIds();
             } else if (!CollectionUtils
                     .isEmpty(taskContext.getPartitions())) {
-                return MTMVUtil.getPartitionsIdsByNames(mtmv, 
taskContext.getPartitions());
+                return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, 
taskContext.getPartitions());
             }
         }
         // check if data is fresh
         // We need to use a newly generated relationship and cannot retrieve 
it using mtmv.getRelation()
         // to avoid rebuilding the baseTable and causing a change in the 
tableId
-        boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), 
mtmv.getExcludedTriggerTables());
+        boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, 
relation.getBaseTables(), mtmv.getExcludedTriggerTables());
         if (fresh) {
             return Lists.newArrayList();
         }
@@ -413,7 +413,7 @@ public class MTMVTask extends AbstractTask {
         }
         // We need to use a newly generated relationship and cannot retrieve 
it using mtmv.getRelation()
         // to avoid rebuilding the baseTable and causing a change in the 
tableId
-        return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv, 
relation.getBaseTables());
+        return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, 
relation.getBaseTables());
     }
 
     public MTMVTaskContext getTaskContext() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
similarity index 83%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
copy to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 74593e5def5..a8e3e11869d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -29,11 +29,7 @@ import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
-import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -48,23 +44,8 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 
-public class MTMVUtil {
-    private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
-
-    /**
-     * get Table by BaseTableInfo
-     *
-     * @param baseTableInfo
-     * @return
-     * @throws AnalysisException
-     */
-    public static TableIf getTable(BaseTableInfo baseTableInfo) throws 
AnalysisException {
-        TableIf table = Env.getCurrentEnv().getCatalogMgr()
-                .getCatalogOrAnalysisException(baseTableInfo.getCtlId())
-                .getDbOrAnalysisException(baseTableInfo.getDbId())
-                .getTableOrAnalysisException(baseTableInfo.getTableId());
-        return table;
-    }
+public class MTMVPartitionUtil {
+    private static final Logger LOG = 
LogManager.getLogger(MTMVPartitionUtil.class);
 
     /**
      * Determine whether the partition is sync with retated partition and 
other baseTables
@@ -76,7 +57,7 @@ public class MTMVUtil {
      * @return
      * @throws AnalysisException
      */
-    private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, 
Set<BaseTableInfo> tables,
+    public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, 
Set<BaseTableInfo> tables,
             Set<String> excludedTriggerTables) throws AnalysisException {
         boolean isSyncWithPartition = true;
         if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
@@ -192,11 +173,14 @@ public class MTMVUtil {
     public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long 
partitionId) throws AnalysisException {
         List<String> res = Lists.newArrayList();
         for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) 
{
-            TableIf table = getTable(baseTableInfo);
+            TableIf table = MTMVUtil.getTable(baseTableInfo);
             if (!(table instanceof MTMVRelatedTableIf)) {
                 continue;
             }
             MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
+            if (!mtmvRelatedTableIf.needAutoRefresh()) {
+                continue;
+            }
             if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
                     
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
                 PartitionItem item = 
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
@@ -220,55 +204,6 @@ public class MTMVUtil {
         return res;
     }
 
-    /**
-     * Determine which partition of mtmv can be rewritten
-     *
-     * @param mtmv
-     * @param ctx
-     * @return
-     */
-    public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, 
ConnectContext ctx) {
-        List<Partition> res = Lists.newArrayList();
-        Collection<Partition> allPartitions = mtmv.getPartitions();
-        // check session variable if enable rewrite
-        if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
-            return res;
-        }
-        if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
-                .isMaterializedViewRewriteEnableContainExternalTable()) {
-            return res;
-        }
-
-        MTMVRelation mtmvRelation = mtmv.getRelation();
-        if (mtmvRelation == null) {
-            return res;
-        }
-        // check mv is normal
-        if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
-                && mtmv.getStatus().getRefreshState() == 
MTMVRefreshState.SUCCESS)) {
-            return res;
-        }
-        // check gracePeriod
-        long gracePeriodMills = mtmv.getGracePeriod();
-        long currentTimeMills = System.currentTimeMillis();
-        for (Partition partition : allPartitions) {
-            if (gracePeriodMills > 0 && currentTimeMills <= 
(partition.getVisibleVersionTime()
-                    + gracePeriodMills)) {
-                res.add(partition);
-                continue;
-            }
-            try {
-                if (isMTMVPartitionSync(mtmv, partition.getId(), 
mtmvRelation.getBaseTables(), Sets.newHashSet())) {
-                    res.add(partition);
-                }
-            } catch (AnalysisException e) {
-                // ignore it
-                LOG.warn("check isMTMVPartitionSync failed", e);
-            }
-        }
-        return res;
-    }
-
     /**
      * Get the partitions that need to be refreshed
      *
@@ -303,9 +238,12 @@ public class MTMVUtil {
      * @return
      * @throws AnalysisException
      */
-    private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
+    public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
             MTMVRelatedTableIf relatedTable,
             Long relatedPartitionId) throws AnalysisException {
+        if (!relatedTable.needAutoRefresh()) {
+            return true;
+        }
         MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
                 .getPartitionSnapshot(relatedPartitionId);
         String relatedPartitionName = 
relatedTable.getPartitionName(relatedPartitionId);
@@ -320,7 +258,7 @@ public class MTMVUtil {
      * @param desc
      * @return
      */
-    private static String generatePartitionName(PartitionKeyDesc desc) {
+    public static String generatePartitionName(PartitionKeyDesc desc) {
         String partitionName = "p_";
         partitionName += 
desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
                 .replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
@@ -401,7 +339,7 @@ public class MTMVUtil {
         for (BaseTableInfo baseTableInfo : tables) {
             TableIf table = null;
             try {
-                table = getTable(baseTableInfo);
+                table = MTMVUtil.getTable(baseTableInfo);
             } catch (AnalysisException e) {
                 LOG.warn("get table failed, {}", baseTableInfo, e);
                 return false;
@@ -421,7 +359,7 @@ public class MTMVUtil {
             throws AnalysisException {
         TableIf table = null;
         try {
-            table = getTable(baseTableInfo);
+            table = MTMVUtil.getTable(baseTableInfo);
         } catch (AnalysisException e) {
             LOG.warn("get table failed, {}", baseTableInfo, e);
             return false;
@@ -433,22 +371,15 @@ public class MTMVUtil {
             return true;
         }
         MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
+        if (!baseTable.needAutoRefresh()) {
+            return true;
+        }
         MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
         String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
         return mtmv.getRefreshSnapshot()
                 .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), 
baseTableCurrentSnapshot);
     }
 
-    private static boolean mtmvContainsExternalTable(MTMV mtmv) {
-        Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
-        for (BaseTableInfo baseTableInfo : baseTables) {
-            if (baseTableInfo.getCtlId() != 
InternalCatalog.INTERNAL_CATALOG_ID) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     public static Map<String, MTMVRefreshPartitionSnapshot> 
generatePartitionSnapshots(MTMV mtmv,
             Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
             throws AnalysisException {
@@ -482,7 +413,7 @@ public class MTMVUtil {
                     
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
                 continue;
             }
-            TableIf table = getTable(baseTableInfo);
+            TableIf table = MTMVUtil.getTable(baseTableInfo);
             if (!(table instanceof MTMVRelatedTableIf)) {
                 continue;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index 51773db0df1..46454679b56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -87,4 +87,19 @@ public interface MTMVRelatedTableIf extends TableIf {
      * @throws AnalysisException
      */
     String getPartitionName(long partitionId) throws AnalysisException;
+
+    /**
+     * Whether this type of table allows timed (automatic) triggering of a refresh
+     *
+     * @return if false, the sync checks return true directly without comparing;
+     * otherwise the snapshot information is compared
+     */
+    boolean needAutoRefresh();
+
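+    /**
+     * Whether the partition column of this table is allowed to be nullable
+     *
+     * @return true if the partition column may be `null`, false if it must be `not null`
+     */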
+    boolean isPartitionColumnAllowNull();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
index 77414e4fc88..aa7ffd2426d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
@@ -71,7 +71,8 @@ public class MTMVRelationManager implements MTMVHookService {
         for (BaseTableInfo tableInfo : mvInfos) {
             try {
                 MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
-                if 
(!CollectionUtils.isEmpty(MTMVUtil.getMTMVCanRewritePartitions(mtmv, ctx))) {
+                if (!CollectionUtils
+                        
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, 
System.currentTimeMillis()))) {
                     res.add(mtmv);
                 }
             } catch (AnalysisException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
new file mode 100644
index 00000000000..666a79eba97
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+
+public class MTMVRewriteUtil {
+    private static final Logger LOG = 
LogManager.getLogger(MTMVRewriteUtil.class);
+
+    /**
+     * Determine which partition of mtmv can be rewritten
+     *
+     * @param mtmv
+     * @param ctx
+     * @return
+     */
+    public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, 
ConnectContext ctx,
+            long currentTimeMills) {
+        List<Partition> res = Lists.newArrayList();
+        Collection<Partition> allPartitions = mtmv.getPartitions();
+        // check session variable if enable rewrite
+        if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
+            return res;
+        }
+        if (MTMVUtil.mtmvContainsExternalTable(mtmv) && 
!ctx.getSessionVariable()
+                .isMaterializedViewRewriteEnableContainExternalTable()) {
+            return res;
+        }
+
+        MTMVRelation mtmvRelation = mtmv.getRelation();
+        if (mtmvRelation == null) {
+            return res;
+        }
+        // check mv is normal
+        if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
+                && mtmv.getStatus().getRefreshState() == 
MTMVRefreshState.SUCCESS)) {
+            return res;
+        }
+        // check gracePeriod
+        long gracePeriodMills = mtmv.getGracePeriod();
+        for (Partition partition : allPartitions) {
+            if (gracePeriodMills > 0 && currentTimeMills <= 
(partition.getVisibleVersionTime()
+                    + gracePeriodMills)) {
+                res.add(partition);
+                continue;
+            }
+            try {
+                if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, 
partition.getId(), mtmvRelation.getBaseTables(),
+                        Sets.newHashSet())) {
+                    res.add(partition);
+                }
+            } catch (AnalysisException e) {
+                // ignore it
+                LOG.warn("check isMTMVPartitionSync failed", e);
+            }
+        }
+        return res;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index 6abb22f3e5f..227166e56d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -84,10 +84,10 @@ public class MTMVService {
 
     public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
         Objects.requireNonNull(mtmv);
+        LOG.info("createMTMV: " + mtmv.getName());
         if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
-            MTMVUtil.alignMvPartition(mtmv, 
mtmv.getMvPartitionInfo().getRelatedTable());
+            MTMVPartitionUtil.alignMvPartition(mtmv, 
mtmv.getMvPartitionInfo().getRelatedTable());
         }
-        LOG.info("createMTMV: " + mtmv.getName());
         for (MTMVHookService mtmvHookService : hooks.values()) {
             mtmvHookService.createMTMV(mtmv);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
index 74593e5def5..3b97e35141d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -17,39 +17,19 @@
 
 package org.apache.doris.mtmv;
 
-import org.apache.doris.analysis.AddPartitionClause;
-import org.apache.doris.analysis.DropPartitionClause;
-import org.apache.doris.analysis.PartitionKeyDesc;
-import org.apache.doris.analysis.SinglePartitionDesc;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
-import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
 
 public class MTMVUtil {
-    private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
 
     /**
      * get Table by BaseTableInfo
@@ -66,380 +46,18 @@ public class MTMVUtil {
         return table;
     }
 
-    /**
-     * Determine whether the partition is sync with retated partition and 
other baseTables
-     *
-     * @param mtmv
-     * @param partitionId
-     * @param tables
-     * @param excludedTriggerTables
-     * @return
-     * @throws AnalysisException
-     */
-    private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, 
Set<BaseTableInfo> tables,
-            Set<String> excludedTriggerTables) throws AnalysisException {
-        boolean isSyncWithPartition = true;
-        if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
-            MTMVRelatedTableIf relatedTable = 
mtmv.getMvPartitionInfo().getRelatedTable();
-            // if follow base table, not need compare with related table, only 
should compare with related partition
-            excludedTriggerTables.add(relatedTable.getName());
-            PartitionItem item = 
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
-            Map<Long, PartitionItem> relatedPartitionItems = 
relatedTable.getPartitionItems();
-            long relatedPartitionId = getExistPartitionId(item,
-                    relatedPartitionItems);
-            if (relatedPartitionId == -1L) {
-                LOG.warn("can not found related partition: " + partitionId);
-                return false;
-            }
-            isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, 
relatedTable, relatedPartitionId);
-        }
-        return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, 
partitionId, tables, excludedTriggerTables);
-
-    }
-
-    /**
-     * Align the partitions of mtmv and related tables, delete more and add 
less
-     *
-     * @param mtmv
-     * @param relatedTable
-     * @throws DdlException
-     * @throws AnalysisException
-     */
-    public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf 
relatedTable)
-            throws DdlException, AnalysisException {
-        Map<Long, PartitionItem> relatedTableItems = 
relatedTable.getPartitionItems();
-        Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems();
-        // drop partition of mtmv
-        for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
-            long partitionId = getExistPartitionId(entry.getValue(), 
relatedTableItems);
-            if (partitionId == -1L) {
-                dropPartition(mtmv, entry.getKey());
-            }
-        }
-        // add partition for mtmv
-        for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
-            long partitionId = getExistPartitionId(entry.getValue(), 
mtmvItems);
-            if (partitionId == -1L) {
-                addPartition(mtmv, entry.getValue());
-            }
-        }
-    }
-
-    public static List<String> getPartitionNamesByIds(MTMV mtmv, 
Collection<Long> ids) throws AnalysisException {
-        List<String> res = Lists.newArrayList();
-        for (Long partitionId : ids) {
-            
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
-        }
-        return res;
-    }
-
-    public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> 
partitions) throws AnalysisException {
-        List<Long> res = Lists.newArrayList();
-        for (String partitionName : partitions) {
-            Partition partition = 
mtmv.getPartitionOrAnalysisException(partitionName);
-            res.add(partition.getId());
-        }
-        return res;
-    }
-
-    /**
-     * check if table is sync with all baseTables
-     *
-     * @param mtmv
-     * @return
-     */
-    public static boolean isMTMVSync(MTMV mtmv) {
-        MTMVRelation mtmvRelation = mtmv.getRelation();
-        if (mtmvRelation == null) {
-            return false;
-        }
-        try {
-            return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), 
Sets.newHashSet());
-        } catch (AnalysisException e) {
-            LOG.warn("isMTMVSync failed: ", e);
-            return false;
-        }
-    }
-
-    /**
-     * Determine whether the mtmv is sync with tables
-     *
-     * @param mtmv
-     * @param tables
-     * @param excludeTables
-     * @return
-     * @throws AnalysisException
-     */
-    public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, 
Set<String> excludeTables)
-            throws AnalysisException {
-        Collection<Partition> partitions = mtmv.getPartitions();
-        for (Partition partition : partitions) {
-            if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, 
excludeTables)) {
-                return false;
-            }
-        }
-        return true;
+    /**
+     * Look up an MTMV in the current internal catalog by database id and table id.
+     *
+     * @param dbId id of the database that holds the mtmv
+     * @param mtmvId id of the table; it is fetched as TableType.MATERIALIZED_VIEW
+     * @return the table found, cast to MTMV
+     * @throws DdlException propagated from getDbOrDdlException
+     *         (presumably when no database has this id -- confirm)
+     * @throws MetaNotFoundException propagated from getTableOrMetaException
+     *         (presumably when the table is missing or of another type -- confirm)
+     */
+    public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, 
MetaNotFoundException {
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+        return (MTMV) db.getTableOrMetaException(mtmvId, 
TableType.MATERIALIZED_VIEW);
     }
 
     /**
-     * get not sync tables
+     * Check whether the base tables of the mtmv contain an external table
      *
      * @param mtmv
-     * @param partitionId
      * @return
-     * @throws AnalysisException
      */
-    public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long 
partitionId) throws AnalysisException {
-        List<String> res = Lists.newArrayList();
-        for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) 
{
-            TableIf table = getTable(baseTableInfo);
-            if (!(table instanceof MTMVRelatedTableIf)) {
-                continue;
-            }
-            MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
-            if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
-                    
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
-                PartitionItem item = 
mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
-                Map<Long, PartitionItem> relatedPartitionItems = 
mtmvRelatedTableIf.getPartitionItems();
-                long relatedPartitionId = getExistPartitionId(item,
-                        relatedPartitionItems);
-                if (relatedPartitionId == -1L) {
-                    throw new AnalysisException("can not found related 
partition");
-                }
-                boolean isSyncWithPartition = isSyncWithPartition(mtmv, 
partitionId, mtmvRelatedTableIf,
-                        relatedPartitionId);
-                if (!isSyncWithPartition) {
-                    res.add(mtmvRelatedTableIf.getName());
-                }
-            } else {
-                if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
-                    res.add(table.getName());
-                }
-            }
-        }
-        return res;
-    }
-
-    /**
-     * Determine which partition of mtmv can be rewritten
-     *
-     * @param mtmv
-     * @param ctx
-     * @return
-     */
-    public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, 
ConnectContext ctx) {
-        List<Partition> res = Lists.newArrayList();
-        Collection<Partition> allPartitions = mtmv.getPartitions();
-        // check session variable if enable rewrite
-        if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
-            return res;
-        }
-        if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
-                .isMaterializedViewRewriteEnableContainExternalTable()) {
-            return res;
-        }
-
-        MTMVRelation mtmvRelation = mtmv.getRelation();
-        if (mtmvRelation == null) {
-            return res;
-        }
-        // check mv is normal
-        if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
-                && mtmv.getStatus().getRefreshState() == 
MTMVRefreshState.SUCCESS)) {
-            return res;
-        }
-        // check gracePeriod
-        long gracePeriodMills = mtmv.getGracePeriod();
-        long currentTimeMills = System.currentTimeMillis();
-        for (Partition partition : allPartitions) {
-            if (gracePeriodMills > 0 && currentTimeMills <= 
(partition.getVisibleVersionTime()
-                    + gracePeriodMills)) {
-                res.add(partition);
-                continue;
-            }
-            try {
-                if (isMTMVPartitionSync(mtmv, partition.getId(), 
mtmvRelation.getBaseTables(), Sets.newHashSet())) {
-                    res.add(partition);
-                }
-            } catch (AnalysisException e) {
-                // ignore it
-                LOG.warn("check isMTMVPartitionSync failed", e);
-            }
-        }
-        return res;
-    }
-
-    /**
-     * Get the partitions that need to be refreshed
-     *
-     * @param mtmv
-     * @param baseTables
-     * @return
-     */
-    public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, 
Set<BaseTableInfo> baseTables) {
-        Collection<Partition> allPartitions = mtmv.getPartitions();
-        List<Long> res = Lists.newArrayList();
-        for (Partition partition : allPartitions) {
-            try {
-                if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
-                        mtmv.getExcludedTriggerTables())) {
-                    res.add(partition.getId());
-                }
-            } catch (AnalysisException e) {
-                res.add(partition.getId());
-                LOG.warn("check isMTMVPartitionSync failed", e);
-            }
-        }
-        return res;
-    }
-
-    /**
-     * compare last update time of mtmvPartition and tablePartition
-     *
-     * @param mtmv
-     * @param mtmvPartitionId
-     * @param relatedTable
-     * @param relatedPartitionId
-     * @return
-     * @throws AnalysisException
-     */
-    private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
-            MTMVRelatedTableIf relatedTable,
-            Long relatedPartitionId) throws AnalysisException {
-        MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
-                .getPartitionSnapshot(relatedPartitionId);
-        String relatedPartitionName = 
relatedTable.getPartitionName(relatedPartitionId);
-        String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
-        return mtmv.getRefreshSnapshot()
-                .equalsWithRelatedPartition(mtmvPartitionName, 
relatedPartitionName, relatedPartitionCurrentSnapshot);
-    }
-
-    /**
-     * like p_00000101_20170201
-     *
-     * @param desc
-     * @return
-     */
-    private static String generatePartitionName(PartitionKeyDesc desc) {
-        String partitionName = "p_";
-        partitionName += 
desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
-                .replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
-        if (partitionName.length() > 50) {
-            partitionName = partitionName.substring(0, 30) + 
Math.abs(Objects.hash(partitionName))
-                    + "_" + System.currentTimeMillis();
-        }
-        return partitionName;
-    }
-
-    /**
-     * drop partition of mtmv
-     *
-     * @param mtmv
-     * @param partitionId
-     */
-    private static void dropPartition(MTMV mtmv, Long partitionId) throws 
AnalysisException, DdlException {
-        if (!mtmv.writeLockIfExist()) {
-            return;
-        }
-        try {
-            Partition partition = 
mtmv.getPartitionOrAnalysisException(partitionId);
-            DropPartitionClause dropPartitionClause = new 
DropPartitionClause(false, partition.getName(), false, false);
-            Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), 
mtmv, dropPartitionClause);
-        } finally {
-            mtmv.writeUnlock();
-        }
-
-    }
-
-    /**
-     * add partition for mtmv like relatedPartitionId of relatedTable
-     *
-     * @param mtmv
-     * @param partitionItem
-     * @throws DdlException
-     */
-    private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
-            throws DdlException {
-        PartitionKeyDesc oldPartitionKeyDesc = 
partitionItem.toPartitionKeyDesc();
-        Map<String, String> partitionProperties = Maps.newHashMap();
-        SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
-                generatePartitionName(oldPartitionKeyDesc),
-                oldPartitionKeyDesc, partitionProperties);
-
-        AddPartitionClause addPartitionClause = new 
AddPartitionClause(singlePartitionDesc,
-                mtmv.getDefaultDistributionInfo().toDistributionDesc(), 
partitionProperties, false);
-        Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), 
mtmv.getName(), addPartitionClause);
-    }
-
-    /**
-     * compare PartitionItem and return equals partitionId
-     * if not found, return -1L
-     *
-     * @param target
-     * @param sources
-     * @return
-     */
-    private static long getExistPartitionId(PartitionItem target, Map<Long, 
PartitionItem> sources) {
-        for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
-            if (target.equals(entry.getValue())) {
-                return entry.getKey();
-            }
-        }
-        return -1L;
-    }
-
-    /**
-     * Determine is sync, ignoring excludedTriggerTables and non OlapTanle
-     *
-     * @param mtmvPartitionId
-     * @param tables
-     * @param excludedTriggerTables
-     * @return
-     */
-    private static boolean isSyncWithAllBaseTables(MTMV mtmv, long 
mtmvPartitionId, Set<BaseTableInfo> tables,
-            Set<String> excludedTriggerTables) throws AnalysisException {
-        for (BaseTableInfo baseTableInfo : tables) {
-            TableIf table = null;
-            try {
-                table = getTable(baseTableInfo);
-            } catch (AnalysisException e) {
-                LOG.warn("get table failed, {}", baseTableInfo, e);
-                return false;
-            }
-            if (excludedTriggerTables.contains(table.getName())) {
-                continue;
-            }
-            boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, 
mtmvPartitionId, baseTableInfo);
-            if (!syncWithBaseTable) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static boolean isSyncWithBaseTable(MTMV mtmv, long 
mtmvPartitionId, BaseTableInfo baseTableInfo)
-            throws AnalysisException {
-        TableIf table = null;
-        try {
-            table = getTable(baseTableInfo);
-        } catch (AnalysisException e) {
-            LOG.warn("get table failed, {}", baseTableInfo, e);
-            return false;
-        }
-
-        if (!(table instanceof MTMVRelatedTableIf)) {
-            // if not MTMVRelatedTableIf, we can not get snapshot from it,
-            // Currently, it is believed to be synchronous
-            return true;
-        }
-        MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
-        MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
-        String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
-        return mtmv.getRefreshSnapshot()
-                .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), 
baseTableCurrentSnapshot);
-    }
-
-    private static boolean mtmvContainsExternalTable(MTMV mtmv) {
+    public static boolean mtmvContainsExternalTable(MTMV mtmv) {
         Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
         for (BaseTableInfo baseTableInfo : baseTables) {
             if (baseTableInfo.getCtlId() != 
InternalCatalog.INTERNAL_CATALOG_ID) {
@@ -448,60 +66,4 @@ public class MTMVUtil {
         }
         return false;
     }
-
-    public static Map<String, MTMVRefreshPartitionSnapshot> 
generatePartitionSnapshots(MTMV mtmv,
-            Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
-            throws AnalysisException {
-        Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
-        for (Long partitionId : partitionIds) {
-            res.put(mtmv.getPartition(partitionId).getName(), 
generatePartitionSnapshot(mtmv, baseTables, partitionId));
-        }
-        return res;
-    }
-
-
-    private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV 
mtmv,
-            Set<BaseTableInfo> baseTables, Long partitionId)
-            throws AnalysisException {
-        MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new 
MTMVRefreshPartitionSnapshot();
-        if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
-            MTMVRelatedTableIf relatedTable = 
mtmv.getMvPartitionInfo().getRelatedTable();
-            List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions(
-                    mtmv.getPartitionItems().get(partitionId),
-                    relatedTable);
-
-            for (Long relatedPartitionId : relatedPartitionIds) {
-                MTMVSnapshotIf partitionSnapshot = relatedTable
-                        .getPartitionSnapshot(relatedPartitionId);
-                refreshPartitionSnapshot.getPartitions()
-                        
.put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot);
-            }
-        }
-        for (BaseTableInfo baseTableInfo : baseTables) {
-            if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
-                    
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
-                continue;
-            }
-            TableIf table = getTable(baseTableInfo);
-            if (!(table instanceof MTMVRelatedTableIf)) {
-                continue;
-            }
-            refreshPartitionSnapshot.getTables().put(table.getId(), 
((MTMVRelatedTableIf) table).getTableSnapshot());
-        }
-        return refreshPartitionSnapshot;
-    }
-
-    private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem 
mtmvPartitionItem,
-            MTMVRelatedTableIf relatedTable) {
-        List<Long> res = Lists.newArrayList();
-        Map<Long, PartitionItem> relatedPartitionItems = 
relatedTable.getPartitionItems();
-        for (Entry<Long, PartitionItem> entry : 
relatedPartitionItems.entrySet()) {
-            if (mtmvPartitionItem.equals(entry.getValue())) {
-                res.add(entry.getKey());
-                // current, the partitioning of MTMV corresponds one-to-one 
with the partitioning of related table
-                return res;
-            }
-        }
-        return res;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index bac4059c162..12f409a5946 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -25,7 +25,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVPartitionInfo;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVRewriteUtil;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.jobs.executor.Rewriter;
 import org.apache.doris.nereids.memo.GroupExpression;
@@ -315,8 +315,8 @@ public abstract class AbstractMaterializedViewRule 
implements ExplorationRuleFac
             return ImmutableSet.of();
         }
         // get mv valid partitions
-        Set<Long> mvDataValidPartitionIdSet = 
MTMVUtil.getMTMVCanRewritePartitions(mtmv,
-                        cascadesContext.getConnectContext()).stream()
+        Set<Long> mvDataValidPartitionIdSet = 
MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
+                cascadesContext.getConnectContext(), 
System.currentTimeMillis()).stream()
                 .map(Partition::getId)
                 .collect(Collectors.toSet());
         Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node 
-> node instanceof LogicalOlapScan
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
index e7b11b5fd17..baf72cc278e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
@@ -255,7 +255,7 @@ public class MaterializedViewUtils {
             Column mvReferenceColumn = 
context.getMvPartitionColumn().getColumn().get();
             if (partitionColumnSet.contains(mvReferenceColumn)) {
                 context.addTableColumn(table, mvReferenceColumn);
-                context.setPctPossible(true);
+                context.setPctPossible(!mvReferenceColumn.isAllowNull() || 
relatedTable.isPartitionColumnAllowNull());
             }
             return visit(relation, context);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
index f280e867819..5598c812594 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
@@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.util.Utils;
@@ -67,7 +67,7 @@ public class RefreshMTMVInfo {
             Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
             MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), 
TableType.MATERIALIZED_VIEW);
             if (!CollectionUtils.isEmpty(partitions)) {
-                MTMVUtil.getPartitionsIdsByNames(mtmv, partitions);
+                MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, partitions);
             }
         } catch (org.apache.doris.common.AnalysisException | 
MetaNotFoundException | DdlException e) {
             throw new AnalysisException(e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 21505586491..2e0de09d29b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -33,7 +33,7 @@ import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.mtmv.MTMVPartitionUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
 import org.apache.doris.qe.ConnectContext;
@@ -634,7 +634,7 @@ public class MetadataGenerator {
                 trow.addToColumnValue(new 
TCell().setStringVal(mv.getEnvInfo().toString()));
                 trow.addToColumnValue(new 
TCell().setStringVal(mv.getMvProperties().toString()));
                 trow.addToColumnValue(new 
TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
-                trow.addToColumnValue(new 
TCell().setBoolVal(MTMVUtil.isMTMVSync(mv)));
+                trow.addToColumnValue(new 
TCell().setBoolVal(MTMVPartitionUtil.isMTMVSync(mv)));
                 dataBatch.add(trow);
             }
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
new file mode 100644
index 00000000000..4bb74bfd448
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Unit tests for {@link MTMVPartitionUtil}, run against JMockit mocks of the MTMV,
+ * its relation and partition metadata, and a base OlapTable.
+ */
+public class MTMVPartitionUtilTest {
+    @Mocked
+    private MTMV mtmv;
+    @Mocked
+    private Partition p1;
+    @Mocked
+    private MTMVRelation relation;
+    @Mocked
+    private BaseTableInfo baseTableInfo;
+    @Mocked
+    private MTMVPartitionInfo mtmvPartitionInfo;
+    @Mocked
+    private OlapTable baseOlapTable;
+    @Mocked
+    private MTMVSnapshotIf baseSnapshotIf;
+    @Mocked
+    private MTMVRefreshSnapshot refreshSnapshot;
+    // Mocked so that getTable(baseTableInfo) can be stubbed in setUp().
+    @Mocked
+    private MTMVUtil mtmvUtil;
+
+    // Returned by relation.getBaseTables(); baseTableInfo is added to it in setUp().
+    private Set<BaseTableInfo> baseTables = Sets.newHashSet();
+
+    /**
+     * Records the default expectations shared by every test: the mtmv has a single
+     * partition (p1, id 1) of type SELF_MANAGE, its only base table resolves to
+     * baseOlapTable, and both snapshot comparisons return true.
+     * Every expectation is recorded with minTimes = 0, so a test does not have to
+     * trigger it.
+     */
+    @Before
+    public void setUp() throws NoSuchMethodException, SecurityException, 
AnalysisException {
+        baseTables.add(baseTableInfo);
+        new Expectations() {
+            {
+                mtmv.getRelation();
+                minTimes = 0;
+                result = relation;
+
+                mtmv.getPartitions();
+                minTimes = 0;
+                result = Lists.newArrayList(p1);
+
+                p1.getId();
+                minTimes = 0;
+                result = 1L;
+
+                mtmv.getMvPartitionInfo();
+                minTimes = 0;
+                result = mtmvPartitionInfo;
+
+                mtmvPartitionInfo.getPartitionType();
+                minTimes = 0;
+                result = MTMVPartitionType.SELF_MANAGE;
+
+                mtmvUtil.getTable(baseTableInfo);
+                minTimes = 0;
+                result = baseOlapTable;
+
+                baseOlapTable.needAutoRefresh();
+                minTimes = 0;
+                result = true;
+
+                baseOlapTable.getTableSnapshot();
+                minTimes = 0;
+                result = baseSnapshotIf;
+
+                mtmv.getPartitionName(anyLong);
+                minTimes = 0;
+                result = "p1";
+
+                mtmv.getRefreshSnapshot();
+                minTimes = 0;
+                result = refreshSnapshot;
+
+                // Default: the stored snapshot matches the base table snapshot.
+                refreshSnapshot.equalsWithBaseTable(anyString, anyLong, 
(MTMVSnapshotIf) any);
+                minTimes = 0;
+                result = true;
+
+                relation.getBaseTables();
+                minTimes = 0;
+                result = baseTables;
+
+                // NOTE(review): needAutoRefresh() was already recorded above with the
+                // same result; this repeat looks redundant -- confirm before removing.
+                baseOlapTable.needAutoRefresh();
+                minTimes = 0;
+                result = true;
+
+                baseOlapTable.getPartitionSnapshot(anyLong);
+                minTimes = 0;
+                result = baseSnapshotIf;
+
+                baseOlapTable.getPartitionName(anyLong);
+                minTimes = 0;
+                result = "p1";
+
+                // Default: the stored snapshot matches the related partition snapshot.
+                refreshSnapshot.equalsWithRelatedPartition(anyString, 
anyString, (MTMVSnapshotIf) any);
+                minTimes = 0;
+                result = true;
+            }
+        };
+    }
+
+    /** With the default expectations from setUp(), isMTMVSync is expected to return true. */
+    @Test
+    public void testIsMTMVSyncNormal() {
+        boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv);
+        Assert.assertTrue(mtmvSync);
+    }
+
+    /** When the base-table snapshot comparison returns false, isMTMVSync is expected to return false. */
+    @Test
+    public void testIsMTMVSyncNotSync() {
+        new Expectations() {
+            {
+                // Replace the default from setUp(): the base table snapshot no longer matches.
+                refreshSnapshot.equalsWithBaseTable(anyString, anyLong, 
(MTMVSnapshotIf) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+        boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv);
+        Assert.assertFalse(mtmvSync);
+    }
+
+    /** With the default expectations from setUp(), isSyncWithPartition is expected to return true. */
+    @Test
+    public void testIsSyncWithPartition() throws AnalysisException {
+        boolean isSyncWithPartition = 
MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
+        Assert.assertTrue(isSyncWithPartition);
+    }
+
+    /** When the related-partition snapshot comparison returns false, isSyncWithPartition is expected to return false. */
+    @Test
+    public void testIsSyncWithPartitionNotSync() throws AnalysisException {
+        new Expectations() {
+            {
+                // Replace the default from setUp(): the related partition snapshot no longer matches.
+                refreshSnapshot.equalsWithRelatedPartition(anyString, 
anyString, (MTMVSnapshotIf) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+        boolean isSyncWithPartition = 
MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
+        Assert.assertFalse(isSyncWithPartition);
+    }
+
+    /** Checks the names generatePartitionName produces for an IN partition desc and a fixed-range partition desc. */
+    @Test
+    public void testGeneratePartitionName() {
+        // IN partition made of two value tuples: all values are expected joined with '_'.
+        List<List<PartitionValue>> inValues = Lists.newArrayList();
+        inValues.add(Lists.newArrayList(new PartitionValue("value11"), new 
PartitionValue("value12")));
+        inValues.add(Lists.newArrayList(new PartitionValue("value21"), new 
PartitionValue("value22")));
+        PartitionKeyDesc inDesc = PartitionKeyDesc.createIn(inValues);
+        String inName = MTMVPartitionUtil.generatePartitionName(inDesc);
+        Assert.assertEquals("p_value11_value12_value21_value22", inName);
+
+        // Fixed range with bounds 1 and 2: the expected name is "p_1_2".
+        PartitionKeyDesc rangeDesc = PartitionKeyDesc.createFixed(
+                Lists.newArrayList(new PartitionValue(1L)),
+                Lists.newArrayList(new PartitionValue(2L))
+        );
+        String rangeName = MTMVPartitionUtil.generatePartitionName(rangeDesc);
+        Assert.assertEquals("p_1_2", rangeName);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java
new file mode 100644
index 00000000000..42b5b783841
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Unit tests for the two comparison methods of {@link MTMVRefreshSnapshot}:
+ * {@code equalsWithRelatedPartition} and {@code equalsWithBaseTable}.
+ *
+ * <p>The fixture stores a single materialized-view partition, "mvp1", whose snapshot
+ * holds version 1 for related partition "p1" and version 1 for base table id 1.
+ */
+public class MTMVRefreshSnapshotTest {
+    private final String mvExistPartitionName = "mvp1";
+    private final String relatedExistPartitionName = "p1";
+    private final long baseExistTableId = 1L;
+    private final long correctVersion = 1L;
+    private final MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot();
+    private final MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion);
+    private final MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion);
+
+    /** Registers the "mvp1" partition snapshot in {@link #refreshSnapshot} before each test. */
+    @Before
+    public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException {
+        Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = Maps.newHashMap();
+        MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new MTMVRefreshPartitionSnapshot();
+        partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot);
+        mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName, p1Snapshot);
+        mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot);
+        refreshSnapshot.updateSnapshots(partitionSnapshots, Sets.newHashSet(mvExistPartitionName));
+    }
+
+    /**
+     * {@code equalsWithRelatedPartition} must be true only when the mv partition and the
+     * related partition are both recorded and the given snapshot matches in type and value.
+     */
+    @Test
+    public void testPartitionSync() {
+        // normal: recorded mv partition, recorded related partition, same version
+        boolean sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName,
+                new MTMVVersionSnapshot(correctVersion));
+        Assert.assertTrue(sync);
+        // mv partition that was never recorded
+        sync = refreshSnapshot.equalsWithRelatedPartition("mvp2", relatedExistPartitionName,
+                new MTMVVersionSnapshot(correctVersion));
+        Assert.assertFalse(sync);
+        // related partition that was never recorded
+        sync = refreshSnapshot
+                .equalsWithRelatedPartition(mvExistPartitionName, "p2", new MTMVVersionSnapshot(correctVersion));
+        Assert.assertFalse(sync);
+        // same snapshot type, different value
+        sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName,
+                new MTMVVersionSnapshot(2L));
+        Assert.assertFalse(sync);
+        // same value, different snapshot type
+        sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName,
+                new MTMVTimestampSnapshot(correctVersion));
+        Assert.assertFalse(sync);
+    }
+
+    /**
+     * {@code equalsWithBaseTable} must be true only when the mv partition and the base
+     * table id are both recorded and the given snapshot matches in type and value.
+     */
+    @Test
+    public void testTableSync() {
+        // normal: recorded mv partition, recorded base table, same version
+        boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId,
+                new MTMVVersionSnapshot(correctVersion));
+        Assert.assertTrue(sync);
+        // mv partition that was never recorded
+        sync = refreshSnapshot
+                .equalsWithBaseTable("mvp2", baseExistTableId, new MTMVVersionSnapshot(correctVersion));
+        Assert.assertFalse(sync);
+        // base table id that was never recorded
+        sync = refreshSnapshot
+                .equalsWithBaseTable(mvExistPartitionName, 2L, new MTMVVersionSnapshot(correctVersion));
+        Assert.assertFalse(sync);
+        // same snapshot type, different value
+        sync = refreshSnapshot
+                .equalsWithBaseTable(mvExistPartitionName, baseExistTableId, new MTMVVersionSnapshot(2L));
+        Assert.assertFalse(sync);
+        // same value, different snapshot type
+        sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId,
+                new MTMVTimestampSnapshot(correctVersion));
+        Assert.assertFalse(sync);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
new file mode 100644
index 00000000000..55394897e42
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Unit tests for {@code MTMVRewriteUtil.getMTMVCanRewritePartitions}.
+ *
+ * <p>{@link #setUp()} records a baseline for which the single partition of the mocked
+ * materialized view is returned (see {@link #testGetMTMVCanRewritePartitionsNormal()}).
+ * Every other test overrides one or two of those expectations and checks how many
+ * partitions come back.
+ */
+public class MTMVRewriteUtilTest {
+    @Mocked
+    private MTMV mtmv;
+    @Mocked
+    private ConnectContext ctx;
+    @Mocked
+    private SessionVariable sessionVariable;
+    @Mocked
+    private Partition p1;
+    @Mocked
+    private MTMVRelation relation;
+    @Mocked
+    private MTMVStatus status;
+    // Never referenced by name: mocked so the static MTMVPartitionUtil.isMTMVPartitionSync
+    // call can be recorded below.
+    @Mocked
+    private MTMVPartitionUtil mtmvPartitionUtil;
+    // Never referenced by name: mocked so the static MTMVUtil.mtmvContainsExternalTable
+    // call can be recorded below.
+    @Mocked
+    private MTMVUtil mtmvUtil;
+    // "Now" handed to the method under test; the mocked partition's visible version time is 1.
+    private final long currentTimeMillis = 3L;
+
+    /**
+     * Records the baseline: one partition, grace period 0, state NORMAL, refresh state
+     * SUCCESS, rewrite enabled (also for external tables), partition in sync, and no
+     * external table in the materialized view.
+     */
+    @Before
+    public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException {
+
+        new Expectations() {
+            {
+                mtmv.getPartitions();
+                minTimes = 0;
+                result = Lists.newArrayList(p1);
+
+                p1.getVisibleVersionTime();
+                minTimes = 0;
+                result = 1L;
+
+                mtmv.getGracePeriod();
+                minTimes = 0;
+                result = 0L;
+
+                mtmv.getRelation();
+                minTimes = 0;
+                result = relation;
+
+                mtmv.getStatus();
+                minTimes = 0;
+                result = status;
+
+                status.getState();
+                minTimes = 0;
+                result = MTMVState.NORMAL;
+
+                status.getRefreshState();
+                minTimes = 0;
+                result = MTMVRefreshState.SUCCESS;
+
+                ctx.getSessionVariable();
+                minTimes = 0;
+                result = sessionVariable;
+
+                sessionVariable.isEnableMaterializedViewRewrite();
+                minTimes = 0;
+                result = true;
+
+                sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
+                minTimes = 0;
+                result = true;
+
+                MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
+                        (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = true;
+
+                MTMVUtil.mtmvContainsExternalTable((MTMV) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+    }
+
+    /** Calls the method under test with the mocks and asserts the size of the returned collection. */
+    private void assertCanRewritePartitionCount(int expected) {
+        Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
+                .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMillis);
+        Assert.assertEquals(expected, mtmvCanRewritePartitions.size());
+    }
+
+    /** Baseline only: the single partition is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsNormal() {
+        assertCanRewritePartitionCount(1);
+    }
+
+    /**
+     * Partition not in sync, grace period 2: the partition is still returned.
+     * NOTE(review): presumably because 3 (now) <= 1 (visible version time) + 2 (grace
+     * period); confirm against MTMVRewriteUtil.
+     */
+    @Test
+    public void testGetMTMVCanRewritePartitionsInGracePeriod() throws AnalysisException {
+        new Expectations() {
+            {
+                mtmv.getGracePeriod();
+                minTimes = 0;
+                result = 2L;
+
+                MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
+                        (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+
+        assertCanRewritePartitionCount(1);
+    }
+
+    /** Partition not in sync, grace period 1: nothing is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsNotInGracePeriod() throws AnalysisException {
+        new Expectations() {
+            {
+                mtmv.getGracePeriod();
+                minTimes = 0;
+                result = 1L;
+
+                MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
+                        (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+
+        assertCanRewritePartitionCount(0);
+    }
+
+    /** Session variable for materialized view rewrite switched off: nothing is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsDisableMaterializedViewRewrite() {
+        new Expectations() {
+            {
+                sessionVariable.isEnableMaterializedViewRewrite();
+                minTimes = 0;
+                result = false;
+            }
+        };
+        assertCanRewritePartitionCount(0);
+    }
+
+    /** Partition not in sync with the baseline grace period of 0: nothing is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException {
+        new Expectations() {
+            {
+                MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong,
+                        (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+        assertCanRewritePartitionCount(0);
+    }
+
+    /** External table present and rewrite with external tables allowed: the partition is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsEnableContainExternalTable() {
+        new Expectations() {
+            {
+                MTMVUtil.mtmvContainsExternalTable((MTMV) any);
+                minTimes = 0;
+                result = true;
+
+                sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
+                minTimes = 0;
+                result = true;
+            }
+        };
+        assertCanRewritePartitionCount(1);
+    }
+
+    /** External table present but rewrite with external tables not allowed: nothing is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsDisableContainExternalTable() {
+        new Expectations() {
+            {
+                MTMVUtil.mtmvContainsExternalTable((MTMV) any);
+                minTimes = 0;
+                result = true;
+
+                sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
+                minTimes = 0;
+                result = false;
+            }
+        };
+        assertCanRewritePartitionCount(0);
+    }
+
+    /** Materialized view state SCHEMA_CHANGE instead of NORMAL: nothing is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsStateAbnormal() {
+        new Expectations() {
+            {
+                status.getState();
+                minTimes = 0;
+                result = MTMVState.SCHEMA_CHANGE;
+            }
+        };
+        assertCanRewritePartitionCount(0);
+    }
+
+    /** Refresh state FAIL instead of SUCCESS: nothing is returned. */
+    @Test
+    public void testGetMTMVCanRewritePartitionsRefreshStateAbnormal() {
+        new Expectations() {
+            {
+                status.getRefreshState();
+                minTimes = 0;
+                result = MTMVRefreshState.FAIL;
+            }
+        };
+        assertCanRewritePartitionCount(0);
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
new file mode 100644
index 00000000000..b1fc52dad46
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
+import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Unit tests for {@code MTMVTask.calculateNeedRefreshPartitions}.
+ *
+ * <p>The mocked materialized view has two partitions (ids 1 and 2). {@link #setUp()}
+ * records a baseline in which the view is in sync and uses the COMPLETE refresh method;
+ * individual tests override parts of that baseline.
+ */
+public class MTMVTaskTest {
+    private long firstPartitionId = 1L;
+    private String firstPartitionName = "p1";
+    private long secondPartitionId = 2L;
+    private String secondPartitionName = "p2";
+    private List<Long> bothPartitionIds = Lists.newArrayList(firstPartitionId, secondPartitionId);
+    private MTMVRelation relation = new MTMVRelation(Sets.newHashSet(), Sets.newHashSet());
+
+    @Mocked
+    private MTMV mtmv;
+    @Mocked
+    private MTMVUtil mtmvUtil;
+    @Mocked
+    private MTMVPartitionUtil mtmvPartitionUtil;
+    @Mocked
+    private MTMVPartitionInfo mtmvPartitionInfo;
+    @Mocked
+    private MTMVRefreshInfo mtmvRefreshInfo;
+
+    /** Records the baseline expectations shared by every test. */
+    @Before
+    public void setUp()
+            throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException {
+
+        new Expectations() {
+            {
+                mtmvUtil.getMTMV(anyLong, anyLong);
+                minTimes = 0;
+                result = mtmv;
+
+                mtmv.getPartitionIds();
+                minTimes = 0;
+                result = bothPartitionIds;
+
+                mtmv.getMvPartitionInfo();
+                minTimes = 0;
+                result = mtmvPartitionInfo;
+
+                mtmvPartitionInfo.getPartitionType();
+                minTimes = 0;
+                result = MTMVPartitionType.FOLLOW_BASE_TABLE;
+
+                // NOTE(review): a single id is recorded here, while the manual-partitions test
+                // expects a one-element list; this presumably relies on JMockit adapting the
+                // value to the method's return type. Confirm.
+                mtmvPartitionUtil.getPartitionsIdsByNames(mtmv, Lists.newArrayList(firstPartitionName));
+                minTimes = 0;
+                result = firstPartitionId;
+
+                mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = true;
+
+                mtmv.getRefreshInfo();
+                minTimes = 0;
+                result = mtmvRefreshInfo;
+
+                mtmvRefreshInfo.getRefreshMethod();
+                minTimes = 0;
+                result = RefreshMethod.COMPLETE;
+            }
+        };
+    }
+
+    /** Builds a task for the mocked view with the given context and asks it what to refresh. */
+    private List<Long> partitionsToRefresh(MTMVTaskContext taskContext) throws AnalysisException {
+        MTMVTask refreshTask = new MTMVTask(mtmv, relation, taskContext);
+        return refreshTask.calculateNeedRefreshPartitions();
+    }
+
+    /**
+     * Manual trigger naming "p1" with the third context argument {@code true}
+     * (presumably the "complete" flag; confirm): both partitions are selected.
+     */
+    @Test
+    public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException {
+        MTMVTaskContext manualComplete =
+                new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(firstPartitionName), true);
+        Assert.assertEquals(bothPartitionIds, partitionsToRefresh(manualComplete));
+    }
+
+    /** Manual trigger naming "p1" with the third context argument {@code false}: only id 1 is selected. */
+    @Test
+    public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException {
+        MTMVTaskContext manualPartitions =
+                new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(firstPartitionName), false);
+        Assert.assertEquals(Lists.newArrayList(firstPartitionId), partitionsToRefresh(manualPartitions));
+    }
+
+    /** System trigger while the view is in sync: nothing is selected. */
+    @Test
+    public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException {
+        List<Long> selected = partitionsToRefresh(new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM));
+        Assert.assertTrue(CollectionUtils.isEmpty(selected));
+    }
+
+    /** System trigger, view out of sync, COMPLETE refresh method: both partitions are selected. */
+    @Test
+    public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException {
+        new Expectations() {
+            {
+                mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = false;
+            }
+        };
+        List<Long> selected = partitionsToRefresh(new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM));
+        Assert.assertEquals(bothPartitionIds, selected);
+    }
+
+    /**
+     * System trigger, view out of sync, AUTO refresh method: the ids reported by
+     * {@code getMTMVNeedRefreshPartitions} (here only id 2) are selected.
+     */
+    @Test
+    public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws AnalysisException {
+        new Expectations() {
+            {
+                mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any);
+                minTimes = 0;
+                result = false;
+
+                mtmvRefreshInfo.getRefreshMethod();
+                minTimes = 0;
+                result = RefreshMethod.AUTO;
+
+                mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any);
+                minTimes = 0;
+                result = Lists.newArrayList(secondPartitionId);
+            }
+        };
+        List<Long> selected = partitionsToRefresh(new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM));
+        Assert.assertEquals(Lists.newArrayList(secondPartitionId), selected);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
index 4204bbe0221..02fb18edbf7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java
@@ -26,7 +26,6 @@ import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;
 
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
@@ -252,7 +251,6 @@ public class MaterializedViewUtilsTest extends 
TestWithFeService {
     }
 
     @Test
-    @Disabled
     public void getRelatedTableInfoTestWithoutGroupNullTest() {
         PlanChecker.from(connectContext)
                 .checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias, 
l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, "
diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out 
b/regression-test/data/mtmv_p0/test_hive_mtmv.out
index 9ee89dd033d..26e34af7b5d 100644
--- a/regression-test/data/mtmv_p0/test_hive_mtmv.out
+++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out
@@ -8,6 +8,11 @@
 1      A       20230101
 2      B       20230101
 3      C       20230101
+
+-- !refresh_complete --
+1      A       20230101
+2      B       20230101
+3      C       20230101
 4      D       20230102
 5      E       20230102
 6      F       20230102
diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
index 573f1f84d5d..cf34cfb616a 100644
--- a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
@@ -57,12 +57,20 @@ suite("test_hive_mtmv", 
"p0,external,hive,external_docker,external_docker_hive")
             order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by 
id"
 
             //refresh other partitions
+            // currently, for hive, an auto refresh does not change the data
             sql """
                     REFRESH MATERIALIZED VIEW ${mvName}
                 """
             waitingMTMVTaskFinished(jobName)
             order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by 
id"
 
+            //refresh complete
+            sql """
+                    REFRESH MATERIALIZED VIEW ${mvName} complete
+                """
+            waitingMTMVTaskFinished(jobName)
+            order_qt_refresh_complete "SELECT * FROM ${mvName} order by id"
+
             sql """drop materialized view if exists ${mvName};"""
 
             sql """drop catalog if exists ${catalog_name}"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to