This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e9290340c89 [fix](export) remove export task executor in TransientTaskExecutor and fix concurrency issue (#42880)(#43051)(#43109)(#43250) (#43305) e9290340c89 is described below commit e9290340c899ad94c4f8c6a8964de18464321f5c Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Wed Nov 6 13:55:27 2024 +0800 [fix](export) remove export task executor in TransientTaskExecutor and fix concurrency issue (#42880)(#43051)(#43109)(#43250) (#43305) cherry pick from (#42880)(#43051)(#43109)(#43250) --- .../java/org/apache/doris/analysis/ExportStmt.java | 2 +- .../main/java/org/apache/doris/catalog/Env.java | 8 ----- .../main/java/org/apache/doris/load/ExportJob.java | 20 ++++++----- .../main/java/org/apache/doris/load/ExportMgr.java | 30 ++++++++++------ .../trees/plans/commands/ExportCommand.java | 2 +- .../doris/scheduler/disruptor/TaskHandler.java | 2 ++ .../scheduler/manager/TransientTaskManager.java | 15 +++++++- .../scheduler/registry/ExportTaskRegister.java | 40 ---------------------- .../doris/analysis/CancelExportStmtTest.java | 1 - 9 files changed, 48 insertions(+), 72 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index a9ce85b2d3e..ba7aa50ec69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -208,7 +208,7 @@ public class ExportStmt extends StatementBase implements NotFallbackInParser { } private void setJob() throws UserException { - exportJob = new ExportJob(); + exportJob = new ExportJob(Env.getCurrentEnv().getNextId()); Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb()); exportJob.setDbId(db.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 47caa710f5e..dcc32d8276f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -254,7 +254,6 @@ import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher; import org.apache.doris.scheduler.manager.TransientTaskManager; -import org.apache.doris.scheduler.registry.ExportTaskRegister; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; @@ -395,7 +394,6 @@ public class Env { private ExternalMetaIdMgr externalMetaIdMgr; private MetastoreEventsProcessor metastoreEventsProcessor; - private ExportTaskRegister exportTaskRegister; private JobManager<? extends AbstractJob<?, ?>, ?> jobManager; private LabelProcessor labelProcessor; private TransientTaskManager transientTaskManager; @@ -709,7 +707,6 @@ public class Env { this.jobManager = new JobManager<>(); this.labelProcessor = new LabelProcessor(); this.transientTaskManager = new TransientTaskManager(); - this.exportTaskRegister = new ExportTaskRegister(transientTaskManager); this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); @@ -4418,11 +4415,6 @@ public class Env { return this.syncJobManager; } - - public ExportTaskRegister getExportTaskRegister() { - return exportTaskRegister; - } - public JobManager getJobManager() { return jobManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 33418531f2c..e77b0517d95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -98,7 +98,6 @@ 
import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @Data @@ -207,9 +206,7 @@ public class ExportJob implements Writable { // backend_address => snapshot path private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList(); - private List<ExportTaskExecutor> jobExecutorList; - - private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>(); + private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList(); private Integer finishedTaskCount = 0; private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList(); @@ -399,8 +396,8 @@ public class ExportJob implements Writable { return statementBase; } - public List<? extends TransientTaskExecutor> getTaskExecutors() { - return jobExecutorList; + public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() { + return Lists.newArrayList(jobExecutorList); } private void generateExportJobExecutor() { @@ -690,11 +687,11 @@ public class ExportJob implements Writable { } // we need cancel all task - taskIdToExecutor.keySet().forEach(id -> { + jobExecutorList.forEach(executor -> { try { - Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id); + Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId()); } catch (JobException e) { - LOG.warn("cancel export task {} exception: {}", id, e); + LOG.warn("cancel export task {} exception: {}", executor.getId(), e); } }); @@ -705,10 +702,12 @@ public class ExportJob implements Writable { setExportJobState(ExportJobState.CANCELLED); finishTimeMs = System.currentTimeMillis(); failMsg = new ExportFailMsg(type, msg); + jobExecutorList.clear(); if (FeConstants.runningUnitTest) { return; } Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED); + LOG.info("cancel export job {}", id); } private void exportExportJob() { @@ 
-749,7 +748,10 @@ public class ExportJob implements Writable { setExportJobState(ExportJobState.FINISHED); finishTimeMs = System.currentTimeMillis(); outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo); + // Clear the jobExecutorList to release memory. + jobExecutorList.clear(); Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED); + LOG.info("finish export job {}", id); } public void replayExportJobState(ExportJobState newState) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 7dbe953cf9b..49ebbfe7dcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -67,8 +67,8 @@ public class ExportMgr { // dbid -> <label -> job> private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap(); - // lock for export job - // lock is private and must use after db lock + // lock for protecting export jobs. + // need to be added when creating or cancelling export job. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); public ExportMgr() { @@ -95,8 +95,6 @@ public class ExportMgr { } public void addExportJobAndRegisterTask(ExportJob job) throws Exception { - long jobId = Env.getCurrentEnv().getNextId(); - job.setId(jobId); writeLock(); try { if (dbTolabelToExportJobId.containsKey(job.getDbId()) @@ -117,15 +115,17 @@ public class ExportMgr { BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), job.getBrokerDesc()); } - job.getTaskExecutors().forEach(executor -> { - Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); - job.getTaskIdToExecutor().put(taskId, executor); - }); Env.getCurrentEnv().getEditLog().logExportCreate(job); + // ATTN: Must add task after edit log, otherwise the job may finish before adding job. 
+ job.getCopiedTaskExecutors().forEach(executor -> { + Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); + }); + LOG.info("add export job. {}", job); + } finally { writeUnlock(); } - LOG.info("add export job. {}", job); + } public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { @@ -142,6 +142,11 @@ public class ExportMgr { // check auth checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, stmt.getDbName(), matchExportJobs); + // Must add lock to protect export job. + // Because job may be cancelled when generating task executors, + // the cancel process may clear the task executor list at same time, + // which will cause ConcurrentModificationException + writeLock(); try { for (ExportJob exportJob : matchExportJobs) { // exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel"); @@ -150,6 +155,8 @@ public class ExportMgr { } } catch (JobException e) { throw new AnalysisException(e.getMessage()); + } finally { + writeUnlock(); } } @@ -464,8 +471,9 @@ public class ExportMgr { } public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) { - readLock(); + writeLock(); try { + LOG.info("replay update export job: {}, {}", stateTransfer.getJobId(), stateTransfer.getState()); ExportJob job = exportIdToJob.get(stateTransfer.getJobId()); job.replayExportJobState(stateTransfer.getState()); job.setStartTimeMs(stateTransfer.getStartTimeMs()); @@ -473,7 +481,7 @@ public class ExportMgr { job.setFailMsg(stateTransfer.getFailMsg()); job.setOutfileInfo(stateTransfer.getOutFileInfo()); } finally { - readUnlock(); + writeUnlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index dbf6cf7067e..38083e406b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -242,7 +242,7 @@ public class ExportCommand extends Command implements ForwardWithSync { private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName) throws UserException { - ExportJob exportJob = new ExportJob(); + ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId()); // set export job and check catalog/db/table CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl()); DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index de889c1b2e4..193f8ece9f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -68,6 +68,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> { taskExecutor.execute(); } catch (JobException e) { log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage()); + } finally { + transientTaskManager.removeMemoryTask(taskId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java index 51edd4af318..7461399c8eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java @@ -22,10 +22,13 @@ import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; import lombok.Setter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import 
java.util.concurrent.ConcurrentHashMap; public class TransientTaskManager { + private static final Logger LOG = LogManager.getLogger(TransientTaskManager.class); /** * key: taskId * value: memory task executor of this task @@ -57,10 +60,20 @@ public class TransientTaskManager { Long taskId = executor.getId(); taskExecutorMap.put(taskId, executor); disruptor.tryPublishTask(taskId); + LOG.info("add memory task, taskId: {}", taskId); return taskId; } public void cancelMemoryTask(Long taskId) throws JobException { - taskExecutorMap.get(taskId).cancel(); + try { + taskExecutorMap.get(taskId).cancel(); + } finally { + removeMemoryTask(taskId); + } + } + + public void removeMemoryTask(Long taskId) { + taskExecutorMap.remove(taskId); + LOG.info("remove memory task, taskId: {}", taskId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java deleted file mode 100644 index 0241f57fea0..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.scheduler.registry; - -import org.apache.doris.scheduler.exception.JobException; -import org.apache.doris.scheduler.executor.TransientTaskExecutor; -import org.apache.doris.scheduler.manager.TransientTaskManager; - -public class ExportTaskRegister implements TransientTaskRegister { - private final TransientTaskManager transientTaskManager; - - public ExportTaskRegister(TransientTaskManager transientTaskManager) { - this.transientTaskManager = transientTaskManager; - } - - @Override - public Long registerTask(TransientTaskExecutor executor) { - return transientTaskManager.addMemoryTask(executor); - } - - @Override - public void cancelTask(Long taskId) throws JobException { - transientTaskManager.cancelMemoryTask(taskId); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java index 2d188230d8b..4ff15653fa0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -234,7 +234,6 @@ public class CancelExportStmtTest extends TestWithFeService { exportMgr.unprotectAddJob(job3); exportMgr.unprotectAddJob(job4); - // cancel export job where state = "PENDING" Assert.assertTrue(job1.getState() == ExportJobState.PENDING); SlotRef stateSlotRef = new SlotRef(null, "state"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org