This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de2ce48b992 [fix](Export) Fix the problem of exporting stuck (#44944)
de2ce48b992 is described below

commit de2ce48b992d12237f2d3aeb6b0461f6770e16c3
Author: Tiewei Fang <fangtie...@selectdb.com>
AuthorDate: Fri Dec 6 13:02:58 2024 +0800

    [fix](Export) Fix the problem of exporting stuck (#44944)
    
    ### What problem does this PR solve?
    Problem Summary:
    
    The `disruptor` blocks (waits) when the ring buffer does not have enough
    capacity. Meanwhile, `addExportJobAndRegisterTask` keeps holding the
    `ExportMgr` write lock while it waits to publish its tasks. This prevents
    other methods from obtaining the lock.
---
 .../main/java/org/apache/doris/load/ExportMgr.java | 30 ++++++++++------------
 .../doris/scheduler/disruptor/TaskDisruptor.java   | 11 +++++---
 .../scheduler/manager/TransientTaskManager.java    |  4 +--
 3 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index eddd5fb27ee..94ae436ee6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -108,26 +108,24 @@ public class ExportMgr {
                 }
             }
             unprotectAddJob(job);
-            // delete existing files
-            if (Config.enable_delete_existing_files && 
Boolean.parseBoolean(job.getDeleteExistingFiles())) {
-                if (job.getBrokerDesc() == null) {
-                    throw new AnalysisException("Local file system does not 
support delete existing files");
-                }
-                String fullPath = job.getExportPath();
-                BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, 
fullPath.lastIndexOf('/') + 1),
-                        job.getBrokerDesc());
-            }
             Env.getCurrentEnv().getEditLog().logExportCreate(job);
-            // ATTN: Must add task after edit log, otherwise the job may 
finish before adding job.
-            job.getCopiedTaskExecutors().forEach(executor -> {
-                
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
-            });
-            LOG.info("add export job. {}", job);
-
         } finally {
             writeUnlock();
         }
-
+        // delete existing files
+        if (Config.enable_delete_existing_files && 
Boolean.parseBoolean(job.getDeleteExistingFiles())) {
+            if (job.getBrokerDesc() == null) {
+                throw new AnalysisException("Local file system does not 
support delete existing files");
+            }
+            String fullPath = job.getExportPath();
+            BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, 
fullPath.lastIndexOf('/') + 1),
+                    job.getBrokerDesc());
+        }
+        // ATTN: Must add task after edit log, otherwise the job may finish 
before adding job.
+        for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
+            
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(job.getCopiedTaskExecutors().get(i));
+        }
+        LOG.info("add export job. {}", job);
     }
 
     public void cancelExportJob(CancelExportStmt stmt) throws DdlException, 
AnalysisException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 345b31d6bc2..8144ca22ea2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -20,6 +20,7 @@ package org.apache.doris.scheduler.disruptor;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.scheduler.constants.TaskType;
+import org.apache.doris.scheduler.exception.JobException;
 
 import com.lmax.disruptor.EventTranslatorThreeArg;
 import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
@@ -119,15 +120,17 @@ public class TaskDisruptor implements Closeable {
      *
      * @param taskId task id
      */
-    public void tryPublishTask(Long taskId) {
+    public void tryPublishTask(Long taskId) throws JobException {
         if (isClosed) {
             log.info("tryPublish failed, disruptor is closed, taskId: {}", 
taskId);
             return;
         }
-        try {
+        // We reserve two slots in the ring buffer
+        // to prevent it from becoming stuck due to competition between 
producers and consumers.
+        if (disruptor.getRingBuffer().hasAvailableCapacity(2)) {
             disruptor.publishEvent(TRANSLATOR, taskId, 0L, 
TaskType.TRANSIENT_TASK);
-        } catch (Exception e) {
-            log.warn("tryPublish failed, taskId: {}", taskId, e);
+        } else {
+            throw new JobException("There is not enough available capacity in 
the RingBuffer.");
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 7461399c8eb..de501d3e0c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -21,7 +21,6 @@ import org.apache.doris.scheduler.disruptor.TaskDisruptor;
 import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
 
-import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -41,7 +40,6 @@ public class TransientTaskManager {
      * disruptor is used to handle task
      * disruptor will start a thread pool to handle task
      */
-    @Setter
     private TaskDisruptor disruptor;
 
     public TransientTaskManager() {
@@ -56,7 +54,7 @@ public class TransientTaskManager {
         return taskExecutorMap.get(taskId);
     }
 
-    public Long addMemoryTask(TransientTaskExecutor executor) {
+    public Long addMemoryTask(TransientTaskExecutor executor) throws 
JobException {
         Long taskId = executor.getId();
         taskExecutorMap.put(taskId, executor);
         disruptor.tryPublishTask(taskId);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to