This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new e14538fcf55 branch-3.1: [fix](mtmv)fix mtmv may have repeat tasks
after canceled #48830 (#51972)
e14538fcf55 is described below
commit e14538fcf55e72e28ebb4ed035b49eb05ddc4177
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 20 14:55:08 2025 +0800
branch-3.1: [fix](mtmv)fix mtmv may have repeat tasks after canceled #48830
(#51972)
Cherry-picked from #48830
Co-authored-by: zhangdong <[email protected]>
---
.../doris/job/extensions/insert/InsertTask.java | 10 +++---
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 41 ++++++++++++++++++----
.../org/apache/doris/job/task/AbstractTask.java | 31 +++++++++++-----
.../main/java/org/apache/doris/job/task/Task.java | 7 ++--
.../suites/mtmv_p0/test_task_mtmv.groovy | 3 ++
5 files changed, 70 insertions(+), 22 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index ff18f611c1e..eececebe419 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -212,18 +212,18 @@ public class InsertTask extends AbstractTask {
}
@Override
- public void onFail() throws JobException {
+ public boolean onFail() throws JobException {
if (isCanceled.get()) {
- return;
+ return false;
}
isFinished.set(true);
- super.onFail();
+ return super.onFail();
}
@Override
- public void onSuccess() throws JobException {
+ public boolean onSuccess() throws JobException {
isFinished.set(true);
- super.onSuccess();
+ return super.onSuccess();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 198ba218918..bce0e0ba73a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -362,24 +362,53 @@ public class MTMVTask extends AbstractTask {
}
@Override
- public synchronized void onFail() throws JobException {
+ public synchronized boolean onFail() throws JobException {
LOG.info("mtmv task onFail, taskId: {}", super.getTaskId());
- super.onFail();
+ boolean res = super.onFail();
+ if (!res) {
+ return false;
+ }
after();
+ return true;
}
@Override
- public synchronized void onSuccess() throws JobException {
+ public synchronized boolean onSuccess() throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug("mtmv task onSuccess, taskId: {}", super.getTaskId());
}
- super.onSuccess();
+ boolean res = super.onSuccess();
+ if (!res) {
+ return false;
+ }
after();
+ return true;
}
+ /**
+ * The reason for overriding the parent class is to add synchronized
protection
+ */
@Override
- protected synchronized void executeCancelLogic(boolean
needWaitCancelComplete) {
- LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
+ public synchronized boolean cancel(boolean needWaitCancelComplete) throws
JobException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("mtmv task cancel, taskId: {}", super.getTaskId());
+ }
+ return super.cancel(needWaitCancelComplete);
+ }
+
+ @Override
+ protected void executeCancelLogic(boolean needWaitCancelComplete) {
+ try {
+ // Mtmv is initialized in the before method.
+ // If the task has not yet run, the before method will not be
used, so mtmv will be empty,
+ // which prevents the canceled task from being added to the
history list
+ if (mtmv == null) {
+ mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
+ }
+ } catch (UserException e) {
+ LOG.warn("executeCancelLogic failed:", e);
+ return;
+ }
if (executor != null) {
executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task
cancelled"), needWaitCancelComplete);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index b356bc58d32..a40696a8664 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -63,12 +63,16 @@ public abstract class AbstractTask implements Task {
}
@Override
- public void onFail() throws JobException {
+ public boolean onFail() throws JobException {
+ if (TaskStatus.CANCELED.equals(status)) {
+ return false;
+ }
status = TaskStatus.FAILED;
if (!isCallable()) {
- return;
+ return false;
}
Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this);
+ return true;
}
@Override
@@ -109,21 +113,22 @@ public abstract class AbstractTask implements Task {
protected abstract void closeOrReleaseResources();
@Override
- public void onSuccess() throws JobException {
+ public boolean onSuccess() throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
- return;
+ return false;
}
status = TaskStatus.SUCCESS;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
- return;
+ return false;
}
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
if (null == job) {
log.info("job is null, job id is {}", jobId);
- return;
+ return false;
}
job.onTaskSuccess(this);
+ return true;
}
/**
@@ -135,10 +140,15 @@ public abstract class AbstractTask implements Task {
* the original exception.
*/
@Override
- public void cancel(boolean needWaitCancelComplete) throws JobException {
+ public boolean cancel(boolean needWaitCancelComplete) throws JobException {
+ if (TaskStatus.SUCCESS.equals(status) ||
TaskStatus.FAILED.equals(status) || TaskStatus.CANCELED.equals(
+ status)) {
+ return false;
+ }
try {
status = TaskStatus.CANCELED;
executeCancelLogic(needWaitCancelComplete);
+ return true;
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId,
taskId, e);
throw new JobException(e);
@@ -174,7 +184,12 @@ public abstract class AbstractTask implements Task {
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
} finally {
- closeOrReleaseResources();
+ // The cancel logic will call the closeOrReleased Resources method
by itself.
+ // If it is also called here,
+ // it may result in the inability to obtain relevant information
when canceling the task
+ if (!TaskStatus.CANCELED.equals(status)) {
+ closeOrReleaseResources();
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index d184f647075..487c62b8934 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -44,7 +44,7 @@ public interface Task {
* This method is called when the task fails to execute successfully.
* Implementations can use this method to handle any failure scenarios.
*/
- void onFail() throws JobException;
+ boolean onFail() throws JobException;
/**
* This method is called when the task fails to execute successfully, with
an additional error message.
@@ -58,15 +58,16 @@ public interface Task {
* This method is called when the task executes successfully.
* Implementations can use this method to handle successful execution
scenarios.
*/
- void onSuccess() throws JobException;
+ boolean onSuccess() throws JobException;
/**
* This method is called to cancel the execution of the task.
* Implementations should define the necessary steps to cancel the task.
*
* @param needWaitCancelComplete Do we need to wait for the cancellation
to be completed.
+ * @return if cancel success
*/
- void cancel(boolean needWaitCancelComplete) throws JobException;
+ boolean cancel(boolean needWaitCancelComplete) throws JobException;
/**
* get info for tvf `tasks`
diff --git a/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
index 72356b45a79..41859ddeefe 100644
--- a/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
@@ -57,6 +57,9 @@ suite("test_task_mtmv") {
log.info("cancel error msg: " + e.getMessage())
assertTrue(e.getMessage().contains("no running task"));
}
+ def tasksAfterCancel = sql """ select TaskId from tasks('type'='mv') where
MvName = '${mvName}';"""
+ // should only has one task after cancel
+ assertEquals(1, tasksAfterCancel.size());
sql """drop materialized view if exists ${mvName};"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]