This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f31e32c5df5 [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095)
f31e32c5df5 is described below

commit f31e32c5df520ee9550bf8e29a04755a24fa84ec
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Feb 19 21:50:26 2024 +0800

    [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095)
---
 .../doris/load/routineload/RoutineLoadManager.java | 56 ++++++++++++++++++----
 1 file changed, 47 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index dde2e31bb9a..ef96b9a42ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -307,12 +307,23 @@ public class RoutineLoadManager implements Writable {
     public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
             throws UserException {
         List<RoutineLoadJob> jobs = Lists.newArrayList();
-        if (pauseRoutineLoadStmt.isAll()) {
-            jobs = 
checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
-        } else {
-            RoutineLoadJob routineLoadJob = 
checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
-                    pauseRoutineLoadStmt.getName());
-            jobs.add(routineLoadJob);
+        // it needs lock when getting routine load job,
+        // otherwise, it may cause the editLog out of order in the following 
scenarios:
+        // thread A: create job and record job meta
+        // thread B: change job state and persist in editlog according to meta
+        // thread A: persist in editlog
+        // which will cause the null pointer exception when replaying editLog
+        readLock();
+        try {
+            if (pauseRoutineLoadStmt.isAll()) {
+                jobs = 
checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
+            } else {
+                RoutineLoadJob routineLoadJob = 
checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
+                        pauseRoutineLoadStmt.getName());
+                jobs.add(routineLoadJob);
+            }
+        } finally {
+            readUnlock();
         }
 
         for (RoutineLoadJob routineLoadJob : jobs) {
@@ -373,8 +384,20 @@ public class RoutineLoadManager implements Writable {
 
     public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
             throws UserException {
-        RoutineLoadJob routineLoadJob = 
checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
-                stopRoutineLoadStmt.getName());
+        RoutineLoadJob routineLoadJob;
+        // it needs lock when getting routine load job,
+        // otherwise, it may cause the editLog out of order in the following 
scenarios:
+        // thread A: create job and record job meta
+        // thread B: change job state and persist in editlog according to meta
+        // thread A: persist in editlog
+        // which will cause the null pointer exception when replaying editLog
+        readLock();
+        try {
+            routineLoadJob = 
checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
+                    stopRoutineLoadStmt.getName());
+        } finally {
+            readUnlock();
+        }
         routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
                 new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
                         "User  " + ConnectContext.get().getQualifiedUser() + " 
stop routine load job"),
@@ -796,6 +819,9 @@ public class RoutineLoadManager implements Writable {
             job.updateState(operation.getJobState(), null, true /* is replay 
*/);
         } catch (UserException e) {
             LOG.error("should not happened", e);
+        } catch (NullPointerException npe) {
+            LOG.error("cannot get job when replaying state change job, which 
is unexpected, job id: "
+                    + operation.getId());
         }
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
                 .add("current_state", operation.getJobState())
@@ -807,7 +833,19 @@ public class RoutineLoadManager implements Writable {
      * Enter of altering a routine load job
      */
     public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws 
UserException {
-        RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), 
stmt.getLabel());
+        RoutineLoadJob job;
+        // it needs lock when getting routine load job,
+        // otherwise, it may cause the editLog out of order in the following 
scenarios:
+        // thread A: create job and record job meta
+        // thread B: change job state and persist in editlog according to meta
+        // thread A: persist in editlog
+        // which will cause the null pointer exception when replaying editLog
+        readLock();
+        try {
+            job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
+        } finally {
+            readUnlock();
+        }
         if (stmt.hasDataSourceProperty()
                 && 
!stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name()))
 {
             throw new DdlException("The specified job type is not: "


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to