This is an automated email from the ASF dual-hosted git repository.

adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0842aa2947 [Fix](MTMV)Support master and follow change in multi fe for 
mtmv (#16149)
0842aa2947 is described below

commit 0842aa29478462a6d0a304875c42222428c296fc
Author: huangzhaowei <carlmartin...@gmail.com>
AuthorDate: Wed Feb 1 20:02:46 2023 +0800

    [Fix](MTMV)Support master and follow change in multi fe for mtmv (#16149)
    
    Support master/follower role changes in a multi-FE deployment for MTMV.
    
    This PR fixes the following issues:
    
    1. Start the MTMV scheduler only on the master node; if the master changes to a follower, the scheduler is stopped.
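    2. Fix a double metadata write: dropping a materialized view's jobs while replaying a table drop no longer writes the edit log again.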
    3. Rename some edit log functions and variables.
    4. If an MV has both a periodical job and an immediate job, and the periodical job would be triggered right away, the immediate job is skipped.
    5. Fix expire-time bugs, and make sure expired jobs and tasks are cleaned up on all FEs.
    6. Change the cleanerScheduler interval from 1 day to 1 minute.
---
 .../main/java/org/apache/doris/catalog/Env.java    |  8 ++-
 .../apache/doris/datasource/InternalCatalog.java   |  2 +-
 .../org/apache/doris/journal/JournalEntity.java    |  4 +-
 .../java/org/apache/doris/mtmv/MTMVJobFactory.java | 12 ++--
 .../java/org/apache/doris/mtmv/MTMVJobManager.java | 65 +++++++++++++++-------
 .../org/apache/doris/mtmv/MTMVTaskExecutor.java    |  4 ++
 .../apache/doris/mtmv/MTMVTaskExecutorPool.java    |  2 +-
 .../org/apache/doris/mtmv/MTMVTaskManager.java     | 41 +++++++++++---
 .../main/java/org/apache/doris/mtmv/MTMVUtils.java |  5 +-
 .../org/apache/doris/mtmv/metadata/MTMVJob.java    |  4 +-
 .../java/org/apache/doris/persist/EditLog.java     | 20 +++----
 .../org/apache/doris/persist/OperationType.java    |  4 +-
 .../suites/mtmv_p0/test_create_mtmv.groovy         | 40 ++++++++++---
 13 files changed, 147 insertions(+), 64 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index c37e810d5d..7446ca6bae 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -854,8 +854,6 @@ public class Env {
             // If not using bdb, we need to notify the FE type transfer 
manually.
             notifyNewFETypeTransfer(FrontendNodeType.MASTER);
         }
-        // 7. start mtmv jobManager
-        mtmvJobManager.start();
     }
 
     // wait until FE is ready.
@@ -1412,7 +1410,8 @@ public class Env {
         if (Config.enable_hms_events_incremental_sync) {
             metastoreEventsProcessor.start();
         }
-
+        // start mtmv jobManager
+        mtmvJobManager.start();
     }
 
     // start threads that should running on all FE
@@ -1462,6 +1461,9 @@ public class Env {
         startNonMasterDaemonThreads();
 
         MetricRepo.init();
+
+        // stop mtmv scheduler
+        mtmvJobManager.stop();
     }
 
     // Set global variable 'lower_case_table_names' only when the cluster is 
initialized.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 4aa4ce71c3..c8214ec7d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -956,7 +956,7 @@ public class InternalCatalog implements CatalogIf<Database> 
{
         if (table instanceof MaterializedView && 
Config.enable_mtmv_scheduler_framework) {
             List<Long> dropIds = 
Env.getCurrentEnv().getMTMVJobManager().showJobs(db.getFullName(), 
table.getName())
                     .stream().map(MTMVJob::getId).collect(Collectors.toList());
-            Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, false);
+            Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, 
isReplay);
             LOG.info("Drop related {} mv job.", dropIds.size());
         }
         LOG.info("finished dropping table[{}] in db[{}]", table.getName(), 
db.getFullName());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 070eec0725..0c0e49f617 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -744,7 +744,7 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_ALTER_MTMV_JOB: {
+            case OperationType.OP_CHANGE_MTMV_JOB: {
                 data = ChangeMTMVJob.read(in);
                 isRead = true;
                 break;
@@ -759,7 +759,7 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_ALTER_MTMV_TASK: {
+            case OperationType.OP_CHANGE_MTMV_TASK: {
                 data = ChangeMTMVTask.read(in);
                 isRead = true;
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
index 5af572304e..aef9b92386 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
@@ -57,14 +57,18 @@ public class MTMVJobFactory {
 
     public static List<MTMVJob> buildJob(MaterializedView materializedView, 
String dbName) {
         List<MTMVJob> jobs = new ArrayList<>();
-        if (materializedView.getBuildMode() == BuildMode.IMMEDIATE) {
-            jobs.add(genOnceJob(materializedView, dbName));
-        }
         MVRefreshTriggerInfo triggerInfo = 
materializedView.getRefreshInfo().getTriggerInfo();
+        boolean isRunPeriodJobImmediate = false;
         if (triggerInfo != null && triggerInfo.getRefreshTrigger() == 
RefreshTrigger.INTERVAL) {
-            jobs.add(genPeriodicalJob(materializedView, dbName));
+            MTMVJob job = genPeriodicalJob(materializedView, dbName);
+            isRunPeriodJobImmediate = MTMVUtils.getDelaySeconds(job) == 0;
+            jobs.add(job);
         }
 
+        // if the PeriodicalJob run immediate since an early start time, don't 
run the immediate build.
+        if (!isRunPeriodJobImmediate && materializedView.getBuildMode() == 
BuildMode.IMMEDIATE) {
+            jobs.add(genOnceJob(materializedView, dbName));
+        }
         return jobs;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index fe77ea4015..fd9f55ce10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -62,9 +62,9 @@ public class MTMVJobManager {
 
     private final MTMVTaskManager taskManager;
 
-    private final ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService periodScheduler = 
Executors.newScheduledThreadPool(1);
 
-    private final ScheduledExecutorService cleanerScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService cleanerScheduler = 
Executors.newScheduledThreadPool(1);
 
     private final ReentrantLock reentrantLock;
 
@@ -82,8 +82,17 @@ public class MTMVJobManager {
         if (isStarted.compareAndSet(false, true)) {
             taskManager.clearUnfinishedTasks();
 
+            // check the scheduler before using it
+            // since it may be shutdown when master change to follower without 
process shutdown.
+            if (periodScheduler.isShutdown()) {
+                periodScheduler = Executors.newScheduledThreadPool(1);
+            }
+
             registerJobs();
 
+            if (cleanerScheduler.isShutdown()) {
+                cleanerScheduler = Executors.newScheduledThreadPool(1);
+            }
             cleanerScheduler.scheduleAtFixedRate(() -> {
                 if (!Env.getCurrentEnv().isMaster()) {
                     return;
@@ -99,29 +108,43 @@ public class MTMVJobManager {
                 } finally {
                     unlock();
                 }
-            }, 0, 1, TimeUnit.DAYS);
+            }, 0, 1, TimeUnit.MINUTES);
 
             taskManager.startTaskScheduler();
         }
     }
 
+    public void stop() {
+        if (isStarted.compareAndSet(true, false)) {
+            periodScheduler.shutdown();
+            cleanerScheduler.shutdown();
+            taskManager.stopTaskScheduler();
+        }
+    }
+
     private void registerJobs() {
+        int num = nameToJobMap.size();
+        int periodNum = 0;
+        int onceNum = 0;
         for (MTMVJob job : nameToJobMap.values()) {
-            if (job.getState() != JobState.ACTIVE) {
+            if (!job.getState().equals(JobState.ACTIVE)) {
                 continue;
             }
             if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
                 JobSchedule schedule = job.getSchedule();
                 ScheduledFuture<?> future = 
periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
-                        MTMVUtils.getDelaySeconds(job), schedule.getPeriod(), 
schedule.getTimeUnit());
+                        MTMVUtils.getDelaySeconds(job), 
schedule.getSecondPeriod(), TimeUnit.SECONDS);
                 periodFutureMap.put(job.getId(), future);
+                periodNum++;
             } else if (job.getTriggerMode() == TriggerMode.ONCE) {
                 if (job.getRetryPolicy() == TaskRetryPolicy.ALWAYS || 
job.getRetryPolicy() == TaskRetryPolicy.TIMES) {
                     MTMVTaskExecuteParams executeOption = new 
MTMVTaskExecuteParams();
                     submitJobTask(job.getName(), executeOption);
+                    onceNum++;
                 }
             }
         }
+        LOG.info("Register {} period jobs and {} once jobs in the total {} 
jobs.", periodNum, onceNum, num);
     }
 
     public void createJob(MTMVJob job, boolean isReplay) throws DdlException {
@@ -146,27 +169,33 @@ public class MTMVJobManager {
                 idToJobMap.put(job.getId(), job);
                 if (!isReplay) {
                     // log job before submit any task.
-                    Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job);
+                    Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
                     ScheduledFuture<?> future = 
periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
-                            MTMVUtils.getDelaySeconds(job), 
schedule.getPeriod(), schedule.getTimeUnit());
+                            MTMVUtils.getDelaySeconds(job), 
schedule.getSecondPeriod(), TimeUnit.SECONDS);
                     periodFutureMap.put(job.getId(), future);
                 }
             } else if (job.getTriggerMode() == TriggerMode.ONCE) {
-                job.setState(JobState.ACTIVE);
-                job.setExpireTime(MTMVUtils.getNowTimeStamp() + 
Config.scheduler_mtmv_job_expired);
+                // only change once job state from unknown to active. if job 
is completed, only put it in map
+                if (job.getState() == JobState.UNKNOWN) {
+                    job.setState(JobState.ACTIVE);
+                    job.setExpireTime(MTMVUtils.getNowTimeStamp() + 
Config.scheduler_mtmv_job_expired);
+                }
                 nameToJobMap.put(job.getName(), job);
                 idToJobMap.put(job.getId(), job);
                 if (!isReplay) {
-                    Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job);
+                    Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
                     MTMVTaskExecuteParams executeOption = new 
MTMVTaskExecuteParams();
                     submitJobTask(job.getName(), executeOption);
                 }
             } else if (job.getTriggerMode() == TriggerMode.MANUAL) {
-                job.setState(JobState.ACTIVE);
+                // only change once job state from unknown to active. if job 
is completed, only put it in map
+                if (job.getState() == JobState.UNKNOWN) {
+                    job.setState(JobState.ACTIVE);
+                }
                 nameToJobMap.put(job.getName(), job);
                 idToJobMap.put(job.getId(), job);
                 if (!isReplay) {
-                    Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job);
+                    Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
                 }
             } else {
                 throw new DdlException("Unsupported trigger mode for 
multi-table mv.");
@@ -235,7 +264,7 @@ public class MTMVJobManager {
             job.setState(changeJob.getToStatus());
             job.setLastModifyTime(changeJob.getLastModifyTime());
             if (!isReplay) {
-                
Env.getCurrentEnv().getEditLog().logChangeScheduleJob(changeJob);
+                Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
             }
         } finally {
             unlock();
@@ -268,7 +297,7 @@ public class MTMVJobManager {
             }
 
             if (!isReplay) {
-                Env.getCurrentEnv().getEditLog().logDropScheduleJob(jobIds);
+                Env.getCurrentEnv().getEditLog().logDropMTMVJob(jobIds);
             }
         } finally {
             unlock();
@@ -343,11 +372,7 @@ public class MTMVJobManager {
     }
 
     public void replayDropJobTasks(List<String> taskIds) {
-        Map<String, String> index = 
Maps.newHashMapWithExpectedSize(taskIds.size());
-        for (String taskId : taskIds) {
-            index.put(taskId, null);
-        }
-        taskManager.getAllHistory().removeIf(runStatus -> 
index.containsKey(runStatus.getTaskId()));
+        taskManager.dropTasks(taskIds, true);
     }
 
     public void removeExpiredJobs() {
@@ -382,7 +407,7 @@ public class MTMVJobManager {
             unlock();
         }
 
-        dropJobs(jobIdsToDelete, true);
+        dropJobs(jobIdsToDelete, false);
     }
 
     public MTMVJob getJob(String jobName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
index 7fddfd91a4..da1685f3ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
@@ -125,6 +125,10 @@ public class MTMVTaskExecutor implements 
Comparable<MTMVTaskExecutor> {
         return task;
     }
 
+    public void setTask(MTMVTask task) {
+        this.task = task;
+    }
+
     public MTMVTask initTask(String taskId, Long createTime) {
         MTMVTask task = new MTMVTask();
         task.setTaskId(taskId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
index 419412b3c3..f68f68cbaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
@@ -81,7 +81,7 @@ public class MTMVTaskExecutorPool {
 
             ChangeMTMVTask changeTask = new 
ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING,
                     task.getState());
-            Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);
+            Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
         });
         taskExecutor.setFuture(future);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 6cb90c262d..55508b18c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -37,10 +37,12 @@ import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.Deque;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -67,7 +69,7 @@ public class MTMVTaskManager {
     // keep track of all the completed tasks
     private final Deque<MTMVTask> historyQueue = 
Queues.newLinkedBlockingDeque();
 
-    private final ScheduledExecutorService taskScheduler = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService taskScheduler = 
Executors.newScheduledThreadPool(1);
 
     private final MTMVJobManager mtmvJobManager;
 
@@ -76,6 +78,9 @@ public class MTMVTaskManager {
     }
 
     public void startTaskScheduler() {
+        if (taskScheduler.isShutdown()) {
+            taskScheduler = Executors.newScheduledThreadPool(1);
+        }
         taskScheduler.scheduleAtFixedRate(() -> {
             if (!tryLock()) {
                 return;
@@ -91,6 +96,10 @@ public class MTMVTaskManager {
         }, 0, 1, TimeUnit.SECONDS);
     }
 
+    public void stopTaskScheduler() {
+        taskScheduler.shutdown();
+    }
+
     public MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor 
taskExecutor, MTMVTaskExecuteParams params) {
         // duplicate submit
         if (taskExecutor.getTask() != null) {
@@ -113,7 +122,8 @@ public class MTMVTaskManager {
         String taskId = UUID.randomUUID().toString();
         MTMVTask task = taskExecutor.initTask(taskId, 
MTMVUtils.getNowTimeStamp());
         task.setPriority(params.getPriority());
-        Env.getCurrentEnv().getEditLog().logCreateScheduleTask(task);
+        LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, 
taskExecutor.getJob().getName());
+        Env.getCurrentEnv().getEditLog().logCreateMTMVTask(task);
         arrangeToPendingTask(taskExecutor);
         return MTMVUtils.TaskSubmitStatus.SUBMITTED;
     }
@@ -228,7 +238,7 @@ public class MTMVTaskManager {
 
     private void changeAndLogTaskStatus(long jobId, MTMVTask task, TaskState 
fromStatus, TaskState toStatus) {
         ChangeMTMVTask changeTask = new ChangeMTMVTask(jobId, task, 
fromStatus, toStatus);
-        Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);
+        Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
     }
 
     public boolean tryLock() {
@@ -322,7 +332,7 @@ public class MTMVTaskManager {
                     return;
                 }
                 MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job);
-                taskExecutor.initTask(task.getTaskId(), task.getCreateTime());
+                taskExecutor.setTask(task);
                 arrangeToPendingTask(taskExecutor);
                 break;
             case RUNNING:
@@ -399,19 +409,32 @@ public class MTMVTaskManager {
         }
         try {
             Deque<MTMVTask> taskHistory = getAllHistory();
-            Iterator<MTMVTask> iterator = taskHistory.iterator();
-            while (iterator.hasNext()) {
-                MTMVTask task = iterator.next();
+            for (MTMVTask task : taskHistory) {
                 long expireTime = task.getExpireTime();
                 if (currentTime > expireTime) {
                     historyToDelete.add(task.getTaskId());
-                    iterator.remove();
                 }
             }
         } finally {
             unlock();
         }
-        LOG.info("remove task history:{}", historyToDelete);
+        dropTasks(historyToDelete, false);
+    }
+
+    public void dropTasks(List<String> taskIds, boolean isReplay) {
+        if (!tryLock()) {
+            return;
+        }
+        try {
+            Set<String> taskSet = new HashSet<>(taskIds);
+            getAllHistory().removeIf(mtmvTask -> 
taskSet.contains(mtmvTask.getTaskId()));
+            if (!isReplay) {
+                Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds);
+            }
+        } finally {
+            unlock();
+        }
+        LOG.info("drop task history:{}", taskIds);
     }
 
     public void clearUnfinishedTasks() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
index 874eb0510c..d421d2f50a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
@@ -85,7 +85,6 @@ public class MTMVUtils {
         return getDelaySeconds(job, LocalDateTime.now());
     }
 
-    // this method only for test
     public static long getDelaySeconds(MTMVJob job, LocalDateTime now) {
         long lastModifyTime = job.getLastModifyTime();
         long nextTime = 0;
@@ -117,10 +116,10 @@ public class MTMVUtils {
         switch (strTimeUnit.toUpperCase()) {
             case "SECOND":
                 return TimeUnit.SECONDS;
+            case "MINUTE":
+                return TimeUnit.MINUTES;
             case "HOUR":
                 return TimeUnit.HOURS;
-            case "DAY":
-                return TimeUnit.DAYS;
             default:
                 return TimeUnit.DAYS;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
index a8dd62b4a9..8a348b7f99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
@@ -259,8 +259,8 @@ public class MTMVJob implements Writable, Comparable {
         }
 
         public String toString() {
-            return " (START " + 
LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), 
ZoneId.systemDefault())
-                    + " EVERY(" + period + " " + timeUnit + "))";
+            return "START " + 
LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), 
ZoneId.systemDefault())
+                    + " EVERY(" + period + " " + timeUnit + ")";
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 03ef40086d..f6a71b5377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -881,7 +881,7 @@ public class EditLog {
                     env.getMTMVJobManager().replayCreateJob(job);
                     break;
                 }
-                case OperationType.OP_ALTER_MTMV_JOB: {
+                case OperationType.OP_CHANGE_MTMV_JOB: {
                     final ChangeMTMVJob changeJob = (ChangeMTMVJob) 
journal.getData();
                     env.getMTMVJobManager().replayUpdateJob(changeJob);
                     break;
@@ -896,7 +896,7 @@ public class EditLog {
                     env.getMTMVJobManager().replayCreateJobTask(task);
                     break;
                 }
-                case OperationType.OP_ALTER_MTMV_TASK: {
+                case OperationType.OP_CHANGE_MTMV_TASK: {
                     final ChangeMTMVTask changeTask = (ChangeMTMVTask) 
journal.getData();
                     env.getMTMVJobManager().replayUpdateTask(changeTask);
                     break;
@@ -1613,27 +1613,27 @@ public class EditLog {
         logEdit(id, log);
     }
 
-    public void logCreateScheduleJob(MTMVJob job) {
+    public void logCreateMTMVJob(MTMVJob job) {
         logEdit(OperationType.OP_CREATE_MTMV_JOB, job);
     }
 
-    public void logDropScheduleJob(List<Long> jobIds) {
+    public void logDropMTMVJob(List<Long> jobIds) {
         logEdit(OperationType.OP_DROP_MTMV_JOB, new DropMTMVJob(jobIds));
     }
 
-    public void logChangeScheduleJob(ChangeMTMVJob changeJob) {
-        logEdit(OperationType.OP_ALTER_MTMV_JOB, changeJob);
+    public void logChangeMTMVJob(ChangeMTMVJob changeJob) {
+        logEdit(OperationType.OP_CHANGE_MTMV_JOB, changeJob);
     }
 
-    public void logCreateScheduleTask(MTMVTask task) {
+    public void logCreateMTMVTask(MTMVTask task) {
         logEdit(OperationType.OP_CREATE_MTMV_TASK, task);
     }
 
-    public void logAlterScheduleTask(ChangeMTMVTask changeTaskRecord) {
-        logEdit(OperationType.OP_ALTER_MTMV_TASK, changeTaskRecord);
+    public void logChangeMTMVTask(ChangeMTMVTask changeTaskRecord) {
+        logEdit(OperationType.OP_CHANGE_MTMV_TASK, changeTaskRecord);
     }
 
-    public void logAlterScheduleTask(List<String> taskIds) {
+    public void logDropMTMVTasks(List<String> taskIds) {
         logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 813b4c4c49..73ead272e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -251,11 +251,11 @@ public class OperationType {
     // scheduler job and task 330-350
     public static final short OP_CREATE_MTMV_JOB = 330;
     public static final short OP_DROP_MTMV_JOB = 331;
-    public static final short OP_ALTER_MTMV_JOB = 332;
+    public static final short OP_CHANGE_MTMV_JOB = 332;
 
     public static final short OP_CREATE_MTMV_TASK = 340;
     public static final short OP_DROP_MTMV_TASK = 341;
-    public static final short OP_ALTER_MTMV_TASK = 342;
+    public static final short OP_CHANGE_MTMV_TASK = 342;
 
     public static final short OP_DROP_EXTERNAL_TABLE = 350;
     public static final short OP_DROP_EXTERNAL_DB = 351;
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
index 6d3966614e..61534f7758 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
@@ -16,16 +16,12 @@
 // under the License.
 
 suite("test_create_mtmv") {
-    def dbName = "db_mtmv"
     def tableName = "t_user"
     def tableNamePv = "t_user_pv"
     def mvName = "multi_mv"
     sql """
         ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true");
         """
-    sql "DROP DATABASE IF EXISTS ${dbName};"
-    sql "CREATE DATABASE ${dbName};"
-    sql "USE ${dbName};"
 
     sql """
         CREATE TABLE IF NOT EXISTS `${tableName}` (
@@ -42,7 +38,7 @@ suite("test_create_mtmv") {
         INSERT INTO ${tableName} 
VALUES("2022-10-26",1,"clz"),("2022-10-28",2,"zhangsang"),("2022-10-29",3,"lisi");
     """
     sql """
-        create table ${tableNamePv}(
+        create table IF NOT EXISTS ${tableNamePv}(
             event_day DATE,
             id BIGINT,
             pv BIGINT
@@ -66,9 +62,9 @@ suite("test_create_mtmv") {
         SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, 
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
     """
 
-    def show_task_meta = sql_meta "SHOW MTMV TASK FROM ${dbName}"
+    def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}"
     def index = show_task_meta.indexOf(['State', 'CHAR'])
-    def query = "SHOW MTMV TASK FROM ${dbName}"
+    def query = "SHOW MTMV TASK ON ${mvName}"
     def show_task_result
     def state
     do {
@@ -80,5 +76,35 @@ suite("test_create_mtmv") {
 
     assertEquals 'SUCCESS', state, show_task_result.last().toString()
     order_qt_select "SELECT * FROM ${mvName}"
+
+    sql """
+        DROP MATERIALIZED VIEW ${mvName}
+    """
+
+    // test only one job created when build IMMEDIATE and start time is before 
now.
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+        BUILD IMMEDIATE REFRESH COMPLETE
+        start with "2022-11-03 00:00:00" next 1 DAY
+        KEY(username)   
+        DISTRIBUTED BY HASH (username)  buckets 1
+        PROPERTIES ('replication_num' = '1') 
+        AS 
+        SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, 
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
+    """
+    // wait task to be finished to avoid task leak in suite.
+    do {
+        show_task_result = sql "${query}"
+        state = show_task_result.last().get(index)
+        println "The state of ${query} is ${state}"
+        Thread.sleep(1000);
+    } while (state.equals('PENDING') || state.equals('RUNNING'))
+
+    def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
+    assertEquals 1, show_job_result.size()
+
+    sql """
+        DROP MATERIALIZED VIEW ${mvName}
+    """
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to