morningman commented on code in PR #26146:
URL: https://github.com/apache/doris/pull/26146#discussion_r1399929228


##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1590,6 +1590,16 @@ public class Config extends ConfigBase {
     @ConfField(description = {"用于执行 Insert 任务的线程数", "The number of threads 
used to consume insert tasks."})
     public static int job_insert_task_consumer_thread_num = 10;
 
+
+    /**
+     * The number of threads used to consume mtmv tasks.
+     * if you have a lot of mtmv jobs,and the average execution frequency is 
relatively high you need to increase
+     * this value
+     * The value should be greater than 0, if it is 0 or <=0, set it to 5
+     */
+    @ConfField(description = {"用于执行 Insert 任务的线程数", "The number of threads 
used to consume insert tasks."})

Review Comment:
   Need to modify the description: it still says "insert tasks", but this config is for mtmv tasks.



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.mtmv.MTMVJob;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
+import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.persist.AlterMTMV;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+public class MTMVJobManager implements MTMVHookService {
+    public static final String MTMV_JOB_PREFIX = "mtmv_";
+
+    @Override
+    public void createMTMV(MTMV mtmv) throws DdlException {
+        MTMVJob job = new MTMVJob(mtmv.getQualifiedDbName(), mtmv.getId());
+        job.setJobId(Env.getCurrentEnv().getNextId());
+        job.setJobName(mtmv.getJobInfo().getJobName());
+        job.setComment(mtmv.getName());
+        job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
+        job.setJobStatus(JobStatus.RUNNING);
+        JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
+        if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.SCHEDULE)) {
+            setScheduleJobConfig(jobExecutionConfiguration, mtmv);

Review Comment:
   I think `setScheduleJobConfig` and `setManualJobConfig` can be merged into 
one method?



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.mtmv.MTMVJob;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
+import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.persist.AlterMTMV;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+public class MTMVJobManager implements MTMVHookService {
+    public static final String MTMV_JOB_PREFIX = "mtmv_";
+
+    @Override
+    public void createMTMV(MTMV mtmv) throws DdlException {
+        MTMVJob job = new MTMVJob(mtmv.getQualifiedDbName(), mtmv.getId());
+        job.setJobId(Env.getCurrentEnv().getNextId());
+        job.setJobName(mtmv.getJobInfo().getJobName());
+        job.setComment(mtmv.getName());
+        job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
+        job.setJobStatus(JobStatus.RUNNING);
+        JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
+        if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.SCHEDULE)) {
+            setScheduleJobConfig(jobExecutionConfiguration, mtmv);
+        } else if 
(mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.MANUAL)) {
+            setManualJobConfig(jobExecutionConfiguration, mtmv);
+        }
+        job.setJobConfig(jobExecutionConfiguration);
+        //job.checkJobParams(); fixme twice check
+        try {
+            Env.getCurrentEnv().getJobManager().registerJob(job);
+        } catch (JobException e) {
+            e.printStackTrace();

Review Comment:
   Remove this `e.printStackTrace()` call.



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.mtmv.MTMVJob;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
+import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.persist.AlterMTMV;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+public class MTMVJobManager implements MTMVHookService {
+    public static final String MTMV_JOB_PREFIX = "mtmv_";
+
+    @Override
+    public void createMTMV(MTMV mtmv) throws DdlException {
+        MTMVJob job = new MTMVJob(mtmv.getQualifiedDbName(), mtmv.getId());
+        job.setJobId(Env.getCurrentEnv().getNextId());
+        job.setJobName(mtmv.getJobInfo().getJobName());
+        job.setComment(mtmv.getName());
+        job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
+        job.setJobStatus(JobStatus.RUNNING);
+        JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
+        if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.SCHEDULE)) {
+            setScheduleJobConfig(jobExecutionConfiguration, mtmv);
+        } else if 
(mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.MANUAL)) {
+            setManualJobConfig(jobExecutionConfiguration, mtmv);
+        }
+        job.setJobConfig(jobExecutionConfiguration);
+        //job.checkJobParams(); fixme twice check
+        try {
+            Env.getCurrentEnv().getJobManager().registerJob(job);
+        } catch (JobException e) {
+            e.printStackTrace();
+            throw new DdlException(e.getMessage());

Review Comment:
   ```suggestion
               throw new DdlException(e.getMessage(), e);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java:
##########
@@ -875,4 +872,36 @@ public AlterHandler getMaterializedViewHandler() {
     public AlterHandler getClusterHandler() {
         return clusterHandler;
     }
+
+    public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
+        TableNameInfo tbl = alterMTMV.getMvName();
+        MTMV mtmv = null;
+        try {
+            Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tbl.getDb());
+            mtmv = (MTMV) db.getTableOrMetaException(tbl.getTbl(), 
TableType.MATERIALIZED_VIEW);
+
+            mtmv.writeLock();
+            if (alterMTMV.getRefreshInfo() != null) {

Review Comment:
   Better to use an enum `type` to specify what kind of alter this is?



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -132,23 +153,16 @@ public void updateJobStatus(JobStatus newJobStatus) {
     protected abstract void checkJobParamsInternal();
 
     public static AbstractJob readFields(DataInput in) throws IOException {
-        // todo use RuntimeTypeAdapterFactory of Gson to do the serde
-        JobType jobType = JobType.valueOf(Text.readString(in));
-        switch (jobType) {
-            case INSERT:
-                return InsertJob.readFields(in);
-            case MTMV:
-                // return MTMVJob.readFields(in);
-                break;
-            default:
-                throw new IllegalArgumentException("unknown job type");
-        }
-        throw new IllegalArgumentException("unknown job type");
+        String jsonJob = Text.readString(in);
+        AbstractJob<?> job = GsonUtils.GSON.fromJson(jsonJob, 
AbstractJob.class);
+        job.setRunningTasks(new ArrayList<>());

Review Comment:
   Why do we need to set the running tasks to empty?
   The running tasks are not persisted.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java:
##########
@@ -17,16 +17,185 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVCache;
+import org.apache.doris.mtmv.MTMVJobInfo;
+import org.apache.doris.mtmv.MTMVJobManager;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVStatus;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 
 public class MTMV extends OlapTable {
+    private static final Logger LOG = LogManager.getLogger(MTMV.class);
+    private ReentrantReadWriteLock mvRwLock;
+
+    @SerializedName("ri")
+    private MTMVRefreshInfo refreshInfo;
+    @SerializedName("qs")
+    private String querySql;
+    @SerializedName("s")
+    private MTMVStatus status;
+    @SerializedName("ei")
+    private EnvInfo envInfo;

Review Comment:
   `EnvInfo` only contains ctl and database, so the class name is not suitable?



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.persist.AlterMTMV;
+
+public interface MTMVHookService {

Review Comment:
   Add comments for this interface and all its methods.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.mtmv;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.mtmv.MTMVCache;
+import org.apache.doris.mtmv.MTMVCacheManager;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVStatus;
+import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.UUID;
+
+public class MTMVTask extends AbstractTask {
+    private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
+    public static final Long MAX_HISTORY_TASKS_NUM = 100L;
+
+    @SerializedName(value = "dn")
+    private String dbName;

Review Comment:
   Why save dbName rather than dbId?



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -53,37 +54,35 @@ public void start() {
 
     public void registerJob(T job) throws JobException {
         job.checkJobParams();
-        checkJobNameExist(job.getJobName(), job.getJobType(), 
job.getCurrentDbName());
+        checkJobNameExist(job.getJobName(), job.getJobType());
         if (jobMap.get(job.getJobId()) != null) {
             throw new JobException("job id exist,jobId:" + job.getJobId());
         }
-        //Env.getCurrentEnv().getEditLog().logCreateJob(job);
+        Env.getCurrentEnv().getEditLog().logCreateJob(job);
         //check name exist

Review Comment:
   Do we check the job name?
   Is it a TODO?



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.mtmv;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.mtmv.MTMVCache;
+import org.apache.doris.mtmv.MTMVCacheManager;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVStatus;
+import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.UUID;
+
+public class MTMVTask extends AbstractTask {
+    private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
+    public static final Long MAX_HISTORY_TASKS_NUM = 100L;
+
+    @SerializedName(value = "dn")
+    private String dbName;
+    @SerializedName(value = "mi")
+    private long mtmvId;
+    @SerializedName("sql")
+    private String sql;
+
+    private MTMV mtmv;
+    private MTMVCache cache;
+
+    public MTMVTask(String dbName, long mtmvId) {
+        this.dbName = dbName;
+        this.mtmvId = mtmvId;
+    }
+
+    @Override
+    public void run() throws JobException {
+        ConnectContext ctx = createContext();

Review Comment:
   You create a ctx here, and then create another ctx in `generateMTMVCache() -> 
getPlanBySql()`?
   Also, in the ctx, can we use the identity of the user who created this job?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5741,4 +5759,28 @@ public ColumnIdFlushDaemon getColumnIdFlusher() {
     public StatisticsAutoCollector getStatisticsAutoCollector() {
         return statisticsAutoCollector;
     }
+
+    public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) throws 
UserException {

Review Comment:
   Can the following 4 methods be moved to the `Alter` class?



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java:
##########
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Set;
+
+public class MTMVCache {
+    @SerializedName("bt")
+    private Set<BaseTableInfo> baseTables;
+    @SerializedName("bv")
+    private Set<BaseTableInfo> baseViews;

Review Comment:
   What is the difference between `baseTables` and `baseViews`?



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java:
##########
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.mtmv;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.common.TaskType;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MTMVJob extends AbstractJob<MTMVTask> {
+    private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
+    private static final ShowResultSetMetaData JOB_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("JobId", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("JobName", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ExecuteType", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("RecurringStrategy", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("JobStatus", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("CreateTime", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Comment", 
ScalarType.createVarchar(20)))
+                    .build();
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("JobId", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskId", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("CreateTimeMs", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("StartTimeMs", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Duration", 
ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ExecuteSql", 
ScalarType.createVarchar(20)))
+                    .build();
+
+    @SerializedName(value = "dn")
+    private String dbName;

Review Comment:
   Why not use dbId?



##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.mtmv.MTMVJob;
+import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
+import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
+import org.apache.doris.persist.AlterMTMV;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+public class MTMVJobManager implements MTMVHookService {
+    public static final String MTMV_JOB_PREFIX = "mtmv_";
+
+    @Override
+    public void createMTMV(MTMV mtmv) throws DdlException {
+        MTMVJob job = new MTMVJob(mtmv.getQualifiedDbName(), mtmv.getId());
+        job.setJobId(Env.getCurrentEnv().getNextId());
+        job.setJobName(mtmv.getJobInfo().getJobName());
+        job.setComment(mtmv.getName());
+        job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
+        job.setJobStatus(JobStatus.RUNNING);
+        JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
+        if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.SCHEDULE)) {
+            setScheduleJobConfig(jobExecutionConfiguration, mtmv);
+        } else if 
(mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
+                .equals(RefreshTrigger.MANUAL)) {
+            setManualJobConfig(jobExecutionConfiguration, mtmv);
+        }
+        job.setJobConfig(jobExecutionConfiguration);
+        //job.checkJobParams(); fixme twice check
+        try {
+            Env.getCurrentEnv().getJobManager().registerJob(job);
+        } catch (JobException e) {
+            e.printStackTrace();
+            throw new DdlException(e.getMessage());
+        }
+    }
+
+    private void setManualJobConfig(JobExecutionConfiguration 
jobExecutionConfiguration, MTMV mtmv) {
+        jobExecutionConfiguration.setExecuteType(JobExecuteType.MANUAL);
+        if (mtmv.getRefreshInfo().getBuildMode().equals(BuildMode.IMMEDIATE)) {
+            jobExecutionConfiguration.setImmediate(true);
+        } else {
+            jobExecutionConfiguration.setImmediate(false);
+        }
+    }
+
+    private void setScheduleJobConfig(JobExecutionConfiguration 
jobExecutionConfiguration, MTMV mtmv) {
+        jobExecutionConfiguration.setExecuteType(JobExecuteType.RECURRING);
+        TimerDefinition timerDefinition = new TimerDefinition();
+        timerDefinition
+                
.setInterval(mtmv.getRefreshInfo().getRefreshTriggerInfo().getIntervalTrigger().getInterval());
+        timerDefinition
+                
.setIntervalUnit(mtmv.getRefreshInfo().getRefreshTriggerInfo().getIntervalTrigger().getTimeUnit());
+        if (mtmv.getRefreshInfo().getBuildMode().equals(BuildMode.IMMEDIATE)) {
+            jobExecutionConfiguration.setImmediate(true);
+        } else if 
(mtmv.getRefreshInfo().getBuildMode().equals(BuildMode.DEFERRED) && !StringUtils
+                
.isEmpty(mtmv.getRefreshInfo().getRefreshTriggerInfo().getIntervalTrigger().getStartTime()))
 {
+            timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(
+                    
mtmv.getRefreshInfo().getRefreshTriggerInfo().getIntervalTrigger().getStartTime()));
+        }
+
+        jobExecutionConfiguration.setTimerDefinition(timerDefinition);
+    }
+
+    @Override
+    public void dropMTMV(MTMV mtmv) throws DdlException {
+        List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
+                .queryJobs(JobType.MTMV, mtmv.getJobInfo().getJobName());
+        if (!CollectionUtils.isEmpty(jobs)) {
+            try {
+                Env.getCurrentEnv().getJobManager()
+                        .unregisterJob(jobs.get(0).getJobId());
+            } catch (JobException e) {
+                e.printStackTrace();
+                throw new DdlException(e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public void registerMTMV(MTMV mtmv, Long dbId) {
+
+    }
+
+    @Override
+    public void deregisterMTMV(MTMV mtmv) {
+
+    }
+
+    @Override
+    public void alterMTMV(MTMV mtmv, AlterMTMV alterMTMV) throws DdlException {
+        if (alterMTMV.isNeedRebuildJob()) {
+            dropMTMV(mtmv);
+            createMTMV(mtmv);
+        }
+    }
+
+    @Override
+    public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, 
MetaNotFoundException {
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb());
+        MTMV mtmv = (MTMV) 
db.getTableOrMetaException(info.getMvName().getTbl(), 
TableType.MATERIALIZED_VIEW);
+        List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
+                .queryJobs(JobType.MTMV, mtmv.getJobInfo().getJobName());
+        if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
+            throw new DdlException("jobs not normal");

Review Comment:
   Add more details to the exception message.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -34,44 +53,137 @@ public class InsertTask extends AbstractTask {
     private String labelName;
 
     private InsertIntoTableCommand command;
-    private LoadJob.LoadStatistic statistic;
-    private FailMsg failMsg;
 
-    private InsertIntoState insertIntoState;
+    private StmtExecutor stmtExecutor;
+
+    private ConnectContext ctx;
+
+    private String sql;
+
+    private String currentDb;
+
+    private AtomicBoolean isCanceled = new AtomicBoolean(false);
+
+    private AtomicBoolean isFinished = new AtomicBoolean(false);
+
+
+    @Getter
+    @Setter
+    private LoadJob loadJob;
+
 
     @Override
-    public void before() {
+    public void before() throws JobException {
+        if (isCanceled.get()) {
+            throw new JobException("Export executor has been canceled, task 
id: {}", getTaskId());
+        }
+        ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+        ctx.setQualifiedUser(Auth.ADMIN_USER);

Review Comment:
   Use the user who created this job?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to