morningman commented on code in PR #40558:
URL: https://github.com/apache/doris/pull/40558#discussion_r1759653217


##########
fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java:
##########
@@ -270,6 +279,50 @@ private boolean rollback(long taskId) {
         return InsertOverwriteUtil.dropPartitions(olapTable, 
task.getTempPartitionNames());
     }
 
+    /**
+     * If the current table id has a running insert overwrite, throw an 
exception.
+     * If not, record it in runningTables
+     *
+     * @param dbId Run the dbId for insert overwrite
+     * @param tableId Run the tableId for insert overwrite
+     */
+    public void recordRunningTableOrException(long dbId, long tableId) {
+        runningLock.writeLock().lock();
+        try {
+            if (runningTables.containsKey(dbId) && 
runningTables.get(dbId).contains(tableId)) {
+                throw new AnalysisException(
+                        String.format("insert overwrite is running on db: %s, 
table: %s", dbId, tableId));

Review Comment:
   ```suggestion
                           String.format("Not allowed running Insert Overwrite 
on same table: %s.%s", dbId, tableId));
                          
   ```
   
   I also suggest using the db/table names, rather than their ids, in the error message.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
 targetTable.getId());
+        isRunning = true;
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());

Review Comment:
   ```suggestion
                       LOG.info("insert overwrite is cancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
 targetTable.getId());
+        isRunning = true;
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());
+                    return;
+                }
+                taskId = insertOverwriteManager
                         .registerTask(targetTable.getDatabase().getId(), 
targetTable.getId(), tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
addTempPartitions, queryId: {}",

Review Comment:
   ```suggestion
                       LOG.info("insert overwrite is cancelled before 
addTempPartitions, queryId: {}",
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
 targetTable.getId());
+        isRunning = true;
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());
+                    return;
+                }
+                taskId = insertOverwriteManager
                         .registerTask(targetTable.getDatabase().getId(), 
targetTable.getId(), tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
addTempPartitions, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    // not need deal temp partition
+                    insertOverwriteManager.taskSuccess(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.addTempPartitions(targetTable, 
partitionNames, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before insertInto, 
queryId: {}", ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 insertInto(ctx, executor, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
replacePartition, queryId: {}",

Review Comment:
   Ditto: change `isCancelled` to `is cancelled` in this log message.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
 targetTable.getId());
+        isRunning = true;
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());
+                    return;
+                }
+                taskId = insertOverwriteManager
                         .registerTask(targetTable.getDatabase().getId(), 
targetTable.getId(), tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
addTempPartitions, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    // not need deal temp partition
+                    insertOverwriteManager.taskSuccess(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.addTempPartitions(targetTable, 
partitionNames, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before insertInto, 
queryId: {}", ctx.getQueryIdentifier());

Review Comment:
   Ditto: change `isCancelled` to `is cancelled` in this log message.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
 targetTable.getId());
+        isRunning = true;
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());
+                    return;
+                }
+                taskId = insertOverwriteManager
                         .registerTask(targetTable.getDatabase().getId(), 
targetTable.getId(), tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
addTempPartitions, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    // not need deal temp partition
+                    insertOverwriteManager.taskSuccess(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.addTempPartitions(targetTable, 
partitionNames, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before insertInto, 
queryId: {}", ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 insertInto(ctx, executor, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
replacePartition, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.replacePartition(targetTable, 
partitionNames, tempPartitionNames);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before taskSuccess, 
do nothing, queryId: {}",
+                            ctx.getQueryIdentifier());
+                }
+                insertOverwriteManager.taskSuccess(taskId);
             }
         } catch (Exception e) {
             LOG.warn("insert into overwrite failed with task(or group) id " + 
taskId);
             if (isAutoDetectOverwrite()) {
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
+                insertOverwriteManager.taskGroupFail(taskId);
             } else {
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
+                insertOverwriteManager.taskFail(taskId);
             }
             throw e;
         } finally {
             ConnectContext.get().setSkipAuth(false);
+            insertOverwriteManager
+                    .dropRunningRecord(targetTable.getDatabase().getId(), 
targetTable.getId());
+            isRunning = false;
+        }
+    }
+
+    /**
+     * cancel insert overwrite
+     */
+    public void cancel() {
+        this.isCancelled = true;
+    }
+
+    /**
+     * wait insert overwrite not running
+     */
+    public void waitNotRunning() {
+        long waitMaxTimeMills = 10 * 1000L;
+        long waitTime = 0;
+        while (true) {

Review Comment:
   You can use `Awaitility` here, just as it is used in the regression test cases.



##########
regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy:
##########
@@ -1275,7 +1275,29 @@ class Suite implements GroovyInterceptable {
             }
             logger.info("The state of ${showTasks} is ${status}")
             Thread.sleep(1000);
-        } while (timeoutTimestamp > System.currentTimeMillis() && (status == 
'PENDING' || status == 'RUNNING' || status == 'NULL'))
+        } while (timeoutTimestamp > System.currentTimeMillis() && (status == 
'PENDING' || status == 'RUNNING'  || status == 'NULL'))
+        if (status != "SUCCESS") {
+            logger.info("status is not success")
+        }
+        Assert.assertEquals("SUCCESS", status)
+    }
+
+    void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
+        Thread.sleep(2000);
+        String showTasks = "select 
TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from 
tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
+        String status = "NULL"
+        List<List<Object>> result
+        long startTime = System.currentTimeMillis()
+        long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min

Review Comment:
   Use `Awaitility` here instead of a hand-rolled sleep/timeout polling loop.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
 targetTable.getId());
+        isRunning = true;
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
registerTask, queryId: {}", ctx.getQueryIdentifier());
+                    return;
+                }
+                taskId = insertOverwriteManager
                         .registerTask(targetTable.getDatabase().getId(), 
targetTable.getId(), tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
addTempPartitions, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    // not need deal temp partition
+                    insertOverwriteManager.taskSuccess(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.addTempPartitions(targetTable, 
partitionNames, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before insertInto, 
queryId: {}", ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 insertInto(ctx, executor, tempPartitionNames);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before 
replacePartition, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.replacePartition(targetTable, 
partitionNames, tempPartitionNames);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
+                if (isCancelled) {
+                    LOG.info("insert overwrite isCancelled before taskSuccess, 
do nothing, queryId: {}",

Review Comment:
   Ditto: change `isCancelled` to `is cancelled` in this log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to