arjun180 opened a new issue #1245:
URL: https://github.com/apache/camel-kafka-connector/issues/1245


   I've been working on enabling idempotency on my AWS S3 Kafka source connector. I detailed some earlier issues here: https://github.com/apache/camel-kafka-connector/issues/1236. I worked around some of them, but I'm still seeing some really strange behavior in my consumer. This is what my connector YAML file looks like:
   
   ```
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: s3-source-connector
     namespace: my-kakfa
     labels:
       strimzi.io/cluster: my-kafka-connect-cluster
   spec:
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
     tasksMax: 1
     config:
       topics: my-test-topic
       camel.source.path.bucketNameOrArn: my-kakfa-connect
       #camel.source.endpoint.useDefaultCredentialsProvider: true
       camel.component.aws2-s3.useDefaultCredentialsProvider: true
       camel.component.aws2-s3.deleteAfterRead: false
       camel.source.endpoint.region: <region>
       camel.source.endpoint.prefix: 'my-connect/'
       camel.idempotency.enabled: true
       camel.idempotency.repository.type: memory
       camel.idempotency.expression.type: body
       camel.idempotency.kafka.topic: my.idempotency.topic
       camel.idempotency.kafka.max.cache.size: 30000
       camel.idempotency.kafka.poll.duration.ms: 150
   ```
   My connector spins up and there's no problem on that end. But the consumer shows the following strange behavior:
   
   - When I first start consuming, the messages stream in fine. I have `auto.offset.reset: latest` (see the consumer setup sketch after this list).
   - When I add more files to the S3 bucket, the consumer does not capture them, and the Kafka connector logs keep showing:
   ```
   flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [SourceTaskOffsetCommitter-1]
   ```
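   For reference, the consumer configuration is roughly the following. This is a minimal sketch rather than my exact code; the bootstrap server and group id below are placeholders:
   ```
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.common.serialization.StringDeserializer;

   // Minimal sketch of the consumer configuration described above.
   // Auto-commit is disabled because offsets are committed manually per record.
   static Properties consumerProps() {
       Properties props = new Properties();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-bootstrap:9092"); // placeholder
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");                // kept the same across runs
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       return props;
   }
   ```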
   I then try some other things:
   
   - I delete and restart the connector, and the messages start flowing in.
   - I then delete and restart the connector again (without new data being added to the S3 bucket), and the same set of messages starts flowing back in. This is strange, because already-committed messages should not be consumed again.
   
   I keep the group id the same and keep track of the offset for each partition. I also commit every time a message is read (a sketch of this consumer loop is at the end of this issue). I have a few questions:
   
   - Why does the Kafka connector not capture the latest messages added to the S3 bucket?
   - Why does the behavior change when I delete and restart the connector?
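
   For completeness, this is roughly the poll/commit loop I described above. Again a minimal sketch: `handle()` is a placeholder for my processing, and `consumerProps()` is the configuration from the earlier sketch:
   ```
   import java.time.Duration;
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.*;
   import org.apache.kafka.common.TopicPartition;

   // Sketch of the consumer loop: subscribe to the connector's topic, track the
   // offset per partition, and commit synchronously after every record.
   static void consume() {
       try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps())) {
           consumer.subscribe(Collections.singletonList("my-test-topic"));
           while (true) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
               for (ConsumerRecord<String, String> record : records) {
                   handle(record); // placeholder for my processing
                   // Commit the next offset for this record's partition right away.
                   consumer.commitSync(Collections.singletonMap(
                           new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1)));
               }
           }
       }
   }
   ```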
   

