This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8af666b1925 [branch-2.1](routine-load) enhance auto resume to keep
routine load stable (#32689)
8af666b1925 is described below
commit 8af666b192560a6161f93898c883b0af65a3bd55
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Mar 22 18:07:12 2024 +0800
[branch-2.1](routine-load) enhance auto resume to keep routine load stable
(#32689)
* enhance auto resume to keep routine load stable
* do not auto resume if the job definitely cannot be resumed (#32419)
---
.../src/main/java/org/apache/doris/common/InternalErrorCode.java | 3 ++-
.../main/java/org/apache/doris/load/routineload/RoutineLoadJob.java | 6 +++---
.../main/java/org/apache/doris/load/routineload/ScheduleRule.java | 5 ++++-
.../src/main/java/org/apache/doris/service/FrontendServiceImpl.java | 2 +-
4 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index 2bbd5c58efa..b871fd198cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -34,7 +34,8 @@ public enum InternalErrorCode {
MANUAL_STOP_ERR(101),
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
- TASKS_ABORT_ERR(104);
+ TASKS_ABORT_ERR(104),
+ CANNOT_RESUME_ERR(105);
private long errCode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0f8341150fe..bb271a08d6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1184,7 +1184,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
+ " with reason: " +
txnStatusChangeReasonString
+ " please check the jsonpaths";
updateState(JobState.PAUSED,
- new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
false /* not replay */);
return;
case OFFSET_OUT_OF_RANGE:
@@ -1197,14 +1197,14 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
+ " using the Alter ROUTINE LOAD
command to modify it,"
+ " and resume the job";
updateState(JobState.PAUSED,
- new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
false /* not replay */);
return;
case PAUSE:
msg = "be " + taskBeId + " abort task "
+ "with reason: " +
txnStatusChangeReasonString;
updateState(JobState.PAUSED,
- new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
false /* not replay */);
return;
default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index e2eaf4d825d..2b4ef7b297a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -63,7 +63,10 @@ public class ScheduleRule {
jobRoutine.id, jobRoutine.firstResumeTimestamp,
jobRoutine.autoResumeCount,
jobRoutine.pauseReason == null ? "null" :
jobRoutine.pauseReason.getCode().name());
}
- if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode()
== InternalErrorCode.REPLICA_FEW_ERR) {
+ if (jobRoutine.pauseReason != null
+ && jobRoutine.pauseReason.getCode() !=
InternalErrorCode.MANUAL_PAUSE_ERR
+ && jobRoutine.pauseReason.getCode() !=
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
+ && jobRoutine.pauseReason.getCode() !=
InternalErrorCode.CANNOT_RESUME_ERR) {
int dead = deadBeCount();
if (dead > Config.max_tolerable_backend_down_num) {
if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a332f6f5031..8931d7b0859 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1979,7 +1979,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
try {
RoutineLoadJob routineLoadJob =
Env.getCurrentEnv().getRoutineLoadManager()
.getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
- routineLoadJob.updateState(JobState.PAUSED, new
ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
+ routineLoadJob.updateState(JobState.PAUSED, new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR,
"failed to get stream load plan, " +
exception.getMessage()), false);
} catch (UserException e) {
LOG.warn("catch update routine load job error.", e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]