sollhui commented on code in PR #38474:
URL: https://github.com/apache/doris/pull/38474#discussion_r1703509867


##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java:
##########
@@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String> 
jobProperties,
                 customKafkaProperties = 
dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before reset 
progress to make modify operation atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) 
progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // It is necessary to reset the Kafka progress cache if topic 
change,
+            // and should reset cache before modifying partition offset.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
+                if (Config.isCloudMode()) {
+                    resetCloudProgress();
+                }
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
+            }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);

Review Comment:
   We need, and will fix it in next pr



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to