kangkaisen commented on a change in pull request #2781: Batch Operate Rollup table in Doris #2671 URL: https://github.com/apache/incubator-doris/pull/2781#discussion_r367961484
########## File path: fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java ########## @@ -590,15 +767,130 @@ protected void runAfterCatalogReady() { runAlterJobV2(); } + private synchronized Map<Long, AlterJobV2> getAlterJobsCopy () { + return new HashMap<>(alterJobsV2); + } + + private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) { + Set<Long> runningJobIdSet = tableRunningJobMap.get(rollupJobV2.getTableId()); + if (runningJobIdSet != null) { + runningJobIdSet.remove(rollupJobV2.getJobId()); + if (runningJobIdSet.size() == 0) { + tableRunningJobMap.remove(rollupJobV2.getTableId()); + } + } + } + + private void changeTableStatus(long dbId, long tableId, OlapTableState olapTableState) { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + db.readLock(); + try { + OlapTable tbl = (OlapTable) db.getTable(tableId); + if (tbl == null || tbl.getState() == olapTableState) { + return; + } + } finally { + db.readUnlock(); + } + + db.writeLock(); + try { + OlapTable tbl = (OlapTable) db.getTable(tableId); + tbl.setState(olapTableState); + } finally { + db.writeUnlock(); + } + } + + // replay the alter job v2 + public void replayAlterJobV2(AlterJobV2 alterJob) { + super.replayAlterJobV2(alterJob); + if (!alterJob.isDone()) { + addAlterJobV2ToTableNotFinalStateJobMap(alterJob); + changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.ROLLUP); + } else { + int ret = removeAlterJobV2FromTableNotFinalStateJobMap(alterJob); + if (ret == 0) { + changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL); + } + } + } + + /** + * + * @param rollupJobV2 + * @return + * true,parent job is finish or current job don't have parent job; + * false, parent job is cancelled or current job hasn't finished + */ + private boolean checkRollupJobDependency(RollupJobV2 rollupJobV2) { + RollupJobV2 parentJobV2 = (RollupJobV2) alterJobsV2.get(rollupJobV2.getParentRollupJobId()); + if (parentJobV2 != null) { + // check 
whether current rollup job's parent rollup job is finished + if (parentJobV2.getJobState() == AlterJobV2.JobState.CANCELLED) { + rollupJobV2.cancel(String.format("rollup %s 's base rollup %s is cancelled", rollupJobV2.getRollupIndexId(), rollupJobV2.getBaseIndexId())); + return false; + } + if (parentJobV2.getJobState() != AlterJobV2.JobState.FINISHED) { + return false; + } + } + return true; + } + + private void runAlterJobWithConcurrencyLimit(RollupJobV2 rollupJobV2) { + Set<Long> tableRunningJobSet = tableRunningJobMap.get(rollupJobV2.getTableId()); + if (tableRunningJobSet == null) { + tableRunningJobSet = new HashSet<>(); + tableRunningJobMap.put(rollupJobV2.getTableId(), tableRunningJobSet); + } + + // current job is already in running + if (tableRunningJobSet.contains(rollupJobV2.getJobId())) { + rollupJobV2.run(); + } else if (tableRunningJobSet.size() < Config.max_running_rollup_job_num) { + // create rollup index for job in case 1,see processBatchAddRollup + if (rollupJobV2.getPartitionIdToRollupIndex().size() == 0) { + Catalog catalog = Catalog.getCurrentCatalog(); + Database db = Catalog.getInstance().getDb(rollupJobV2.dbId); + OlapTable olapTable = (OlapTable) db.getTable(rollupJobV2.getTableId()); + if (!createMaterializedIndexForRollupJobV2(olapTable, db, catalog, rollupJobV2)) { + rollupJobV2.cancel(String.format("rollup %s 's base rollup %s is cancelled due to create mv index failed, jobId = %s", + rollupJobV2.getRollupIndexId(), rollupJobV2.getBaseIndexId(), rollupJobV2.getJobId())); + } + // for job in case 2 in log replay,they are in pending but not create index, can't transfer to WAITING_TXN,so first need transfer to pending + rollupJobV2.setJobState(AlterJobV2.JobState.PENDING); + catalog.getEditLog().logAlterJob(rollupJobV2); + } + // add current job to running queue + tableRunningJobSet.add(rollupJobV2.getJobId()); + rollupJobV2.run(); + } + } + private void runAlterJobV2() { - Iterator<Map.Entry<Long, AlterJobV2>> iter = 
alterJobsV2.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<Long, AlterJobV2> entry = iter.next(); - AlterJobV2 alterJob = entry.getValue(); + replayJobDependencyAfterCatalogReady(); + Iterator<Map.Entry<Long, AlterJobV2>> iterator = getAlterJobsCopy().entrySet().iterator(); Review comment: I think there is no concurrency issue here, so you could use `alterJobsV2` directly. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org