davsclaus opened a new pull request, #23805:
URL: https://github.com/apache/camel/pull/23805

   ## Summary
   
   _Claude Code on behalf of Claus Ibsen_
   
   Fixes the Pausable EIP with Kafka consumer losing messages when the consumer 
is paused and later resumed. The root cause was that 
`KafkaConsumerListener.afterConsume()` never called `consumer.pause()` or 
seeked back to committed offsets, so `consumer.poll()` kept advancing offsets 
for records that were never processed.
   
   - Rewrite `afterConsume()` to always evaluate the resume predicate, call 
`consumer.pause()` + `seekConsumer()` when pausing, and `consumer.resume()` 
when resuming
   - Remove the `paused` boolean flag that prevented proper predicate evaluation
   - Extract `seekConsumer()` helper shared between `afterConsume` and 
`afterProcess`
   - Use Kafka's native `consumer.pause()` so `poll()` returns empty records 
during pause (no `Thread.sleep` needed)
   
   Prior partial fix (`869e870952c2`) only added a `paused` flag without 
addressing the offset advancement. A full fix existed on the unmerged 
`origin/kafka-pause` branch — this PR adapts that fix to current `main` with 
the cleaner `consumer.pause()` approach.
   
   ## Test plan
   
   - [ ] Existing `KafkaPausableConsumerIT` passes — verifies all messages are 
received after pause/resume
   - [ ] Existing `KafkaPausableConsumerCircuitBreakerIT` passes
   - [ ] Verify no messages are lost when `autoCommitEnable=true` during 
pause/resume cycle
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to