sollhui commented on code in PR #38474: URL: https://github.com/apache/doris/pull/38474#discussion_r1703509867
########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java: ########## @@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties, customKafkaProperties = dataSourceProperties.getCustomKafkaProperties(); } - // modify partition offset first - if (!kafkaPartitionOffsets.isEmpty()) { - // we can only modify the partition that is being consumed - ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); - } - + // convertCustomProperties and check partitions before reset progress to make modify operation atomic if (!customKafkaProperties.isEmpty()) { this.customProperties.putAll(customKafkaProperties); convertCustomProperties(true); } - // modify broker list and topic - if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) { - this.brokerList = dataSourceProperties.getBrokerList(); + + if (!kafkaPartitionOffsets.isEmpty()) { + ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets); } + + // It is necessary to reset the Kafka progress cache if topic change, + // and should reset cache before modifying partition offset. if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) { + if (Config.isCloudMode()) { + resetCloudProgress(); + } this.topic = dataSourceProperties.getTopic(); + this.progress = new KafkaProgress(); + } + + // modify partition offset + if (!kafkaPartitionOffsets.isEmpty()) { + // we can only modify the partition that is being consumed + ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); Review Comment: We do need it, and will fix it in the next PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org