This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ec184a6  Rename parameter for latest camel-kafka fix in PR #2306.
ec184a6 is described below

commit ec184a60c32233a76ffc81231ae76328c545cc98
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Apr 26 14:44:17 2018 +0200

    Rename parameter for latest camel-kafka fix in PR #2306.
---
 .../component/kafka/DefaultKafkaManualCommit.java  | 24 ++++++++++------------
 .../kafka/DefaultKafkaManualCommitFactory.java     |  4 ++--
 .../component/kafka/KafkaManualCommitFactory.java  |  2 +-
 3 files changed, 14 insertions(+), 16 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
index 3dc991f..ba1cc15 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
@@ -34,32 +34,30 @@ public class DefaultKafkaManualCommit implements 
KafkaManualCommit {
     private final String threadId;
     private final StateRepository<String, String> offsetRepository;
     private final TopicPartition partition;
-    private final long partitionLastOffset;
+    private final long recordOffset;
 
     public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, 
String threadId,
-                                    StateRepository<String, String> 
offsetRepository, TopicPartition partition, long partitionLastOffset) {
+                                    StateRepository<String, String> 
offsetRepository, TopicPartition partition, long recordOffset) {
         this.consumer = consumer;
         this.topicName = topicName;
         this.threadId = threadId;
         this.offsetRepository = offsetRepository;
         this.partition = partition;
-        this.partitionLastOffset = partitionLastOffset;
+        this.recordOffset = recordOffset;
     }
 
     @Override
     public void commitSync() {
-        commitOffset(offsetRepository, partition, partitionLastOffset);
+        commitOffset(offsetRepository, partition, recordOffset);
     }
-    
-  
 
-    protected void commitOffset(StateRepository<String, String> 
offsetRepository, TopicPartition partition, long partitionLastOffset) {
-        if (partitionLastOffset != -1) {
+    protected void commitOffset(StateRepository<String, String> 
offsetRepository, TopicPartition partition, long recordOffset) {
+        if (recordOffset != -1) {
             if (offsetRepository != null) {
-                offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(partitionLastOffset));
+                offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(recordOffset));
             } else {
-                LOG.debug("CommitSync {} from topic {} with offset: {}", 
threadId, topicName, partitionLastOffset);
-                consumer.commitSync(Collections.singletonMap(partition, new 
OffsetAndMetadata(partitionLastOffset + 1)));
+                LOG.debug("CommitSync {} from topic {} with offset: {}", 
threadId, topicName, recordOffset);
+                consumer.commitSync(Collections.singletonMap(partition, new 
OffsetAndMetadata(recordOffset + 1)));
             }
         }
     }
@@ -92,8 +90,8 @@ public class DefaultKafkaManualCommit implements 
KafkaManualCommit {
         return partition;
     }
 
-    public long getPartitionLastOffset() {
-        return partitionLastOffset;
+    public long getRecordOffset() {
+        return recordOffset;
     }
 
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
index 3760f90..ba60cc9 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
@@ -25,7 +25,7 @@ public class DefaultKafkaManualCommitFactory implements 
KafkaManualCommitFactory
 
     @Override
     public KafkaManualCommit newInstance(Exchange exchange, KafkaConsumer 
consumer, String topicName,
-                                         String threadId, 
StateRepository<String, String> offsetRepository, TopicPartition partition, 
long partitionLastOffset) {
-        return new DefaultKafkaManualCommit(consumer, topicName, threadId, 
offsetRepository, partition, partitionLastOffset);
+                                         String threadId, 
StateRepository<String, String> offsetRepository, TopicPartition partition, 
long recordOffset) {
+        return new DefaultKafkaManualCommit(consumer, topicName, threadId, 
offsetRepository, partition, recordOffset);
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
index e900c10..26cdf25 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
@@ -30,5 +30,5 @@ public interface KafkaManualCommitFactory {
      * Creates a new instance
      */
     KafkaManualCommit newInstance(Exchange exchange, KafkaConsumer consumer, 
String topicName, String threadId,
-                                  StateRepository<String, String> 
offsetRepository, TopicPartition partition, long partitionLastOffset);
+                                  StateRepository<String, String> 
offsetRepository, TopicPartition partition, long recordOffset);
 }

-- 
To stop receiving notification emails like this one, please contact
davscl...@apache.org.

Reply via email to