This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5e35e7468261e7253b288d4cd6fbff2878c79018
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Sun Jan 22 23:11:32 2023 +0800

    [Enhancement](export) cancel all running coordinators when execute 
cancel-export statement. (#15801)
---
 .../src/main/java/org/apache/doris/load/ExportJob.java  | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 7f681548df..7c68e24689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -66,6 +66,7 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.rewrite.ExprRewriter;
@@ -655,7 +656,21 @@ public class ExportJob implements Writable {
             failMsg = new ExportFailMsg(type, msg);
         }
         if (updateState(ExportJob.JobState.CANCELLED, false)) {
-            releaseSnapshotPaths();
+            // cancel all running coordinators, so that the scheduler's worker 
thread will be released
+            for (Coordinator coordinator : coordList) {
+                Coordinator registeredCoordinator = 
QeProcessorImpl.INSTANCE.getCoordinator(coordinator.getQueryId());
+                if (registeredCoordinator != null) {
+                    registeredCoordinator.cancel();
+                }
+            }
+
+            // release snapshot
+            Status releaseSnapshotStatus = releaseSnapshotPaths();
+            if (!releaseSnapshotStatus.ok()) {
+                // snapshot will be removed by GC thread on BE, finally.
+                LOG.warn("failed to release snapshot for export job: {}. err: 
{}", id,
+                        releaseSnapshotStatus.getErrorMsg());
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to