This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 711ab31 Camel 14935 kafka exception (#3877) 711ab31 is described below commit 711ab31c29615deffe5b7fe3dc4ba1235cbd9979 Author: Darius <dariuscoo...@gmail.com> AuthorDate: Wed Jun 3 00:14:33 2020 -0400 Camel 14935 kafka exception (#3877) * CAMEL-14935: Fix issue with failing commits on rebalance * Log the exception on partition revoked, and rethrow * Fixed, based on pull request feedback --- .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index df469b0..30f33fa 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -460,8 +460,15 @@ public class KafkaConsumer extends DefaultConsumer { offset = -1L; } LOG.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset); - commitOffset(offsetRepository, partition, offset, true); - lastProcessedOffset.remove(offsetKey); + try { + commitOffset(offsetRepository, partition, offset, true); + } catch (java.lang.Exception e) { + LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset); + throw e; + } finally { + lastProcessedOffset.remove(offsetKey); + } + } }