This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2213f90  (chores) camel-kafka: fixes and cleanups
2213f90 is described below

commit 2213f90d42cf0a12092709f57fbae62834aa3a84
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Sep 16 18:39:06 2021 +0200

    (chores) camel-kafka: fixes and cleanups
    
    - consolidate common code
    - adjust the hash map containing the last processed offset to not use a
      concurrent collection (the instance is only accessed within the local
    consumer thread and should be safe using a non-concurrent collection)
---
 .../main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 4 ++--
 .../camel/component/kafka/consumer/support/KafkaRecordProcessor.java  | 4 +---
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 6394770..086432e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
@@ -56,7 +56,7 @@ class KafkaFetchRecords implements Runnable {
     private final Pattern topicPattern;
     private final String threadId;
     private final Properties kafkaProps;
-    private final Map<String, Long> lastProcessedOffset = new 
ConcurrentHashMap<>();
+    private final Map<String, Long> lastProcessedOffset = new HashMap<>();
     private final PollExceptionStrategy pollExceptionStrategy;
     private final BridgeExceptionHandlerToErrorHandler bridge;
     private final ReentrantLock lock = new ReentrantLock();
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index e48f78c..f1a70b0 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -132,6 +132,7 @@ public class KafkaRecordProcessor {
         // if not auto commit then we have additional information on the 
exchange
         if (!autoCommitEnabled) {
             message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
!recordHasNext);
+            message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext 
&& !partitionHasNext);
         }
 
         if (configuration.isAllowManualCommit()) {
@@ -141,9 +142,6 @@ public class KafkaRecordProcessor {
             KafkaManualCommit manual = 
manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId,
                     offsetRepository, partition, record.offset(), 
configuration.getCommitTimeoutMs());
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-        }
-        // if commit management is on user side give additional info for the 
end of poll loop
-        if (!autoCommitEnabled || configuration.isAllowManualCommit()) {
             message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext 
&& !partitionHasNext);
         }
 

Reply via email to