github-actions[bot] commented on code in PR #63654:
URL: https://github.com/apache/doris/pull/63654#discussion_r3332785849


##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java:
##########
@@ -955,6 +971,30 @@ public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdT
         return false;
     }
 
+    @Override
+    public void updateLag() throws UserException {
+        List<Integer> partitionIds;
+        String brokerListSnapshot;
+        String topicSnapshot;
+        Map<String, String> customPropertiesSnapshot;
+        readLock();
+        try {
+            partitionIds = Lists.newArrayList(((KafkaProgress) progress).getOffsetByPartition().keySet());
+            if (partitionIds.isEmpty()) {
+                return;
+            }
+            brokerListSnapshot = brokerList;
+            topicSnapshot = topic;
+            customPropertiesSnapshot = Maps.newHashMap(convertedCustomProperties);
+        } finally {

Review Comment:
   `convertedCustomProperties` is derived state and is not persisted, but 
`updateLag()` snapshots it without rebuilding it. After FE restart/replay, a 
Kafka job with custom partitions does not go through `getAllKafkaPartitions()` 
in `update()` (the custom-partition branch returns early), so this map can 
still be empty here while `customProperties` contains required 
SASL/FILE/default-offset settings. The lag refresh then calls Kafka with 
incomplete properties and will repeatedly fail or report stale lag even though 
task execution later rebuilds the properties in `hasMoreDataToConsume()`. 
Please rebuild and snapshot the converted properties under the job lock before 
calling `getLatestOffsets()`, similar to the task path, and add test coverage for 
replayed/custom-partition jobs with custom Kafka properties.
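
   A minimal sketch of the requested shape, using a simplified stand-in class 
   rather than the real `KafkaRoutineLoadJob`. `LagSnapshotSketch`, 
   `rebuildConvertedProperties()`, and `snapshotForLagRefresh()` are 
   hypothetical names, and the conversion here only trims values as a 
   placeholder for whatever the real conversion does. It illustrates only the 
   ordering: rebuild the derived map from the persisted one under the lock, 
   then copy it for use outside the lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified stand-in for a routine-load job; not the actual Doris class.
class LagSnapshotSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Persisted state: still populated after FE restart/replay.
    private final Map<String, String> customProperties = new HashMap<>();
    // Derived state: not persisted, so it is empty after replay until rebuilt.
    private final Map<String, String> convertedCustomProperties = new HashMap<>();

    LagSnapshotSketch(Map<String, String> persisted) {
        customProperties.putAll(persisted);
    }

    // Hypothetical placeholder conversion (assumption): trims each value.
    private void rebuildConvertedProperties() {
        convertedCustomProperties.clear();
        for (Map.Entry<String, String> e : customProperties.entrySet()) {
            convertedCustomProperties.put(e.getKey(), e.getValue().trim());
        }
    }

    // Rebuilds the derived map if needed, then returns a copy that the caller
    // can use for Kafka calls after the lock has been released.
    public Map<String, String> snapshotForLagRefresh() {
        lock.writeLock().lock(); // rebuilding mutates state, so take the write lock
        try {
            if (convertedCustomProperties.isEmpty() && !customProperties.isEmpty()) {
                rebuildConvertedProperties();
            }
            return new HashMap<>(convertedCustomProperties);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

   The sketch takes a write lock because the rebuild mutates job state. The 
   diff above snapshots under `readLock()`, so the real fix would need either 
   a write lock for the rebuild or a rebuild step that is otherwise safe under 
   concurrent readers.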



##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java:
##########
@@ -62,6 +62,8 @@ protected void runAfterCatalogReady() {
     private void process() throws UserException {
         // update
         routineLoadManager.updateRoutineLoadJob();
+        // refresh lag cache after job progress and partition metadata are updated
+        routineLoadManager.updateRoutineLoadJobLag();
         // get need schedule routine jobs

Review Comment:
   This adds synchronous Kafka metadata I/O to the main RoutineLoadScheduler 
round before `getNeedScheduleRoutineJobs()` and timeout processing. 
`updateLag()` calls `KafkaUtil.getLatestOffsets()` once per active Kafka 
routine-load job, so one slow or unreachable Kafka cluster can block the single 
scheduler daemon until the Kafka client times out, delaying scheduling and 
timeout renewal for unrelated routine-load jobs. Please move this refresh out 
of the scheduling critical path (for example a separate daemon/thread pool with 
bounded concurrency), or otherwise ensure lag refresh failures/timeouts cannot 
stall normal routine-load scheduling.
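
   One possible shape for the "separate thread pool with bounded concurrency" 
   option, as a sketch under stated assumptions rather than a proposal for the 
   actual Doris classes. `LagRefresherSketch`, `LagSource`, `submitRefresh()`, 
   and `cachedLag()` are hypothetical names; `fetchLag()` stands in for the 
   per-job Kafka offset lookup. The scheduler round only enqueues work and 
   reads the cache, so a slow lookup occupies a pool thread instead of the 
   scheduler thread.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical lag refresher that keeps Kafka I/O off the scheduler thread.
class LagRefresherSketch implements AutoCloseable {
    // Hypothetical view of a job: an id plus a possibly slow lag lookup.
    public interface LagSource {
        long jobId();
        long fetchLag() throws Exception;
    }

    private final ExecutorService pool;
    private final Map<Long, Long> lagCache = new ConcurrentHashMap<>();
    // Jobs with a refresh already queued or running, to avoid piling up work.
    private final Set<Long> inFlight = ConcurrentHashMap.newKeySet();

    LagRefresherSketch(int maxConcurrency) {
        this.pool = Executors.newFixedThreadPool(maxConcurrency);
    }

    // Called from the scheduler round: enqueues refreshes and returns at once.
    public void submitRefresh(List<LagSource> jobs) {
        for (LagSource job : jobs) {
            if (!inFlight.add(job.jobId())) {
                continue; // previous refresh for this job has not finished yet
            }
            pool.execute(() -> {
                try {
                    lagCache.put(job.jobId(), job.fetchLag());
                } catch (Exception e) {
                    // Keep the previously cached value; failures stay local.
                } finally {
                    inFlight.remove(job.jobId());
                }
            });
        }
    }

    // Returns the last successfully refreshed lag, or null if none yet.
    public Long cachedLag(long jobId) {
        return lagCache.get(jobId);
    }

    @Override
    public void close() {
        pool.shutdownNow();
    }
}
```

   The in-flight set matters as much as the pool size: without it, every 
   scheduler round would queue another lookup for a job whose Kafka cluster is 
   unreachable, and the backlog would grow without bound.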



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

