wangbo commented on a change in pull request #2781: Batch Operate Rollup table in Doris #2671 URL: https://github.com/apache/incubator-doris/pull/2781#discussion_r368225002
########## File path: fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java ########## @@ -590,15 +767,130 @@ protected void runAfterCatalogReady() { runAlterJobV2(); } + private synchronized Map<Long, AlterJobV2> getAlterJobsCopy () { + return new HashMap<>(alterJobsV2); + } + + private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) { + Set<Long> runningJobIdSet = tableRunningJobMap.get(rollupJobV2.getTableId()); + if (runningJobIdSet != null) { + runningJobIdSet.remove(rollupJobV2.getJobId()); + if (runningJobIdSet.size() == 0) { + tableRunningJobMap.remove(rollupJobV2.getTableId()); + } + } + } + + private void changeTableStatus(long dbId, long tableId, OlapTableState olapTableState) { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + db.readLock(); + try { + OlapTable tbl = (OlapTable) db.getTable(tableId); + if (tbl == null || tbl.getState() == olapTableState) { + return; + } + } finally { + db.readUnlock(); + } + + db.writeLock(); + try { + OlapTable tbl = (OlapTable) db.getTable(tableId); + tbl.setState(olapTableState); + } finally { + db.writeUnlock(); + } + } + + // replay the alter job v2 + public void replayAlterJobV2(AlterJobV2 alterJob) { + super.replayAlterJobV2(alterJob); + if (!alterJob.isDone()) { + addAlterJobV2ToTableNotFinalStateJobMap(alterJob); + changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.ROLLUP); + } else { + int ret = removeAlterJobV2FromTableNotFinalStateJobMap(alterJob); + if (ret == 0) { + changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL); + } + } + } + + /** + * + * @param rollupJobV2 + * @return + * true,parent job is finish or current job don't have parent job; + * false, parent job is cancelled or current job hasn't finished + */ + private boolean checkRollupJobDependency(RollupJobV2 rollupJobV2) { + RollupJobV2 parentJobV2 = (RollupJobV2) alterJobsV2.get(rollupJobV2.getParentRollupJobId()); + if (parentJobV2 != null) { + // check whether current rollup job's parent rollup job is finished + if (parentJobV2.getJobState() == AlterJobV2.JobState.CANCELLED) { + rollupJobV2.cancel(String.format("rollup %s 's base rollup %s is cancelled", rollupJobV2.getRollupIndexId(), rollupJobV2.getBaseIndexId())); + return false; + } + if (parentJobV2.getJobState() != AlterJobV2.JobState.FINISHED) { + return false; + } + } + return true; + } + + private void runAlterJobWithConcurrencyLimit(RollupJobV2 rollupJobV2) { + Set<Long> tableRunningJobSet = tableRunningJobMap.get(rollupJobV2.getTableId()); + if (tableRunningJobSet == null) { + tableRunningJobSet = new HashSet<>(); + tableRunningJobMap.put(rollupJobV2.getTableId(), tableRunningJobSet); + } + + // current job is already in running + if (tableRunningJobSet.contains(rollupJobV2.getJobId())) { + rollupJobV2.run(); + } else if (tableRunningJobSet.size() < Config.max_running_rollup_job_num) { + // create rollup index for job in case 1,see processBatchAddRollup + if (rollupJobV2.getPartitionIdToRollupIndex().size() == 0) { + Catalog catalog = Catalog.getCurrentCatalog(); + Database db = Catalog.getInstance().getDb(rollupJobV2.dbId); + OlapTable olapTable = (OlapTable) db.getTable(rollupJobV2.getTableId()); + if (!createMaterializedIndexForRollupJobV2(olapTable, db, catalog, rollupJobV2)) { + rollupJobV2.cancel(String.format("rollup %s 's base rollup %s is cancelled due to create mv index failed, jobId = %s", + rollupJobV2.getRollupIndexId(), rollupJobV2.getBaseIndexId(), rollupJobV2.getJobId())); + } + // for job in case 2 in log replay,they are in pending but not create index, can't transfer to WAITING_TXN,so first need transfer to pending + rollupJobV2.setJobState(AlterJobV2.JobState.PENDING); + catalog.getEditLog().logAlterJob(rollupJobV2); + } + // add current job to running queue + tableRunningJobSet.add(rollupJobV2.getJobId()); + rollupJobV2.run(); + } + } + private void runAlterJobV2() { - Iterator<Map.Entry<Long, AlterJobV2>> iter = alterJobsV2.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<Long, AlterJobV2> entry = iter.next(); - AlterJobV2 alterJob = entry.getValue(); + replayJobDependencyAfterCatalogReady(); Review comment: In this implementation,when fe restart,the job dependncy is lost,need to rebuild before running, set every job's parent job; But if use "whether base rollup can be found in catalog" to judge whether parent rollup is ready is ok,so this method can be removed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org