This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 803f0d6 Fixed the Defect manual commit not working (#2306) 803f0d6 is described below commit 803f0d630bf82a6918224bb4b8cfe3bb47b94da1 Author: mchandwani <mukesh.chandw...@gmail.com> AuthorDate: Thu Apr 26 08:34:00 2018 -0400 Fixed the Defect manual commit not working (#2306) * Defect Fix for manual commit. partitionLastOffset is always -1. Manual commit is not successful because consumer.comitSync is never called * Removing changes --- .../org/apache/camel/component/kafka/DefaultKafkaManualCommit.java | 2 ++ .../src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java index 1965999..3dc991f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java @@ -50,6 +50,8 @@ public class DefaultKafkaManualCommit implements KafkaManualCommit { public void commitSync() { commitOffset(offsetRepository, partition, partitionLastOffset); } + + protected void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) { if (partitionLastOffset != -1) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index a166c11..05111f2 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -287,8 +287,9 @@ public class KafkaConsumer extends DefaultConsumer { if (endpoint.getConfiguration().isAllowManualCommit()) { // allow Camel users to access the Kafka consumer API to be able to do for example manual commits KafkaManualCommit manual = endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, consumer, topicName, threadId, - offsetRepository, partition, partitionLastOffset); + offsetRepository, partition, record.offset()); exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); + } try { -- To stop receiving notification emails like this one, please contact davscl...@apache.org.