This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new ec184a6 Rename parameter for latest camel-kafka fix in PR #2306. ec184a6 is described below commit ec184a60c32233a76ffc81231ae76328c545cc98 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 26 14:44:17 2018 +0200 Rename parameter for latest camel-kafka fix in PR #2306. --- .../component/kafka/DefaultKafkaManualCommit.java | 24 ++++++++++------------ .../kafka/DefaultKafkaManualCommitFactory.java | 4 ++-- .../component/kafka/KafkaManualCommitFactory.java | 2 +- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java index 3dc991f..ba1cc15 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java @@ -34,32 +34,30 @@ public class DefaultKafkaManualCommit implements KafkaManualCommit { private final String threadId; private final StateRepository<String, String> offsetRepository; private final TopicPartition partition; - private final long partitionLastOffset; + private final long recordOffset; public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, String threadId, - StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) { + StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) { this.consumer = consumer; this.topicName = topicName; this.threadId = threadId; this.offsetRepository = offsetRepository; this.partition = partition; - this.partitionLastOffset = partitionLastOffset; + this.recordOffset = recordOffset; } @Override public void commitSync() { - commitOffset(offsetRepository, partition, partitionLastOffset); + commitOffset(offsetRepository, partition, recordOffset); } - - - protected void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) { - if (partitionLastOffset != -1) { + protected void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) { + if (recordOffset != -1) { if (offsetRepository != null) { - offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset)); + offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset)); } else { - LOG.debug("CommitSync {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset); - consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1))); + LOG.debug("CommitSync {} from topic {} with offset: {}", threadId, topicName, recordOffset); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1))); } } } @@ -92,8 +90,8 @@ public class DefaultKafkaManualCommit implements KafkaManualCommit { return partition; } - public long getPartitionLastOffset() { - return partitionLastOffset; + public long getRecordOffset() { + return recordOffset; } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java index 3760f90..ba60cc9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java @@ -25,7 +25,7 @@ public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory @Override public KafkaManualCommit newInstance(Exchange exchange, KafkaConsumer consumer, String topicName, - String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) { - return new DefaultKafkaManualCommit(consumer, topicName, threadId, offsetRepository, partition, partitionLastOffset); + String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) { + return new DefaultKafkaManualCommit(consumer, topicName, threadId, offsetRepository, partition, recordOffset); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java index e900c10..26cdf25 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java @@ -30,5 +30,5 @@ public interface KafkaManualCommitFactory { * Creates a new instance */ KafkaManualCommit newInstance(Exchange exchange, KafkaConsumer consumer, String topicName, String threadId, - StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset); + StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset); } -- To stop receiving notification emails like this one, please contact davscl...@apache.org.