arjun180 opened a new issue #1245: URL: https://github.com/apache/camel-kafka-connector/issues/1245
I've been working on enabling idempotency on my AWS S3 Kafka source connector. There were some issues I detailed here: https://github.com/apache/camel-kafka-connector/issues/1236. I worked around some of them, but there is still some really strange behavior I'm noticing in my consumer. This is what my connector YAML file looks like:

```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: my-kakfa
  labels:
    strimzi.io/cluster: my-kafka-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  tasksMax: 1
  config:
    topics: my-test-topic
    camel.source.path.bucketNameOrArn: my-kakfa-connect
    #camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.deleteAfterRead: false
    camel.source.endpoint.region: <region>
    camel.source.endpoint.prefix: 'my-connect/'
    camel.idempotency.enabled: true
    camel.idempotency.repository.type: memory
    camel.idempotency.expression.type: body
    camel.idempotency.kafka.topic: my.idempotency.topic
    camel.idempotency.kafka.max.cache.size: 30000
    camel.idempotency.kafka.poll.duration.ms: 150
```

The connector spins up and there's no problem on that end, but the consumer shows the following strange behavior:

- When I first start consuming, the messages stream in fine. I have `auto.offset.reset: latest`.
- When I add more files to the S3 bucket, the consumer does not capture them, and the Kafka connector logs keep showing:

```
flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [SourceTaskOffsetCommitter-1]
```

I then tried some other things:

- I delete and restart the connector, and the messages start flowing in.
- I then delete and restart the connector again (without adding new data to the S3 bucket), and the same set of messages starts flowing back in. This is strange, because already committed messages should not be consumed again. I keep the group id the same and keep track of the offset for each partition, and I commit every time a message is read (a simplified sketch of my consumer is below).

I have a few questions:

- Why does the Kafka connector not capture the latest messages added to the S3 bucket?
- Why does the behavior change when I delete and restart the connector?
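For reference, this is roughly what my consumer does; the bootstrap server, group id, and topic name here are placeholders for my actual values, and the commit logic is simplified to show the per-message commit I described:

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class S3TopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       // same group id across runs
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commits are done manually
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
                    // commit the offset for this partition after each message is read
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }
}
```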