orpiske commented on PR #7518:
URL: https://github.com/apache/camel/pull/7518#issuecomment-1133554526

   > > @orpiske the async commit process seems to be broken. The commit() 
function in DefaultKafkaManualAsyncCommit calls a forceCommit() function, which 
is synchronous. When fixing it by using the commit() function (async, via the 
manager), the test throws a ConcurrentModificationException. We need to 
reintroduce a ConcurrentQueue storing commits, which should be processed by the 
main Kafka consumer polling loop and thread.
   > 
   > Can you please open a ticket for this or send a PR with your 
proposed changes?
   > 
   > The important requirement is that the concurrent queue must be isolated to 
the async-related code only. It should not be part of the other code (i.e., the 
concerns of the async manual commit code should be kept separate), nor should it 
be present in any way in the KafkaConsumer, KafkaFetchRecords or Kafka*Processor 
code. This is important so that we can continue cleaning up and reducing the 
maintenance effort for this component.
   
   Alternatively, in your scenario, can you also please test whether changing 
the call from `forceCommit` to `recordOffset` leads to the correct behavior? 
   
   I believe the offset will eventually be [committed by the 
processor](https://github.com/apache/camel/blob/c2f6d79bc65d1db4e142bd082086c2c67edc78d8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java#L91)
 after successful processing.
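   
   To illustrate the isolation requirement, here is a minimal sketch of a queue that only the async commit code would own. `PendingCommitQueue`, `OffsetRequest`, `enqueue` and `drain` are hypothetical names used for illustration and are not part of camel-kafka. It assumes the async manager enqueues from any thread and that only the polling thread drains the queue and talks to the consumer:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical helper (not camel-kafka API): hands commit requests over
// to the polling thread so that no other thread touches the consumer.
final class PendingCommitQueue {

    /** One commit request: the topic, the partition and the offset. */
    static final class OffsetRequest {
        final String topic;
        final int partition;
        final long offset;

        OffsetRequest(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Queue<OffsetRequest> pending = new ConcurrentLinkedQueue<>();

    /** Safe to call from any processing thread; only records the request. */
    void enqueue(String topic, int partition, long offset) {
        pending.add(new OffsetRequest(topic, partition, offset));
    }

    /**
     * Intended to be called from the polling thread only. Passes every queued
     * request to the given committer in FIFO order and returns how many
     * requests were handed over.
     */
    int drain(Consumer<OffsetRequest> committer) {
        int count = 0;
        OffsetRequest request;
        while ((request = pending.poll()) != null) {
            committer.accept(request);
            count++;
        }
        return count;
    }
}
```

   In such a design the committer passed to `drain` would wrap the actual consumer commit call (for example the Kafka client's `commitAsync`), so the queue itself stays unaware of KafkaConsumer, KafkaFetchRecords and the processors.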
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.