This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f31e32c5df5 [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095)
f31e32c5df5 is described below

commit f31e32c5df520ee9550bf8e29a04755a24fa84ec
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Feb 19 21:50:26 2024 +0800

    [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095)
---
 .../doris/load/routineload/RoutineLoadManager.java | 56 ++++++++++++++++++----
 1 file changed, 47 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index dde2e31bb9a..ef96b9a42ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -307,12 +307,23 @@ public class RoutineLoadManager implements Writable {
 
     public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throws UserException {
         List<RoutineLoadJob> jobs = Lists.newArrayList();
-        if (pauseRoutineLoadStmt.isAll()) {
-            jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
-        } else {
-            RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
-                    pauseRoutineLoadStmt.getName());
-            jobs.add(routineLoadJob);
+        // it needs lock when getting routine load job,
+        // otherwise, it may cause the editLog out of order in the following scenarios:
+        // thread A: create job and record job meta
+        // thread B: change job state and persist in editlog according to meta
+        // thread A: persist in editlog
+        // which will cause the null pointer exception when replaying editLog
+        readLock();
+        try {
+            if (pauseRoutineLoadStmt.isAll()) {
+                jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
+            } else {
+                RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
+                        pauseRoutineLoadStmt.getName());
+                jobs.add(routineLoadJob);
+            }
+        } finally {
+            readUnlock();
         }
 
         for (RoutineLoadJob routineLoadJob : jobs) {
@@ -373,8 +384,20 @@ public class RoutineLoadManager implements Writable {
 
     public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
             throws UserException {
-        RoutineLoadJob routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
-                stopRoutineLoadStmt.getName());
+        RoutineLoadJob routineLoadJob;
+        // it needs lock when getting routine load job,
+        // otherwise, it may cause the editLog out of order in the following scenarios:
+        // thread A: create job and record job meta
+        // thread B: change job state and persist in editlog according to meta
+        // thread A: persist in editlog
+        // which will cause the null pointer exception when replaying editLog
+        readLock();
+        try {
+            routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
+                    stopRoutineLoadStmt.getName());
+        } finally {
+            readUnlock();
+        }
         routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
                 new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
                         "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
@@ -796,6 +819,9 @@ public class RoutineLoadManager implements Writable {
             job.updateState(operation.getJobState(), null, true /* is replay */);
         } catch (UserException e) {
             LOG.error("should not happened", e);
+        } catch (NullPointerException npe) {
+            LOG.error("cannot get job when replaying state change job, which is unexpected, job id: "
+                    + operation.getId());
         }
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
                 .add("current_state", operation.getJobState())
@@ -807,7 +833,19 @@ public class RoutineLoadManager implements Writable {
      * Enter of altering a routine load job
      */
     public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException {
-        RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
+        RoutineLoadJob job;
+        // it needs lock when getting routine load job,
+        // otherwise, it may cause the editLog out of order in the following scenarios:
+        // thread A: create job and record job meta
+        // thread B: change job state and persist in editlog according to meta
+        // thread A: persist in editlog
+        // which will cause the null pointer exception when replaying editLog
+        readLock();
+        try {
+            job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
+        } finally {
+            readUnlock();
+        }
         if (stmt.hasDataSourceProperty()
                 && !stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name())) {
             throw new DdlException("The specified job type is not: "


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org