This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 5e443c7619e [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095) (#31168) 5e443c7619e is described below commit 5e443c7619ea664210b7ca962adf4513ce650223 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Tue Feb 20 23:40:40 2024 +0800 [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095) (#31168) --- .../doris/load/routineload/RoutineLoadManager.java | 56 ++++++++++++++++++---- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index dbf3d6a09a5..4c66c510126 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -303,12 +303,23 @@ public class RoutineLoadManager implements Writable { public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throws UserException { List<RoutineLoadJob> jobs = Lists.newArrayList(); - if (pauseRoutineLoadStmt.isAll()) { - jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName()); - } else { - RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(), - pauseRoutineLoadStmt.getName()); - jobs.add(routineLoadJob); + // it needs lock when getting routine load job, + // otherwise, it may cause the editLog out of order in the following scenarios: + // thread A: create job and record job meta + // thread B: change job state and persist in editlog according to meta + // thread A: persist in editlog + // which will cause the null pointer exception when replaying editLog + readLock(); + try { + if (pauseRoutineLoadStmt.isAll()) { + jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName()); + } else { + RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(), + pauseRoutineLoadStmt.getName()); + jobs.add(routineLoadJob); + } + } finally { + readUnlock(); } for (RoutineLoadJob routineLoadJob : jobs) { @@ -369,8 +380,20 @@ public class RoutineLoadManager implements Writable { public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws UserException { - RoutineLoadJob routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(), - stopRoutineLoadStmt.getName()); + RoutineLoadJob routineLoadJob; + // it needs lock when getting routine load job, + // otherwise, it may cause the editLog out of order in the following scenarios: + // thread A: create job and record job meta + // thread B: change job state and persist in editlog according to meta + // thread A: persist in editlog + // which will cause the null pointer exception when replaying editLog + readLock(); + try { + routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(), + stopRoutineLoadStmt.getName()); + } finally { + readUnlock(); + } routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR, "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"), @@ -792,6 +815,9 @@ public class RoutineLoadManager implements Writable { job.updateState(operation.getJobState(), null, true /* is replay */); } catch (UserException e) { LOG.error("should not happened", e); + } catch (NullPointerException npe) { + LOG.error("cannot get job when replaying state change job, which is unexpected, job id: " + + operation.getId()); } LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId()) .add("current_state", operation.getJobState()) @@ -803,7 +829,19 @@ public class RoutineLoadManager implements Writable { * Enter of altering a routine load job */ public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException { - RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel()); + RoutineLoadJob job; + // it needs lock when getting routine load job, + // otherwise, it may cause the editLog out of order in the following scenarios: + // thread A: create job and record job meta + // thread B: change job state and persist in editlog according to meta + // thread A: persist in editlog + // which will cause the null pointer exception when replaying editLog + readLock(); + try { + job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel()); + } finally { + readUnlock(); + } if (stmt.hasDataSourceProperty() && !stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name())) { throw new DdlException("The specified job type is not: " --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org