CalvinKirs commented on code in PR #50942:
URL: https://github.com/apache/doris/pull/50942#discussion_r2115232956


##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -271,32 +294,50 @@ public void triggerJob(long jobId, C context) throws JobException {
     }
 
     public void replayCreateJob(T job) throws JobException {
-        if (jobMap.containsKey(job.getJobId())) {
+        // mtmv has its own editLog to replay jobs, here it is to ignore the logs already generated by older versions.
+        if (!job.needPersist()) {
             return;
         }
-        jobMap.putIfAbsent(job.getJobId(), job);
-        job.onReplayCreate();
+        createJobInternal(job, true);
     }
 
     /**
      * Replay update load job.
      **/
     public void replayUpdateJob(T job) {
-        jobMap.put(job.getJobId(), job);
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
+        Long jobId = job.getJobId();
+        // In previous versions, the job ID in MTMV was not fixed (a new ID would be generated each time the editLog
+        // was replayed), but the name was constant and unique. However, since job updates use jobId as the key,
+        // it is possible that this jobId no longer exists. Therefore, we now look up the ID based on the name.
+        if (!jobMap.containsKey(jobId) && job instanceof MTMVJob) {
+            List<T> jobs = queryJobs(JobType.MV, job.getJobName());
+            if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
+                LOG.warn("jobs by name: {} not normal,should have one job,but job num is: {}", job.getJobName(),
+                        jobs.size());
+                return;
+            }
+            jobId = jobs.get(0).getJobId();
+            job.setJobId(jobId);
+        }
+
+        if (!jobMap.containsKey(jobId)) {
+            LOG.warn("replayUpdateJob not normal, job: {}, jobId: {}, jobMap: {}", job, jobId, jobMap);
+            return;
+        }
+        jobMap.put(jobId, job);
+        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
                 .add("msg", "replay update scheduler job").build());
     }

Review Comment:
   Could persistence be a status flag that each job type sets internally, so that the job management layer can rely on this flag to make decisions, instead of special-casing `MTMVJob` here with such an intrusive approach?
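   A rough sketch of what this suggestion might look like, under stated assumptions: `SketchJob`, `InsertJobSketch`, `MvJobSketch` and `JobManagerSketch` are hypothetical, heavily simplified stand-ins (not the real Doris classes), and the name lookup simply scans `jobMap` instead of calling `queryJobs(JobType.MV, ...)`. The point it illustrates is that each job type overrides `needPersist()` itself, and the manager branches only on that flag rather than on `job instanceof MTMVJob`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for Doris job classes. Names and
// signatures are illustrative assumptions, not the real Doris API.
abstract class SketchJob {
    private long jobId;
    private final String jobName;

    SketchJob(long jobId, String jobName) {
        this.jobId = jobId;
        this.jobName = jobName;
    }

    long getJobId() { return jobId; }
    void setJobId(long jobId) { this.jobId = jobId; }
    String getJobName() { return jobName; }

    // Each job type sets this itself. Default: the job manager's own
    // editLog persists and replays the job.
    boolean needPersist() { return true; }
}

class InsertJobSketch extends SketchJob {
    InsertJobSketch(long id, String name) { super(id, name); }
}

// A job type that replays through its own editLog (as MTMV does) opts out
// through the flag, so the manager never needs to know its concrete class.
class MvJobSketch extends SketchJob {
    MvJobSketch(long id, String name) { super(id, name); }

    @Override
    boolean needPersist() { return false; }
}

class JobManagerSketch {
    final Map<Long, SketchJob> jobMap = new HashMap<>();

    void replayCreateJob(SketchJob job) {
        // Jobs replayed elsewhere are skipped based on the flag alone.
        if (!job.needPersist()) {
            return;
        }
        jobMap.putIfAbsent(job.getJobId(), job);
    }

    void replayUpdateJob(SketchJob job) {
        long jobId = job.getJobId();
        // Flag check in place of `job instanceof MTMVJob`: for externally
        // persisted jobs, an unknown ID is resolved through the unique name.
        if (!jobMap.containsKey(jobId) && !job.needPersist()) {
            List<SketchJob> sameName = new ArrayList<>();
            for (SketchJob existing : jobMap.values()) {
                if (existing.getJobName().equals(job.getJobName())) {
                    sameName.add(existing);
                }
            }
            if (sameName.size() != 1) {
                return; // missing or ambiguous: ignore the stale log entry
            }
            jobId = sameName.get(0).getJobId();
            job.setJobId(jobId);
        }
        if (!jobMap.containsKey(jobId)) {
            return;
        }
        jobMap.put(jobId, job);
    }
}

public class PersistFlagSketch {
    public static void main(String[] args) {
        JobManagerSketch manager = new JobManagerSketch();
        manager.replayCreateJob(new InsertJobSketch(1L, "insert_job"));
        manager.replayCreateJob(new MvJobSketch(2L, "mv_job")); // skipped

        // Simulate the MV job being registered by its own editLog replay.
        manager.jobMap.put(2L, new MvJobSketch(2L, "mv_job"));

        // An old update log still carries a stale ID (99) for the same name.
        manager.replayUpdateJob(new MvJobSketch(99L, "mv_job"));
        System.out.println(manager.jobMap.keySet()); // prints [1, 2]
    }
}
```

   With this shape, a new job type that keeps its own editLog would only override `needPersist()`; `JobManagerSketch` would not need to change.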



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

