wangbo commented on a change in pull request #2781: Batch Operate Rollup table 
in Doris #2671
URL: https://github.com/apache/incubator-doris/pull/2781#discussion_r368225002
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
 ##########
 @@ -590,15 +767,130 @@ protected void runAfterCatalogReady() {
         runAlterJobV2();
     }
 
+    private synchronized Map<Long, AlterJobV2> getAlterJobsCopy () {
+        return new HashMap<>(alterJobsV2);
+    }
+
+    private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) {
+        Set<Long> runningJobIdSet = 
tableRunningJobMap.get(rollupJobV2.getTableId());
+        if (runningJobIdSet != null) {
+            runningJobIdSet.remove(rollupJobV2.getJobId());
+            if (runningJobIdSet.size() == 0) {
+                tableRunningJobMap.remove(rollupJobV2.getTableId());
+            }
+        }
+    }
+
+    private void changeTableStatus(long dbId, long tableId, OlapTableState 
olapTableState) {
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        db.readLock();
+        try {
+            OlapTable tbl = (OlapTable) db.getTable(tableId);
+            if (tbl == null || tbl.getState() == olapTableState) {
+                return;
+            }
+        } finally {
+            db.readUnlock();
+        }
+
+        db.writeLock();
+        try {
+            OlapTable tbl = (OlapTable) db.getTable(tableId);
+            tbl.setState(olapTableState);
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    // replay the alter job v2
+    public void replayAlterJobV2(AlterJobV2 alterJob) {
+        super.replayAlterJobV2(alterJob);
+        if (!alterJob.isDone()) {
+            addAlterJobV2ToTableNotFinalStateJobMap(alterJob);
+            changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), 
OlapTableState.ROLLUP);
+        } else {
+            int ret = removeAlterJobV2FromTableNotFinalStateJobMap(alterJob);
+            if (ret == 0) {
+                changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), 
OlapTableState.NORMAL);
+            }
+        }
+    }
+
+    /**
+     *
+     * @param rollupJobV2
+     * @return
+     *  true,parent job is finish or current job don't have parent job;
+     *  false, parent job is cancelled or current job hasn't finished
+     */
+    private boolean checkRollupJobDependency(RollupJobV2 rollupJobV2) {
+        RollupJobV2 parentJobV2 = (RollupJobV2) 
alterJobsV2.get(rollupJobV2.getParentRollupJobId());
+        if (parentJobV2 != null) {
+            // check whether current rollup job's parent rollup job is finished
+            if (parentJobV2.getJobState() == AlterJobV2.JobState.CANCELLED) {
+                rollupJobV2.cancel(String.format("rollup %s 's base rollup %s 
is cancelled", rollupJobV2.getRollupIndexId(), rollupJobV2.getBaseIndexId()));
+                return false;
+            }
+            if (parentJobV2.getJobState() != AlterJobV2.JobState.FINISHED) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void runAlterJobWithConcurrencyLimit(RollupJobV2 rollupJobV2) {
+        Set<Long> tableRunningJobSet = 
tableRunningJobMap.get(rollupJobV2.getTableId());
+        if (tableRunningJobSet == null) {
+            tableRunningJobSet = new HashSet<>();
+            tableRunningJobMap.put(rollupJobV2.getTableId(), 
tableRunningJobSet);
+        }
+
+        // current job is already in running
+        if (tableRunningJobSet.contains(rollupJobV2.getJobId())) {
+            rollupJobV2.run();
+        } else if (tableRunningJobSet.size() < 
Config.max_running_rollup_job_num) {
+            // create rollup index for job in case 1,see processBatchAddRollup
+            if (rollupJobV2.getPartitionIdToRollupIndex().size() == 0) {
+                Catalog catalog = Catalog.getCurrentCatalog();
+                Database db = Catalog.getInstance().getDb(rollupJobV2.dbId);
+                OlapTable olapTable = (OlapTable) 
db.getTable(rollupJobV2.getTableId());
+                if (!createMaterializedIndexForRollupJobV2(olapTable, db, 
catalog, rollupJobV2)) {
+                    rollupJobV2.cancel(String.format("rollup %s 's base rollup 
%s is cancelled due to create mv index failed, jobId = %s",
+                            rollupJobV2.getRollupIndexId(), 
rollupJobV2.getBaseIndexId(), rollupJobV2.getJobId()));
+                }
+                // for job in case 2 in log replay,they are in pending but not 
create index, can't transfer to WAITING_TXN,so first need transfer to pending
+                rollupJobV2.setJobState(AlterJobV2.JobState.PENDING);
+                catalog.getEditLog().logAlterJob(rollupJobV2);
+            }
+            // add current job to running queue
+            tableRunningJobSet.add(rollupJobV2.getJobId());
+            rollupJobV2.run();
+        }
+    }
+
     private void runAlterJobV2() {
-        Iterator<Map.Entry<Long, AlterJobV2>> iter = 
alterJobsV2.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry<Long, AlterJobV2> entry = iter.next();
-            AlterJobV2 alterJob = entry.getValue();
+        replayJobDependencyAfterCatalogReady();
 
 Review comment:
   In this implementation,when fe restart,the job dependncy is lost,need to 
rebuild before running, set every job's parent job;
   But if use "whether base rollup can be found in catalog" to judge whether 
parent rollup is ready is ok,so this method can be removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to