This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 803f0d6  Fixed the Defect manual commit not working (#2306)
803f0d6 is described below

commit 803f0d630bf82a6918224bb4b8cfe3bb47b94da1
Author: mchandwani <mukesh.chandw...@gmail.com>
AuthorDate: Thu Apr 26 08:34:00 2018 -0400

    Fixed the Defect manual commit not working (#2306)
    
    * Defect fix for manual commit: partitionLastOffset is always -1, so manual
    commit never succeeds because consumer.commitSync is never called
    
    * Removing changes
---
 .../org/apache/camel/component/kafka/DefaultKafkaManualCommit.java     | 2 ++
 .../src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java  | 3 ++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
index 1965999..3dc991f 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
@@ -50,6 +50,8 @@ public class DefaultKafkaManualCommit implements 
KafkaManualCommit {
     public void commitSync() {
         commitOffset(offsetRepository, partition, partitionLastOffset);
     }
+    
+  
 
     protected void commitOffset(StateRepository<String, String> 
offsetRepository, TopicPartition partition, long partitionLastOffset) {
         if (partitionLastOffset != -1) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index a166c11..05111f2 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -287,8 +287,9 @@ public class KafkaConsumer extends DefaultConsumer {
                                 if 
(endpoint.getConfiguration().isAllowManualCommit()) {
                                     // allow Camel users to access the Kafka 
consumer API to be able to do for example manual commits
                                     KafkaManualCommit manual = 
endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, 
consumer, topicName, threadId,
-                                        offsetRepository, partition, 
partitionLastOffset);
+                                        offsetRepository, partition, 
record.offset());
                                     
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
+                                    
                                 }
 
                                 try {

-- 
To stop receiving notification emails like this one, please contact
davscl...@apache.org.

Reply via email to