arjun180 opened a new issue #1236: URL: https://github.com/apache/camel-kafka-connector/issues/1236
I've been working on setting up a Kafka S3 source connector, where data needs to flow from an S3 bucket to a Kafka topic. I need to make sure that the data is retained in the S3 bucket, so I set `camel.component.aws2-s3.deleteAfterRead: false` in my Kafka connector. Based on this page, https://camel.apache.org/blog/2020/12/CKC-idempotency-070/, my Kafka connector looks like this:

```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: my-kafka
  labels:
    strimzi.io/cluster: my-dev-kafka-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  tasksMax: 1
  config:
    topics: test-topic
    camel.source.path.bucketNameOrArn: my-kakfa-connect
    #camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.deleteAfterRead: false
    camel.source.endpoint.region: <region>
    camel.source.endpoint.prefix: 'source_folder/'
    camel.idempotency.enabled: true
    camel.idempotency.repository.type: kafka
    camel.idempotency.expression.type: body
    camel.idempotency.kafka.topic: my.idempotency.topic
    camel.idempotency.kafka.max.cache.size: 1500
    camel.idempotency.kafka.poll.duration.ms: 150
```

The error I get is this:

```
2021-07-21 04:59:46,527 WARN Error processing exchange. Exchange[AB7DE03FAF78DF0-00000000000630CC].
Caused by: [java.lang.IllegalStateException - Queue full] (org.apache.camel.component.aws2.s3.AWS2S3Consumer) [Camel (camel-35) thread #67 - aws2-s3://my-kakfa-connect]
java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
	at org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251)
	at org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149)
	at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
	at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
	at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:439)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:62)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:167)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:388)
	at org.apache.camel.component.aws2.s3.AWS2S3Consumer.processBatch(AWS2S3Consumer.java:289)
	at org.apache.camel.component.aws2.s3.AWS2S3Consumer.poll(AWS2S3Consumer.java:164)
	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

I couldn't find much information about this error, in particular about why the Seda queue is involved. Could I get any help with this?
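For reference, the top frame of the trace is `java.util.AbstractQueue.add`, which throws `IllegalStateException: Queue full` when a bounded queue has no free slot. The sketch below reproduces only that JDK behaviour in isolation. It is not the connector's code: the class name `QueueFullDemo`, the helper methods, and the choice of `LinkedBlockingQueue` are assumptions made for illustration, since the trace does not show which queue implementation sits behind `SedaProducer.addToQueue`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of Camel or the connector.
public class QueueFullDemo {

    // Fills a bounded queue to capacity, then calls add() once more.
    // Returns the exception message, or null if no exception was thrown.
    static String addBeyondCapacity(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("exchange-" + i);
        }
        try {
            // add() is inherited from AbstractQueue and throws when offer() fails.
            queue.add("one-too-many");
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Same situation, but offer() reports the full queue by returning false.
    static boolean offerBeyondCapacity(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("exchange-" + i);
        }
        return queue.offer("one-too-many");
    }

    public static void main(String[] args) {
        System.out.println(addBeyondCapacity(2));   // message of the IllegalStateException
        System.out.println(offerBeyondCapacity(2)); // whether the extra element was accepted
    }
}
```

Read against the trace, this suggests that the S3 consumer was handing exchanges to an in-memory queue faster than they were being drained, so the queue hit its capacity. Why the queue is not being drained in this setup is not something the sketch can answer.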