This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e9290340c89 [fix](export) remove export task executor in TransientTaskExecutor and fix concurrency issue (#42880)(#43051)(#43109)(#43250) (#43305)
e9290340c89 is described below

commit e9290340c899ad94c4f8c6a8964de18464321f5c
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Wed Nov 6 13:55:27 2024 +0800

    [fix](export) remove export task executor in TransientTaskExecutor and fix concurrency issue (#42880)(#43051)(#43109)(#43250) (#43305)
    
    cherry pick from (#42880)(#43051)(#43109)(#43250)
---
 .../java/org/apache/doris/analysis/ExportStmt.java |  2 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  8 -----
 .../main/java/org/apache/doris/load/ExportJob.java | 20 ++++++-----
 .../main/java/org/apache/doris/load/ExportMgr.java | 30 ++++++++++------
 .../trees/plans/commands/ExportCommand.java        |  2 +-
 .../doris/scheduler/disruptor/TaskHandler.java     |  2 ++
 .../scheduler/manager/TransientTaskManager.java    | 15 +++++++-
 .../scheduler/registry/ExportTaskRegister.java     | 40 ----------------------
 .../doris/analysis/CancelExportStmtTest.java       |  1 -
 9 files changed, 48 insertions(+), 72 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index a9ce85b2d3e..ba7aa50ec69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -208,7 +208,7 @@ public class ExportStmt extends StatementBase implements 
NotFallbackInParser {
     }
 
     private void setJob() throws UserException {
-        exportJob = new ExportJob();
+        exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
 
         Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
         exportJob.setDbId(db.getId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 47caa710f5e..dcc32d8276f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -254,7 +254,6 @@ import 
org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
 import 
org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
-import org.apache.doris.scheduler.registry.ExportTaskRegister;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.AnalysisManager;
@@ -395,7 +394,6 @@ public class Env {
     private ExternalMetaIdMgr externalMetaIdMgr;
     private MetastoreEventsProcessor metastoreEventsProcessor;
 
-    private ExportTaskRegister exportTaskRegister;
     private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
     private LabelProcessor labelProcessor;
     private TransientTaskManager transientTaskManager;
@@ -709,7 +707,6 @@ public class Env {
         this.jobManager = new JobManager<>();
         this.labelProcessor = new LabelProcessor();
         this.transientTaskManager = new TransientTaskManager();
-        this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
 
         this.replayedJournalId = new AtomicLong(0L);
         this.stmtIdCounter = new AtomicLong(0L);
@@ -4418,11 +4415,6 @@ public class Env {
         return this.syncJobManager;
     }
 
-
-    public ExportTaskRegister getExportTaskRegister() {
-        return exportTaskRegister;
-    }
-
     public JobManager getJobManager() {
         return jobManager;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 33418531f2c..e77b0517d95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -98,7 +98,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 @Data
@@ -207,9 +206,7 @@ public class ExportJob implements Writable {
     // backend_address => snapshot path
     private List<Pair<TNetworkAddress, String>> snapshotPaths = 
Lists.newArrayList();
 
-    private List<ExportTaskExecutor> jobExecutorList;
-
-    private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor = 
new ConcurrentHashMap<>();
+    private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList();
 
     private Integer finishedTaskCount = 0;
     private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
@@ -399,8 +396,8 @@ public class ExportJob implements Writable {
         return statementBase;
     }
 
-    public List<? extends TransientTaskExecutor> getTaskExecutors() {
-        return jobExecutorList;
+    public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() {
+        return Lists.newArrayList(jobExecutorList);
     }
 
     private void generateExportJobExecutor() {
@@ -690,11 +687,11 @@ public class ExportJob implements Writable {
         }
 
         // we need cancel all task
-        taskIdToExecutor.keySet().forEach(id -> {
+        jobExecutorList.forEach(executor -> {
             try {
-                
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
+                
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId());
             } catch (JobException e) {
-                LOG.warn("cancel export task {} exception: {}", id, e);
+                LOG.warn("cancel export task {} exception: {}", 
executor.getId(), e);
             }
         });
 
@@ -705,10 +702,12 @@ public class ExportJob implements Writable {
         setExportJobState(ExportJobState.CANCELLED);
         finishTimeMs = System.currentTimeMillis();
         failMsg = new ExportFailMsg(type, msg);
+        jobExecutorList.clear();
         if (FeConstants.runningUnitTest) {
             return;
         }
         Env.getCurrentEnv().getEditLog().logExportUpdateState(id, 
ExportJobState.CANCELLED);
+        LOG.info("cancel export job {}", id);
     }
 
     private void exportExportJob() {
@@ -749,7 +748,10 @@ public class ExportJob implements Writable {
         setExportJobState(ExportJobState.FINISHED);
         finishTimeMs = System.currentTimeMillis();
         outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
+        // Clear the jobExecutorList to release memory.
+        jobExecutorList.clear();
         Env.getCurrentEnv().getEditLog().logExportUpdateState(id, 
ExportJobState.FINISHED);
+        LOG.info("finish export job {}", id);
     }
 
     public void replayExportJobState(ExportJobState newState) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 7dbe953cf9b..49ebbfe7dcd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -67,8 +67,8 @@ public class ExportMgr {
     // dbid -> <label -> job>
     private Map<Long, Map<String, Long>> dbTolabelToExportJobId = 
Maps.newHashMap();
 
-    // lock for export job
-    // lock is private and must use after db lock
+    // lock for protecting export jobs.
+    // need to be added when creating or cancelling export job.
     private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
 
     public ExportMgr() {
@@ -95,8 +95,6 @@ public class ExportMgr {
     }
 
     public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
-        long jobId = Env.getCurrentEnv().getNextId();
-        job.setId(jobId);
         writeLock();
         try {
             if (dbTolabelToExportJobId.containsKey(job.getDbId())
@@ -117,15 +115,17 @@ public class ExportMgr {
                 BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, 
fullPath.lastIndexOf('/') + 1),
                         job.getBrokerDesc());
             }
-            job.getTaskExecutors().forEach(executor -> {
-                Long taskId = 
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
-                job.getTaskIdToExecutor().put(taskId, executor);
-            });
             Env.getCurrentEnv().getEditLog().logExportCreate(job);
+            // ATTN: Must add task after edit log, otherwise the job may 
finish before adding job.
+            job.getCopiedTaskExecutors().forEach(executor -> {
+                
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
+            });
+            LOG.info("add export job. {}", job);
+
         } finally {
             writeUnlock();
         }
-        LOG.info("add export job. {}", job);
+
     }
 
     public void cancelExportJob(CancelExportStmt stmt) throws DdlException, 
AnalysisException {
@@ -142,6 +142,11 @@ public class ExportMgr {
 
         // check auth
         checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, 
stmt.getDbName(), matchExportJobs);
+        // Must add lock to protect export job.
+        // Because job may be cancelled when generating task executors,
+        // the cancel process may clear the task executor list at same time,
+        // which will cause ConcurrentModificationException
+        writeLock();
         try {
             for (ExportJob exportJob : matchExportJobs) {
                 // exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, 
"user cancel");
@@ -150,6 +155,8 @@ public class ExportMgr {
             }
         } catch (JobException e) {
             throw new AnalysisException(e.getMessage());
+        } finally {
+            writeUnlock();
         }
     }
 
@@ -464,8 +471,9 @@ public class ExportMgr {
     }
 
     public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
-        readLock();
+        writeLock();
         try {
+            LOG.info("replay update export job: {}, {}", 
stateTransfer.getJobId(), stateTransfer.getState());
             ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
             job.replayExportJobState(stateTransfer.getState());
             job.setStartTimeMs(stateTransfer.getStartTimeMs());
@@ -473,7 +481,7 @@ public class ExportMgr {
             job.setFailMsg(stateTransfer.getFailMsg());
             job.setOutfileInfo(stateTransfer.getOutFileInfo());
         } finally {
-            readUnlock();
+            writeUnlock();
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index dbf6cf7067e..38083e406b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -242,7 +242,7 @@ public class ExportCommand extends Command implements 
ForwardWithSync {
 
     private ExportJob generateExportJob(ConnectContext ctx, Map<String, 
String> fileProperties, TableName tblName)
             throws UserException {
-        ExportJob exportJob = new ExportJob();
+        ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
         // set export job and check catalog/db/table
         CatalogIf catalog = 
ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
         DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index de889c1b2e4..193f8ece9f7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -68,6 +68,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
             taskExecutor.execute();
         } catch (JobException e) {
             log.warn("Memory task execute failed, taskId: {}, msg : {}", 
taskId, e.getMessage());
+        } finally {
+            transientTaskManager.removeMemoryTask(taskId);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 51edd4af318..7461399c8eb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -22,10 +22,13 @@ import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
 
 import lombok.Setter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.ConcurrentHashMap;
 
 public class TransientTaskManager {
+    private static final Logger LOG = 
LogManager.getLogger(TransientTaskManager.class);
     /**
      * key: taskId
      * value: memory task executor of this task
@@ -57,10 +60,20 @@ public class TransientTaskManager {
         Long taskId = executor.getId();
         taskExecutorMap.put(taskId, executor);
         disruptor.tryPublishTask(taskId);
+        LOG.info("add memory task, taskId: {}", taskId);
         return taskId;
     }
 
     public void cancelMemoryTask(Long taskId) throws JobException {
-        taskExecutorMap.get(taskId).cancel();
+        try {
+            taskExecutorMap.get(taskId).cancel();
+        } finally {
+            removeMemoryTask(taskId);
+        }
+    }
+
+    public void removeMemoryTask(Long taskId) {
+        taskExecutorMap.remove(taskId);
+        LOG.info("remove memory task, taskId: {}", taskId);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
deleted file mode 100644
index 0241f57fea0..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.scheduler.registry;
-
-import org.apache.doris.scheduler.exception.JobException;
-import org.apache.doris.scheduler.executor.TransientTaskExecutor;
-import org.apache.doris.scheduler.manager.TransientTaskManager;
-
-public class ExportTaskRegister implements TransientTaskRegister {
-    private final TransientTaskManager transientTaskManager;
-
-    public ExportTaskRegister(TransientTaskManager transientTaskManager) {
-        this.transientTaskManager = transientTaskManager;
-    }
-
-    @Override
-    public Long registerTask(TransientTaskExecutor executor) {
-        return transientTaskManager.addMemoryTask(executor);
-    }
-
-    @Override
-    public void cancelTask(Long taskId) throws JobException {
-        transientTaskManager.cancelMemoryTask(taskId);
-    }
-}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
index 2d188230d8b..4ff15653fa0 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -234,7 +234,6 @@ public class CancelExportStmtTest extends TestWithFeService 
{
         exportMgr.unprotectAddJob(job3);
         exportMgr.unprotectAddJob(job4);
 
-
         // cancel export job where state = "PENDING"
         Assert.assertTrue(job1.getState() == ExportJobState.PENDING);
         SlotRef stateSlotRef = new SlotRef(null, "state");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to