This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit f519f84b7df11ebc12bf475c70bd35a126f480c0
Author: geekr <geek.ru...@gmail.com>
AuthorDate: Fri Oct 7 13:54:42 2022 -0400

    [camel-18327] resuming from last committed offset
---
 .../org/apache/camel/component/kafka/KafkaFetchRecords.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index edf39671436..079a14615ab 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -41,7 +41,9 @@ import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
@@ -380,6 +382,14 @@ public class KafkaFetchRecords implements Runnable {
                 break;
             case RESUME_REQUESTED:
                 LOG.info("Resuming the consumer as a response to a resume request");
+                if (consumer.committed(this.consumer.assignment()) != null) {
+                    consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
+                        final TopicPartition tp = (TopicPartition) k;
+                        LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+                                ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+                        consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+                    });
+                }
                 consumer.resume(consumer.assignment());
                 state = State.RUNNING;
                 break;