morningman commented on code in PR #28566:
URL: https://github.com/apache/doris/pull/28566#discussion_r1430813034


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -130,30 +137,51 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext 
taskContext) {
     public void run() throws JobException {
         try {
             ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
-            TUniqueId queryId = generateQueryId();
             // Every time a task is run, the relation is regenerated because 
baseTables and baseViews may change,
             // such as deleting a table and creating a view with the same name
-            relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
-            calculateRefreshInfo();
-            Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
+            this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
+            List<Long> needRefreshPartitionIds = 
calculateNeedRefreshPartitions();
+            this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, 
needRefreshPartitionIds);
+            this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
             if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
                 return;
-            } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
-                OlapTable relatedTable = (OlapTable) 
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
-                tableWithPartKey.put(relatedTable, 
mtmv.getMvPartitionInfo().getRelatedCol());
             }
-            refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, 
refreshPartitionIds);
-            UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
-                    .from(mtmv, refreshPartitionIds, tableWithPartKey);
-            executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
-            ctx.setQueryId(queryId);
-            command.run(ctx, executor);
+            Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
+            this.completedPartitions = Lists.newArrayList();
+            int refreshPartitionNum = mtmv.getRefreshPartitionNum();
+            long execNum = (needRefreshPartitionIds.size() / 
refreshPartitionNum) + ((needRefreshPartitionIds.size()
+                    % refreshPartitionNum) > 0 ? 1 : 0);
+            for (int i = 0; i < execNum; i++) {
+                int start = i * refreshPartitionNum;
+                int end = start + refreshPartitionNum;
+                Set<Long> execPartitionIds = 
Sets.newHashSet(needRefreshPartitionIds
+                        .subList(start, end > needRefreshPartitionIds.size() ? 
needRefreshPartitionIds.size() : end));
+                // need get names before exec
+                List<String> execPartitionNames = 
MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
+                exec(ctx, execPartitionIds, tableWithPartKey);
+                completedPartitions.addAll(execPartitionNames);
+            }
         } catch (Throwable e) {
             LOG.warn("run task failed: ", e);
             throw new JobException(e);
         }
     }
 
+    public void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, 
Map<OlapTable, String> tableWithPartKey)
+            throws Exception {
+        TUniqueId queryId = generateQueryId();
+        // if SELF_MANAGE, will not have partitionItem, so we give empty set

Review Comment:
   ```suggestion
           // if not SELF_MANAGED table, will not have partitionItem, so we 
give empty set
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -130,30 +137,51 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext 
taskContext) {
     public void run() throws JobException {
         try {
             ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
-            TUniqueId queryId = generateQueryId();
             // Every time a task is run, the relation is regenerated because 
baseTables and baseViews may change,
             // such as deleting a table and creating a view with the same name
-            relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
-            calculateRefreshInfo();
-            Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
+            this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
+            List<Long> needRefreshPartitionIds = 
calculateNeedRefreshPartitions();
+            this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, 
needRefreshPartitionIds);
+            this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
             if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
                 return;
-            } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
-                OlapTable relatedTable = (OlapTable) 
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
-                tableWithPartKey.put(relatedTable, 
mtmv.getMvPartitionInfo().getRelatedCol());
             }
-            refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, 
refreshPartitionIds);
-            UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
-                    .from(mtmv, refreshPartitionIds, tableWithPartKey);
-            executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
-            ctx.setQueryId(queryId);
-            command.run(ctx, executor);
+            Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
+            this.completedPartitions = Lists.newArrayList();
+            int refreshPartitionNum = mtmv.getRefreshPartitionNum();
+            long execNum = (needRefreshPartitionIds.size() / 
refreshPartitionNum) + ((needRefreshPartitionIds.size()
+                    % refreshPartitionNum) > 0 ? 1 : 0);
+            for (int i = 0; i < execNum; i++) {
+                int start = i * refreshPartitionNum;
+                int end = start + refreshPartitionNum;
+                Set<Long> execPartitionIds = 
Sets.newHashSet(needRefreshPartitionIds
+                        .subList(start, end > needRefreshPartitionIds.size() ? 
needRefreshPartitionIds.size() : end));
+                // need get names before exec
+                List<String> execPartitionNames = 
MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
+                exec(ctx, execPartitionIds, tableWithPartKey);
+                completedPartitions.addAll(execPartitionNames);
+            }
         } catch (Throwable e) {
             LOG.warn("run task failed: ", e);
             throw new JobException(e);
         }
     }
 
+    public void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, 
Map<OlapTable, String> tableWithPartKey)

Review Comment:
   ```suggestion
       private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, 
Map<OlapTable, String> tableWithPartKey)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to