arjun180 opened a new issue #1236:
URL: https://github.com/apache/camel-kafka-connector/issues/1236


   I've been setting up a Kafka S3 source connector so that data flows from an 
S3 bucket to a Kafka topic. The data must be retained in the S3 bucket, so I 
set `camel.component.aws2-s3.deleteAfterRead: false` in my Kafka connector.
   
   Based on https://camel.apache.org/blog/2020/12/CKC-idempotency-070/, my 
Kafka connector looks like this:
   
   ```
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: s3-source-connector
     namespace: my-kafka
     labels:
       strimzi.io/cluster: my-dev-kafka-connect-cluster
   spec:
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
     tasksMax: 1
     config:
       topics: test-topic
       camel.source.path.bucketNameOrArn: my-kakfa-connect
       #camel.source.endpoint.useDefaultCredentialsProvider: true
       camel.component.aws2-s3.useDefaultCredentialsProvider: true
       camel.component.aws2-s3.deleteAfterRead: false
       camel.source.endpoint.region: <region>
       camel.source.endpoint.prefix: 'source_folder/'
       camel.idempotency.enabled: true
       camel.idempotency.repository.type: kafka
       camel.idempotency.expression.type: body
       camel.idempotency.kafka.topic: my.idempotency.topic
       camel.idempotency.kafka.max.cache.size: 1500
       camel.idempotency.kafka.poll.duration.ms: 150
   ```
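
As a rough illustration of what the idempotency settings above ask for (filter on the message body, remember a bounded number of entries), here is a minimal sketch. It is not the connector's implementation: the class `IdempotentFilterSketch` and the method `firstTime` are hypothetical names, and the cache lives in memory rather than in a Kafka topic such as `my.idempotency.topic`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an in-memory, bounded "seen before?" filter keyed on the message body.
class IdempotentFilterSketch {
    private final Map<String, Boolean> seen;

    IdempotentFilterSketch(final int maxCacheSize) {
        // Access-ordered LinkedHashMap that drops the least recently used body
        // once more than maxCacheSize entries are stored.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxCacheSize;
            }
        };
    }

    // Returns true the first time a body is seen (forward it), false for a duplicate (skip it).
    boolean firstTime(String body) {
        return seen.put(body, Boolean.TRUE) == null;
    }
}
```

With a cache size of 1500, as in `camel.idempotency.kafka.max.cache.size`, a body that was evicted from such a cache would be treated as new again if it reappeared.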
   
   The error I get is this:
   ```
   2021-07-21 04:59:46,527 WARN Error processing exchange. Exchange[AB7DE03FAF78DF0-00000000000630CC]. Caused by: [java.lang.IllegalStateException - Queue full] (org.apache.camel.component.aws2.s3.AWS2S3Consumer) [Camel (camel-35) thread #67 - aws2-s3://my-kakfa-connect]
   java.lang.IllegalStateException: Queue full
        at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
        at org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251)
        at org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149)
        at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
        at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
        at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
        at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:439)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:62)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:167)
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:388)
        at org.apache.camel.component.aws2.s3.AWS2S3Consumer.processBatch(AWS2S3Consumer.java:289)
        at org.apache.camel.component.aws2.s3.AWS2S3Consumer.poll(AWS2S3Consumer.java:164)
        at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
        at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   I couldn't find much information about this error, in particular about why 
the SEDA queue is involved. Could I get any help with this?
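
For context on the top frames of the trace: `SedaProducer.addToQueue` ends in `java.util.AbstractQueue.add`, which throws `IllegalStateException("Queue full")` when a bounded queue has no free capacity. The sketch below reproduces only that JDK behavior. The class `QueueFullSketch`, the method `fillPastCapacity`, and the capacity values are hypothetical, and using a `LinkedBlockingQueue` to stand in for the SEDA queue is an assumption.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: what happens when add() is called on a full bounded queue.
class QueueFullSketch {

    // Tries to add `attempts` items to a queue holding at most `capacity` items.
    // Returns the exception message if add() fails, or null if everything fit.
    static String fillPastCapacity(int capacity, int attempts) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < attempts; i++) {
                // add() is inherited from AbstractQueue: it throws instead of
                // blocking when the queue has no remaining capacity.
                queue.add("exchange-" + i);
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(fillPastCapacity(2, 3));
    }
}
```

Read this way, the error suggests that the consumer polling S3 is adding exchanges to the queue faster than they are taken off it, not that reading from S3 itself is failing.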


