This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/main by this push:
     new 2213f90  (chores) camel-kafka: fixes and cleanups
2213f90 is described below

commit 2213f90d42cf0a12092709f57fbae62834aa3a84
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Sep 16 18:39:06 2021 +0200

    (chores) camel-kafka: fixes and cleanups

    - consolidate common code
    - adjust the hash map containing the last processed offset to not use a
      concurrent collection (the instance is only accessed within the local
      consumer thread and should be safe using a non-concurrent collection)
---
 .../main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 4 ++--
 .../camel/component/kafka/consumer/support/KafkaRecordProcessor.java | 4 +---
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 6394770..086432e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;

 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
@@ -56,7 +56,7 @@ class KafkaFetchRecords implements Runnable {
     private final Pattern topicPattern;
     private final String threadId;
     private final Properties kafkaProps;
-    private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();
+    private final Map<String, Long> lastProcessedOffset = new HashMap<>();
     private final PollExceptionStrategy pollExceptionStrategy;
     private final BridgeExceptionHandlerToErrorHandler bridge;
     private final ReentrantLock lock = new ReentrantLock();
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index e48f78c..f1a70b0 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -132,6 +132,7 @@ public class KafkaRecordProcessor {
         // if not auto commit then we have additional information on the exchange
         if (!autoCommitEnabled) {
             message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordHasNext);
+            message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
         }

         if (configuration.isAllowManualCommit()) {
@@ -141,9 +142,6 @@ public class KafkaRecordProcessor {
             KafkaManualCommit manual = manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId, offsetRepository, partition, record.offset(), configuration.getCommitTimeoutMs());
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-        }
-        // if commit management is on user side give additional info for the end of poll loop
-        if (!autoCommitEnabled || configuration.isAllowManualCommit()) {
             message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
         }