kangkaisen commented on a change in pull request #2781: Batch Operate Rollup 
table in Doris #2671
URL: https://github.com/apache/incubator-doris/pull/2781#discussion_r367961484
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
 ##########
 @@ -590,15 +767,130 @@ protected void runAfterCatalogReady() {
         runAlterJobV2();
     }
 
+    private synchronized Map<Long, AlterJobV2> getAlterJobsCopy () {
+        return new HashMap<>(alterJobsV2);
+    }
+
+    private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) {
+        Set<Long> runningJobIdSet = 
tableRunningJobMap.get(rollupJobV2.getTableId());
+        if (runningJobIdSet != null) {
+            runningJobIdSet.remove(rollupJobV2.getJobId());
+            if (runningJobIdSet.size() == 0) {
+                tableRunningJobMap.remove(rollupJobV2.getTableId());
+            }
+        }
+    }
+
+    private void changeTableStatus(long dbId, long tableId, OlapTableState 
olapTableState) {
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        db.readLock();
+        try {
+            OlapTable tbl = (OlapTable) db.getTable(tableId);
+            if (tbl == null || tbl.getState() == olapTableState) {
+                return;
+            }
+        } finally {
+            db.readUnlock();
+        }
+
+        db.writeLock();
+        try {
+            OlapTable tbl = (OlapTable) db.getTable(tableId);
+            tbl.setState(olapTableState);
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    // replay the alter job v2
+    public void replayAlterJobV2(AlterJobV2 alterJob) {
+        super.replayAlterJobV2(alterJob);
+        if (!alterJob.isDone()) {
+            addAlterJobV2ToTableNotFinalStateJobMap(alterJob);
+            changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), 
OlapTableState.ROLLUP);
+        } else {
+            int ret = removeAlterJobV2FromTableNotFinalStateJobMap(alterJob);
+            if (ret == 0) {
+                changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), 
OlapTableState.NORMAL);
+            }
+        }
+    }
+
+    /**
+     *
+     * @param rollupJobV2
+     * @return
+     *  true,parent job is finish or current job don't have parent job;
+     *  false, parent job is cancelled or current job hasn't finished
+     */
+    private boolean checkRollupJobDependency(RollupJobV2 rollupJobV2) {
+        RollupJobV2 parentJobV2 = (RollupJobV2) 
alterJobsV2.get(rollupJobV2.getParentRollupJobId());
+        if (parentJobV2 != null) {
+            // check whether current rollup job's parent rollup job is finished
+            if (parentJobV2.getJobState() == AlterJobV2.JobState.CANCELLED) {
+                rollupJobV2.cancel(String.format("rollup %s 's base rollup %s 
is cancelled", rollupJobV2.getRollupIndexId(), rollupJobV2.getBaseIndexId()));
+                return false;
+            }
+            if (parentJobV2.getJobState() != AlterJobV2.JobState.FINISHED) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void runAlterJobWithConcurrencyLimit(RollupJobV2 rollupJobV2) {
+        Set<Long> tableRunningJobSet = 
tableRunningJobMap.get(rollupJobV2.getTableId());
+        if (tableRunningJobSet == null) {
+            tableRunningJobSet = new HashSet<>();
+            tableRunningJobMap.put(rollupJobV2.getTableId(), 
tableRunningJobSet);
+        }
+
+        // current job is already in running
+        if (tableRunningJobSet.contains(rollupJobV2.getJobId())) {
+            rollupJobV2.run();
+        } else if (tableRunningJobSet.size() < 
Config.max_running_rollup_job_num) {
+            // create rollup index for job in case 1,see processBatchAddRollup
+            if (rollupJobV2.getPartitionIdToRollupIndex().size() == 0) {
+                Catalog catalog = Catalog.getCurrentCatalog();
+                Database db = Catalog.getInstance().getDb(rollupJobV2.dbId);
+                OlapTable olapTable = (OlapTable) 
db.getTable(rollupJobV2.getTableId());
+                if (!createMaterializedIndexForRollupJobV2(olapTable, db, 
catalog, rollupJobV2)) {
+                    rollupJobV2.cancel(String.format("rollup %s 's base rollup 
%s is cancelled due to create mv index failed, jobId = %s",
+                            rollupJobV2.getRollupIndexId(), 
rollupJobV2.getBaseIndexId(), rollupJobV2.getJobId()));
+                }
+                // for job in case 2 in log replay,they are in pending but not 
create index, can't transfer to WAITING_TXN,so first need transfer to pending
+                rollupJobV2.setJobState(AlterJobV2.JobState.PENDING);
+                catalog.getEditLog().logAlterJob(rollupJobV2);
+            }
+            // add current job to running queue
+            tableRunningJobSet.add(rollupJobV2.getJobId());
+            rollupJobV2.run();
+        }
+    }
+
     private void runAlterJobV2() {
-        Iterator<Map.Entry<Long, AlterJobV2>> iter = 
alterJobsV2.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry<Long, AlterJobV2> entry = iter.next();
-            AlterJobV2 alterJob = entry.getValue();
+        replayJobDependencyAfterCatalogReady();
+        Iterator<Map.Entry<Long, AlterJobV2>> iterator = 
getAlterJobsCopy().entrySet().iterator();
 
 Review comment:
   I think there is no concurrent issue. you could directly use `alterJobsV2`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to