This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 867ef1570af branch-2.1: [fix](Export) Fix the issue where the show export status stays stuck on EXPORTING. #47974 (#48060) 867ef1570af is described below commit 867ef1570af3dcbe5ab069dcd2c617661030ed39 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Feb 19 18:10:32 2025 +0800 branch-2.1: [fix](Export) Fix the issue where the show export status stays stuck on EXPORTING. #47974 (#48060) Cherry-picked from #47974 Co-authored-by: Tiewei Fang <fangtie...@selectdb.com> --- .../src/main/java/org/apache/doris/load/ExportJob.java | 5 ++++- .../org/apache/doris/scheduler/disruptor/TaskHandler.java | 12 ++++++------ .../apache/doris/scheduler/manager/TransientTaskManager.java | 7 +++---- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 852c3ff94fe..0c4356336b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -715,7 +715,10 @@ public class ExportJob implements Writable { LOG.info("cancel export job {}", id); } - private void exportExportJob() { + private void exportExportJob() throws JobException { + if (getState() == ExportJobState.CANCELLED || getState() == ExportJobState.FINISHED) { + throw new JobException("export job has been {}, can not be update to `EXPORTING` state", getState()); + } // The first exportTaskExecutor will set state to EXPORTING, // other exportTaskExecutors do not need to set up state. 
if (getState() == ExportJobState.EXPORTING) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 193f8ece9f7..3be6102a714 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -23,7 +23,8 @@ import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.WorkHandler; -import lombok.extern.log4j.Log4j2; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * This class represents a work handler for processing event tasks consumed by a Disruptor. @@ -32,9 +33,8 @@ import lombok.extern.log4j.Log4j2; * If the event job execution fails, the work handler logs an error message and pauses the event job. * The work handler also handles system events by scheduling batch scheduler tasks. */ -@Log4j2 public class TaskHandler implements WorkHandler<TaskEvent> { - + private static final Logger LOG = LogManager.getLogger(TaskHandler.class); /** * Processes an event task by retrieving the associated event job and executing it if it is running. 
@@ -50,7 +50,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> { onTransientTaskHandle(event); break; default: - log.warn("unknown task type: {}", event.getTaskType()); + LOG.warn("unknown task type: {}", event.getTaskType()); break; } } @@ -60,14 +60,14 @@ public class TaskHandler implements WorkHandler<TaskEvent> { TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager(); TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId); if (taskExecutor == null) { - log.info("Memory task executor is null, task id: {}", taskId); + LOG.info("Memory task executor is null, task id: {}", taskId); return; } try { taskExecutor.execute(); } catch (JobException e) { - log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage()); + LOG.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage()); } finally { transientTaskManager.removeMemoryTask(taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java index de501d3e0c2..5c62caede9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java @@ -63,10 +63,9 @@ public class TransientTaskManager { } public void cancelMemoryTask(Long taskId) throws JobException { - try { - taskExecutorMap.get(taskId).cancel(); - } finally { - removeMemoryTask(taskId); + TransientTaskExecutor transientTaskExecutor = taskExecutorMap.get(taskId); + if (transientTaskExecutor != null) { + transientTaskExecutor.cancel(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org