This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f4bff84dc5f [enhance](mtmv)When drop MTMV, no longer wait for task 
cancel to complete (#45995)
f4bff84dc5f is described below

commit f4bff84dc5f5ac54e199f10670bd8e90b898cee2
Author: zhangdong <zhangd...@selectdb.com>
AuthorDate: Thu Dec 26 15:25:12 2024 +0800

    [enhance](mtmv)When drop MTMV, no longer wait for task cancel to complete 
(#45995)
    
    ### What problem does this PR solve?
    
    problem:
    - dropping a db holds the write lock of the catalog and drops all MTMVs
    - dropping an MTMV drops its Job
    - dropping a Job cancels all tasks in that Job
    - cancelling a task that is running insert overwrite waits for a
    long time
    
    fix:
    when dropping a job, do not wait for task cancellation to complete
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
---
 .../src/main/java/org/apache/doris/job/base/AbstractJob.java      | 8 ++++----
 fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java       | 2 +-
 .../java/org/apache/doris/job/executor/DispatchTaskHandler.java   | 2 +-
 .../java/org/apache/doris/job/extensions/insert/InsertJob.java    | 4 ++--
 .../java/org/apache/doris/job/extensions/insert/InsertTask.java   | 2 +-
 .../main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java  | 4 ++--
 .../src/main/java/org/apache/doris/job/task/AbstractTask.java     | 8 ++++----
 fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java      | 4 +++-
 fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java    | 8 ++++++--
 9 files changed, 24 insertions(+), 18 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 906b86494fb..b6f62f5121b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -149,12 +149,12 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
     private Lock createTaskLock = new ReentrantLock();
 
     @Override
-    public void cancelAllTasks() throws JobException {
+    public void cancelAllTasks(boolean needWaitCancelComplete) throws 
JobException {
         if (CollectionUtils.isEmpty(runningTasks)) {
             return;
         }
         for (T task : runningTasks) {
-            task.cancel();
+            task.cancel(needWaitCancelComplete);
             canceledTaskCount.incrementAndGet();
         }
         runningTasks = new CopyOnWriteArrayList<>();
@@ -184,7 +184,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
             throw new JobException("no running task");
         }
         runningTasks.stream().filter(task -> 
task.getTaskId().equals(taskId)).findFirst()
-                .orElseThrow(() -> new JobException("Not found task id: " + 
taskId)).cancel();
+                .orElseThrow(() -> new JobException("Not found task id: " + 
taskId)).cancel(true);
         runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
         canceledTaskCount.incrementAndGet();
         if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
@@ -292,7 +292,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
             this.finishTimeMs = System.currentTimeMillis();
         }
         if (JobStatus.PAUSED.equals(newJobStatus) || 
JobStatus.STOPPED.equals(newJobStatus)) {
-            cancelAllTasks();
+            cancelAllTasks(JobStatus.STOPPED.equals(newJobStatus) ? false : 
true);
         }
         jobStatus = newJobStatus;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index a7e75554c71..69d1e5e55fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -101,7 +101,7 @@ public interface Job<T extends AbstractTask, C> {
      * Cancels all running tasks of this job.
      * @throws JobException If cancelling a running task fails.
      */
-    void cancelAllTasks() throws JobException;
+    void cancelAllTasks(boolean needWaitCancelComplete) throws JobException;
 
     /**
      * register job
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index b8f726c4a0c..56222fd3e1f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -66,7 +66,7 @@ public class DispatchTaskHandler<T extends AbstractJob> 
implements WorkHandler<T
                 JobType jobType = event.getJob().getJobType();
                 for (AbstractTask task : tasks) {
                     if (!disruptorMap.get(jobType).addTask(task)) {
-                        task.cancel();
+                        task.cancel(true);
                         continue;
                     }
                     log.info("dispatch timer job success, job id is {},  task 
id is {}",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index f4a91498fea..f87eb29d9d8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -297,12 +297,12 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
     }
 
     @Override
-    public void cancelAllTasks() throws JobException {
+    public void cancelAllTasks(boolean needWaitCancelComplete) throws 
JobException {
         try {
             if 
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
                 checkAuth("CANCEL LOAD");
             }
-            super.cancelAllTasks();
+            super.cancelAllTasks(needWaitCancelComplete);
             this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user 
cancel");
         } catch (DdlException e) {
             throw new JobException(e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index a577250dc86..883c4265316 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -223,7 +223,7 @@ public class InsertTask extends AbstractTask {
     }
 
     @Override
-    protected void executeCancelLogic() {
+    protected void executeCancelLogic(boolean needWaitCancelComplete) {
         if (isFinished.get() || isCanceled.get()) {
             return;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 31e6c8353e2..aa1bbe629fd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -300,10 +300,10 @@ public class MTMVTask extends AbstractTask {
     }
 
     @Override
-    protected synchronized void executeCancelLogic() {
+    protected synchronized void executeCancelLogic(boolean 
needWaitCancelComplete) {
         LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
         if (executor != null) {
-            executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task 
cancelled"));
+            executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task 
cancelled"), needWaitCancelComplete);
         }
         after();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 8a230c0bd38..b356bc58d32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -129,16 +129,16 @@ public abstract class AbstractTask implements Task {
     /**
      * Cancels the ongoing task, updating its status to {@link 
TaskStatus#CANCELED} and releasing associated resources.
      * This method encapsulates the core cancellation logic, calling the 
abstract method
-     * {@link #executeCancelLogic()} for task-specific actions.
+     * {@link #executeCancelLogic(boolean)} for task-specific actions.
      *
      * @throws JobException If an error occurs during the cancellation 
process, a new JobException is thrown wrapping
      *                      the original exception.
      */
     @Override
-    public void cancel() throws JobException {
+    public void cancel(boolean needWaitCancelComplete) throws JobException {
         try {
             status = TaskStatus.CANCELED;
-            executeCancelLogic();
+            executeCancelLogic(needWaitCancelComplete);
         } catch (Exception e) {
             log.warn("cancel task failed, job id is {}, task id is {}", jobId, 
taskId, e);
             throw new JobException(e);
@@ -153,7 +153,7 @@ public abstract class AbstractTask implements Task {
      *
      * @throws Exception Any exception that might occur during the 
cancellation process in the subclass.
      */
-    protected abstract void executeCancelLogic() throws Exception;
+    protected abstract void executeCancelLogic(boolean needWaitCancelComplete) 
throws Exception;
 
     @Override
     public void before() throws JobException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index ee205c55c31..d184f647075 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -63,8 +63,10 @@ public interface Task {
     /**
      * This method is called to cancel the execution of the task.
      * Implementations should define the necessary steps to cancel the task.
+     *
+     * @param needWaitCancelComplete Do we need to wait for the cancellation 
to be completed.
      */
-    void cancel() throws JobException;
+    void cancel(boolean needWaitCancelComplete) throws JobException;
 
     /**
      * get info for tvf `tasks`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 055a4c31e90..b155b468a7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1586,7 +1586,7 @@ public class StmtExecutor {
     }
 
     // Because this is called by other thread
-    public void cancel(Status cancelReason) {
+    public void cancel(Status cancelReason, boolean needWaitCancelComplete) {
         if (masterOpExecutor != null) {
             try {
                 masterOpExecutor.cancel();
@@ -1610,12 +1610,16 @@ public class StmtExecutor {
         if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof 
AnalyzeDBStmt) {
             Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
         }
-        if (insertOverwriteTableCommand.isPresent()) {
+        if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) 
{
             // Wait for the command to run or cancel completion
             insertOverwriteTableCommand.get().waitNotRunning();
         }
     }
 
+    public void cancel(Status cancelReason) {
+        cancel(cancelReason, true);
+    }
+
     private Optional<InsertOverwriteTableCommand> 
getInsertOverwriteTableCommand() {
         if (parsedStmt instanceof LogicalPlanAdapter) {
             LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) 
parsedStmt;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to