morningman commented on code in PR #28566: URL: https://github.com/apache/doris/pull/28566#discussion_r1430813034
########## fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java: ########## @@ -130,30 +137,51 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) { public void run() throws JobException { try { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); - TUniqueId queryId = generateQueryId(); // Every time a task is run, the relation is regenerated because baseTables and baseViews may change, // such as deleting a table and creating a view with the same name - relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); - calculateRefreshInfo(); - Map<OlapTable, String> tableWithPartKey = Maps.newHashMap(); + this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); + List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(); + this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); + this.refreshMode = generateRefreshMode(needRefreshPartitionIds); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; - } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) { - OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); - tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol()); } - refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds); - UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand - .from(mtmv, refreshPartitionIds, tableWithPartKey); - executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); - ctx.setQueryId(queryId); - command.run(ctx, executor); + Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap(); + this.completedPartitions = Lists.newArrayList(); + int refreshPartitionNum = mtmv.getRefreshPartitionNum(); + long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size() + % refreshPartitionNum) > 0 ? 1 : 0); + for (int i = 0; i < execNum; i++) { + int start = i * refreshPartitionNum; + int end = start + refreshPartitionNum; + Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds + .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); + // need get names before exec + List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds); + exec(ctx, execPartitionIds, tableWithPartKey); + completedPartitions.addAll(execPartitionNames); + } } catch (Throwable e) { LOG.warn("run task failed: ", e); throw new JobException(e); } } + public void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey) + throws Exception { + TUniqueId queryId = generateQueryId(); + // if SELF_MANAGE, will not have partitionItem, so we give empty set Review Comment: ```suggestion // if not SELF_MANAGED table, will not have partitionItem, so we give empty set ``` ########## fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java: ########## @@ -130,30 +137,51 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) { public void run() throws JobException { try { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); - TUniqueId queryId = generateQueryId(); // Every time a task is run, the relation is regenerated because baseTables and baseViews may change, // such as deleting a table and creating a view with the same name - relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); - calculateRefreshInfo(); - Map<OlapTable, String> tableWithPartKey = Maps.newHashMap(); + this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); + List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(); + this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); + this.refreshMode = generateRefreshMode(needRefreshPartitionIds); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; - } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) { - OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); - tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol()); } - refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds); - UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand - .from(mtmv, refreshPartitionIds, tableWithPartKey); - executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); - ctx.setQueryId(queryId); - command.run(ctx, executor); + Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap(); + this.completedPartitions = Lists.newArrayList(); + int refreshPartitionNum = mtmv.getRefreshPartitionNum(); + long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size() + % refreshPartitionNum) > 0 ? 1 : 0); + for (int i = 0; i < execNum; i++) { + int start = i * refreshPartitionNum; + int end = start + refreshPartitionNum; + Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds + .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); + // need get names before exec + List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds); + exec(ctx, execPartitionIds, tableWithPartKey); + completedPartitions.addAll(execPartitionNames); + } } catch (Throwable e) { LOG.warn("run task failed: ", e); throw new JobException(e); } } + public void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey) Review Comment: ```suggestion private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org