kangkaisen commented on a change in pull request #2781: Batch Operate Rollup table in Doris #2671 URL: https://github.com/apache/incubator-doris/pull/2781#discussion_r367962969
##########
File path: fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
##########
@@ -85,6 +90,71 @@ public MaterializedViewHandler() {
         super("materialized view");
     }
+
+    // for batch submit rollup job, tableId -> jobId
+    // keep table's not final state job size. The job size determine's table's state, = 0 means table is normal,otherwrise is rollup
+    private Map<Long, Set<Long>> tableNotFinalStateJobMap = new ConcurrentHashMap<>();
+    // keep table's running job,used for concurrency limit
+    private Map<Long, Set<Long>> tableRunningJobMap = new ConcurrentHashMap<>();
+    // make sure that job's dependency just init once
+    private volatile boolean hasBuildJobDependency = false;
+
+    // set job' parent job after CatalogReady
+    private void replayJobDependencyAfterCatalogReady() {
+        if (!hasBuildJobDependency) {
+            Map<Long, Long> rollupIndexIdJobIdMap = new HashMap<>();
+            Iterator<Map.Entry<Long, AlterJobV2>> first =alterJobsV2.entrySet().iterator();
+            while (first.hasNext()) {
+                Map.Entry<Long, AlterJobV2> entry = first.next();
+                rollupIndexIdJobIdMap.put(((RollupJobV2)entry.getValue()).getRollupIndexId(), entry.getValue().getJobId());
+            }
+
+            Iterator<Map.Entry<Long, AlterJobV2>> second = alterJobsV2.entrySet().iterator();
+            while (second.hasNext()) {
+                Map.Entry<Long, AlterJobV2> entry = second.next();
+                RollupJobV2 rollupJobV2 = (RollupJobV2) entry.getValue();
+                Long baseJobId = rollupIndexIdJobIdMap.get(rollupJobV2.getBaseIndexId());
+                ((RollupJobV2)entry.getValue()).setParentRollupJobId(baseJobId == null ? 0 : baseJobId);
+            }
+            hasBuildJobDependency = true;
+        }
+    }
+
+    protected synchronized void addAlterJobV2(AlterJobV2 alterJob) {
+        super.addAlterJobV2(alterJob);
+        addAlterJobV2ToTableNotFinalStateJobMap(alterJob);
+    }
+
+    private void addAlterJobV2ToTableNotFinalStateJobMap(AlterJobV2 alterJobV2) {
+        if (alterJobV2.isDone()) {
+            LOG.warn("try to add a final job({}) to a unfinal set", alterJobV2.getJobId());
+            return;
+        }
+        Long tableId = alterJobV2.getTableId();
+        Long jobId = alterJobV2.getJobId();
+        Set<Long> tableNotFinalStateJobIdSet = tableNotFinalStateJobMap.get(tableId);
+        if (tableNotFinalStateJobIdSet == null) {
+            tableNotFinalStateJobIdSet = new HashSet<>();
+            tableNotFinalStateJobMap.put(tableId, tableNotFinalStateJobIdSet);
+        }
+        tableNotFinalStateJobIdSet.add(jobId);
+    }
+
+    private int removeAlterJobV2FromTableNotFinalStateJobMap(AlterJobV2 alterJobV2) {

Review comment:
change return type to bool?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org