This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 17dec13b96e branch-3.0: [fix](Export) Fix the problem of exporting stuck #44944 (#45093) 17dec13b96e is described below commit 17dec13b96ef11d140c1c824938fbaa6f8ea0c39 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Dec 5 23:30:09 2024 -0800 branch-3.0: [fix](Export) Fix the problem of exporting stuck #44944 (#45093) Cherry-picked from #44944 Co-authored-by: Tiewei Fang <fangtie...@selectdb.com> --- .../main/java/org/apache/doris/load/ExportMgr.java | 30 ++++++++++------------ .../doris/scheduler/disruptor/TaskDisruptor.java | 11 +++++--- .../scheduler/manager/TransientTaskManager.java | 4 +-- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 49ebbfe7dcd..238b40a7d41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -106,26 +106,24 @@ public class ExportMgr { } } unprotectAddJob(job); - // delete existing files - if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) { - if (job.getBrokerDesc() == null) { - throw new AnalysisException("Local file system does not support delete existing files"); - } - String fullPath = job.getExportPath(); - BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), - job.getBrokerDesc()); - } Env.getCurrentEnv().getEditLog().logExportCreate(job); - // ATTN: Must add task after edit log, otherwise the job may finish before adding job. - job.getCopiedTaskExecutors().forEach(executor -> { - Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); - }); - LOG.info("add export job. {}", job); - } finally { writeUnlock(); } - + // delete existing files + if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) { + if (job.getBrokerDesc() == null) { + throw new AnalysisException("Local file system does not support delete existing files"); + } + String fullPath = job.getExportPath(); + BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), + job.getBrokerDesc()); + } + // ATTN: Must add task after edit log, otherwise the job may finish before adding job. + for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) { + Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(job.getCopiedTaskExecutors().get(i)); + } + LOG.info("add export job. {}", job); } public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 345b31d6bc2..8144ca22ea2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -20,6 +20,7 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.common.Config; import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; +import org.apache.doris.scheduler.exception.JobException; import com.lmax.disruptor.EventTranslatorThreeArg; import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; @@ -119,15 +120,17 @@ public class TaskDisruptor implements Closeable { * * @param taskId task id */ - public void tryPublishTask(Long taskId) { + public void tryPublishTask(Long taskId) throws JobException { if (isClosed) { log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId); return; } - try { + // We reserve two slots in the ring buffer + // to prevent it from becoming stuck due to competition between producers and consumers. + if (disruptor.getRingBuffer().hasAvailableCapacity(2)) { disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK); - } catch (Exception e) { - log.warn("tryPublish failed, taskId: {}", taskId, e); + } else { + throw new JobException("There is not enough available capacity in the RingBuffer."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java index 7461399c8eb..de501d3e0c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java @@ -21,7 +21,6 @@ import org.apache.doris.scheduler.disruptor.TaskDisruptor; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; -import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,7 +40,6 @@ public class TransientTaskManager { * disruptor is used to handle task * disruptor will start a thread pool to handle task */ - @Setter private TaskDisruptor disruptor; public TransientTaskManager() { @@ -56,7 +54,7 @@ public class TransientTaskManager { return taskExecutorMap.get(taskId); } - public Long addMemoryTask(TransientTaskExecutor executor) { + public Long addMemoryTask(TransientTaskExecutor executor) throws JobException { Long taskId = executor.getId(); taskExecutorMap.put(taskId, executor); disruptor.tryPublishTask(taskId); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org