zddr commented on code in PR #50942:
URL: https://github.com/apache/doris/pull/50942#discussion_r2123474988


##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -271,32 +294,50 @@ public void triggerJob(long jobId, C context) throws JobException {
     }
 
     public void replayCreateJob(T job) throws JobException {
-        if (jobMap.containsKey(job.getJobId())) {
+        // mtmv has its own editLog to replay jobs, here it is to ignore the logs already generated by older versions.
+        if (!job.needPersist()) {
             return;
         }
-        jobMap.putIfAbsent(job.getJobId(), job);
-        job.onReplayCreate();
+        createJobInternal(job, true);
     }
 
     /**
      * Replay update load job.
      **/
     public void replayUpdateJob(T job) {
-        jobMap.put(job.getJobId(), job);
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
+        Long jobId = job.getJobId();
+        // In previous versions, the job ID in MTMV was not fixed (a new ID would be generated each time the editLog
+        // was replayed), but the name was constant and unique. However, since job updates use jobId as the key,
+        // it is possible that this jobId no longer exists. Therefore, we now look up the ID based on the name.
+        if (!jobMap.containsKey(jobId) && job instanceof MTMVJob) {
+            List<T> jobs = queryJobs(JobType.MV, job.getJobName());
+            if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
+                LOG.warn("jobs by name: {} not normal,should have one job,but job num is: {}", job.getJobName(),
+                        jobs.size());
+                return;
+            }
+            jobId = jobs.get(0).getJobId();
+            job.setJobId(jobId);
+        }
+
+        if (!jobMap.containsKey(jobId)) {
+            LOG.warn("replayUpdateJob not normal, job: {}, jobId: {}, jobMap: {}", job, jobId, jobMap);
+            return;
+        }
+        jobMap.put(jobId, job);
+        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
                 .add("msg", "replay update scheduler job").build());
     }

Review Comment:
   The method org.apache.doris.job.base.AbstractJob#needPersist already exists, 
   but the logic here is unrelated to it. The main purpose of this logic is to 
   stay compatible with the change in how MTMVJob IDs are generated. Without 
   this compatibility handling, the ID generated when the materialized view is 
   created and the ID carried by the job update will differ, so the update 
   cannot be applied.
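
To make the compatibility concern concrete, here is a minimal standalone sketch of the name-based fallback, not the actual JobManager code. The class `ReplayUpdateSketch`, its nested `Job` type, and the methods `create`, `replayUpdate`, and `get` are hypothetical names for illustration only. The sketch also leaves out the `instanceof MTMVJob` check from the diff and applies the fallback to every job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the replay path; not Doris code.
class ReplayUpdateSketch {
    // Stand-in for a persisted job: the ID may change across versions, the name is stable.
    static final class Job {
        long id;
        final String name;
        final String status;

        Job(long id, String name, String status) {
            this.id = id;
            this.name = name;
            this.status = status;
        }
    }

    private final Map<Long, Job> jobMap = new LinkedHashMap<>();

    void create(Job job) {
        jobMap.put(job.id, job);
    }

    Job get(long id) {
        return jobMap.get(id);
    }

    // Returns true if the update was applied, false if it was skipped.
    boolean replayUpdate(Job updated) {
        long jobId = updated.id;
        if (!jobMap.containsKey(jobId)) {
            // The logged ID is unknown, so fall back to the unique, stable name.
            List<Job> sameName = new ArrayList<>();
            for (Job existing : jobMap.values()) {
                if (existing.name.equals(updated.name)) {
                    sameName.add(existing);
                }
            }
            if (sameName.size() != 1) {
                // Zero or several matches: skip rather than guess.
                return false;
            }
            jobId = sameName.get(0).id;
            updated.id = jobId;
        }
        jobMap.put(jobId, updated);
        return true;
    }

    public static void main(String[] args) {
        ReplayUpdateSketch manager = new ReplayUpdateSketch();
        manager.create(new Job(1L, "mv_a", "RUNNING"));
        // The update log carries a different ID (99) for the same job name.
        System.out.println(manager.replayUpdate(new Job(99L, "mv_a", "PAUSED")));
        System.out.println(manager.get(1L).status);
    }
}
```

In this sketch the update is stored under the ID that was registered at creation time, so a mismatched ID in the update log does not leave a stale entry behind or create a second one.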



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

