morningman commented on code in PR #40558: URL: https://github.com/apache/doris/pull/40558#discussion_r1759653217
########## fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java: ########## @@ -270,6 +279,50 @@ private boolean rollback(long taskId) { return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames()); } + /** + * If the current table id has a running insert overwrite, throw an exception. + * If not, record it in runningTables + * + * @param dbId Run the dbId for insert overwrite + * @param tableId Run the tableId for insert overwrite + */ + public void recordRunningTableOrException(long dbId, long tableId) { + runningLock.writeLock().lock(); + try { + if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { + throw new AnalysisException( + String.format("insert overwrite is running on db: %s, table: %s", dbId, tableId)); Review Comment: ```suggestion String.format("Not allowed running Insert Overwrite on same table: %s.%s", dbId, tableId)); ``` I also suggest using the db/table names, rather than their ids, in the error message. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java: ########## @@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. 
for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); Review Comment: ```suggestion LOG.info("insert overwrite is cancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java: ########## @@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. 
insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", Review Comment: ```suggestion LOG.info("insert overwrite is cancelled before addTempPartitions, queryId: {}", ``` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java: ########## @@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. 
insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", + ctx.getQueryIdentifier()); + // not need deal temp partition + insertOverwriteManager.taskSuccess(taskId); + return; + } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } insertInto(ctx, executor, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before replacePartition, queryId: {}", Review Comment: ditto ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java: ########## @@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. 
- taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", + ctx.getQueryIdentifier()); + // not need deal temp partition + insertOverwriteManager.taskSuccess(taskId); + return; + } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); Review Comment: ditto ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java: ########## @@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + 
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", + ctx.getQueryIdentifier()); + // not need deal temp partition + insertOverwriteManager.taskSuccess(taskId); + return; + } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } insertInto(ctx, executor, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before replacePartition, queryId: {}", + 
ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); - Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before taskSuccess, do nothing, queryId: {}", + ctx.getQueryIdentifier()); + } + insertOverwriteManager.taskSuccess(taskId); } } catch (Exception e) { LOG.warn("insert into overwrite failed with task(or group) id " + taskId); if (isAutoDetectOverwrite()) { - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); + insertOverwriteManager.taskGroupFail(taskId); } else { - Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + insertOverwriteManager.taskFail(taskId); } throw e; } finally { ConnectContext.get().setSkipAuth(false); + insertOverwriteManager + .dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = false; + } + } + + /** + * cancel insert overwrite + */ + public void cancel() { + this.isCancelled = true; + } + + /** + * wait insert overwrite not running + */ + public void waitNotRunning() { + long waitMaxTimeMills = 10 * 1000L; + long waitTime = 0; + while (true) { Review Comment: You can use `Awaitility` here, just as it is used in the regression test cases. ########## regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy: ########## @@ -1275,7 +1275,29 @@ class Suite implements GroovyInterceptable { } logger.info("The state of ${showTasks} is ${status}") Thread.sleep(1000); - } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL')) + } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL')) + if (status != "SUCCESS") { + logger.info("status is not success") + } + Assert.assertEquals("SUCCESS", status) + } + + void 
waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) { + Thread.sleep(2000); + String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC" + String status = "NULL" + List<List<Object>> result + long startTime = System.currentTimeMillis() + long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min Review Comment: Use `Awaitility` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java: ########## @@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. 
insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", + ctx.getQueryIdentifier()); + // not need deal temp partition + insertOverwriteManager.taskSuccess(taskId); + return; + } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } insertInto(ctx, executor, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before replacePartition, queryId: {}", + ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); - Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before taskSuccess, do nothing, queryId: {}", Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org