orpiske commented on PR #7518: URL: https://github.com/apache/camel/pull/7518#issuecomment-1133554526
> > @orpiske the async commit process seems to be broken. The commit() function in DefaultKafkaManualAsyncCommit calls a forceCommit() function which is synchronous. And when fixing it using the commit() function (async with the manager), the test throws a ConcurrentModificationException. We need to reintroduce a ConcurrentQueue storing commits, which should be processed by the main Kafka consumer polling loop and thread.
>
> Can you please open a ticket for this or send a PR with your proposed changes?
>
> The important requirement is that the concurrent queue must be isolated to only the async-related code. It should not be part of the other code (i.e., the concerns of the async manual commit code should be separated), nor should it be present in any way in the KafkaConsumer, KafkaFetchRecords or Kafka*Processor code. This is important so that we can continue cleaning up and reducing the maintenance effort for this component.

Alternatively, in your scenario, can you also please test whether changing the call from `forceCommit` to `recordOffset` leads to the correct behavior? I believe the offset will eventually be [committed by the processor](https://github.com/apache/camel/blob/c2f6d79bc65d1db4e142bd082086c2c67edc78d8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java#L91) after successful processing.
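
To make the isolation requirement concrete, here is a rough sketch of what a self-contained queue for the async code path might look like. This is only an illustration, not the camel-kafka implementation: `PendingCommitQueue`, `PendingCommit`, `enqueue` and `drainTo` are hypothetical names, and the actual commit is passed in as a callback so the sketch has no Kafka dependency.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical helper (not part of camel-kafka): collects commit requests
// from any thread so that only the polling thread performs the commits.
final class PendingCommitQueue {

    // Hypothetical value type: one offset to commit for one topic partition.
    static final class PendingCommit {
        final String topic;
        final int partition;
        final long offset;

        PendingCommit(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Queue<PendingCommit> pending = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread, e.g. the thread processing the exchange.
    void enqueue(String topic, int partition, long offset) {
        pending.add(new PendingCommit(topic, partition, offset));
    }

    // Intended to be called only from the consumer polling thread.
    // Hands each queued request to the committer and returns how many were drained.
    int drainTo(Consumer<PendingCommit> committer) {
        int drained = 0;
        PendingCommit next;
        while ((next = pending.poll()) != null) {
            committer.accept(next);
            drained++;
        }
        return drained;
    }
}
```

In this sketch the async manual commit code would own the queue and call `enqueue`, and the committer passed to `drainTo` would presumably wrap a call such as `KafkaConsumer.commitAsync` made from the polling thread, since the Kafka consumer is not safe for multi-threaded access. How the polling loop triggers the drain without the queue leaking into KafkaFetchRecords is left open here.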