This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 073aa49  CAMEL-17244: avoid creating multiple consumers if already connected
073aa49 is described below

commit 073aa4993086891fcd73421b11106e9a35945de2
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Nov 29 10:59:11 2021 +0100

    CAMEL-17244: avoid creating multiple consumers if already connected
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)
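
The change gates consumer creation behind an explicit connection-state flag, so the retry/reconnect loop only builds a new KafkaConsumer when no live one exists. A minimal sketch of that guard pattern follows (class and member names here are illustrative only, not the actual Camel code):

    // Illustrative sketch of the "create only if not connected" guard.
    // ConnectionGuard, ensureConnected() and createClient() are hypothetical
    // names used for this example; they are not Camel API.
    public class ConnectionGuard {

        private boolean connected; // state: is a client currently live?

        public void ensureConnected() {
            if (connected) {
                return; // already connected: do not create a second client
            }
            try {
                createClient();    // expensive setup, done at most once per connection
                connected = true;  // record the new state
            } catch (Exception e) {
                connected = false; // creation failed: the next retry may attempt again
            }
        }

        public void onFatalError() {
            connected = false; // force a fresh client on the next attempt
        }

        private void createClient() {
            // placeholder for real client construction
        }
    }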

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2cc7e59..ec51f5d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -62,7 +62,8 @@ class KafkaFetchRecords implements Runnable {
     private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = new ConcurrentLinkedQueue<>();
 
     private boolean retry = true;
-    private boolean reconnect; // must be false at init
+    private boolean reconnect; // must be false at init (this is the policy whether to reconnect)
+    private boolean connected; // this is the state (connected or not)
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy,
                       BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id,
@@ -84,17 +85,21 @@ class KafkaFetchRecords implements Runnable {
 
         do {
             try {
-                createConsumer();
+                if (!isConnected()) {
+                    createConsumer();
 
-                initializeConsumer();
+                    initializeConsumer();
+                    setConnected(true);
+                }
             } catch (Exception e) {
+                setConnected(false);
                 // ensure this is logged so users can see the problem
                 LOG.warn("Error creating 
org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
                 continue;
             }
 
             startPolling();
-        } while (isRetrying() || isReconnecting());
+        } while (isRetrying() || isReconnect());
 
         LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: 
{}", threadId, topicName);
         safeUnsubscribe();
@@ -125,7 +130,7 @@ class KafkaFetchRecords implements Runnable {
         subscribe();
 
         // set reconnect to false as the connection and resume is done at this point
-        setReconnect(false);
+        setConnected(false);
 
         // set retry to true to continue polling
         setRetry(true);
@@ -161,7 +166,7 @@ class KafkaFetchRecords implements Runnable {
             KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-            while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) {
+            while (isKafkaConsumerRunnable() && isRetrying() && !isReconnect()) {
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
 
                 processAsyncCommits();
@@ -169,7 +174,7 @@ class KafkaFetchRecords implements Runnable {
                 partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor);
             }
 
-            if (!isReconnecting()) {
+            if (!isReconnect()) {
                 LOG.debug("Not reconnecting, check whether to auto-commit or 
not ...");
                 commit();
             }
@@ -259,7 +264,7 @@ class KafkaFetchRecords implements Runnable {
         LOG.warn("Requesting the consumer to stop based on polling exception 
strategy");
 
         setRetry(false);
-        setReconnect(false);
+        setConnected(false);
     }
 
     private void handlePollDiscard(long partitionLastOffset) {
@@ -283,6 +288,7 @@ class KafkaFetchRecords implements Runnable {
 
         // re-connect so the consumer can try the same message again
         setReconnect(true);
+        setConnected(false);
 
         // to close the current consumer
         setRetry(false);
@@ -339,6 +345,7 @@ class KafkaFetchRecords implements Runnable {
             LOG.debug("We hit an error ... setting flags to force reconnect");
             // force re-connect
             setReconnect(true);
+            setConnected(false);
             setRetry(false); // to close the current consumer
         }
 
@@ -430,7 +437,7 @@ class KafkaFetchRecords implements Runnable {
         retry = value;
     }
 
-    private boolean isReconnecting() {
+    private boolean isReconnect() {
         return reconnect;
     }
 
@@ -490,4 +497,11 @@ class KafkaFetchRecords implements Runnable {
                 && kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable();
     }
 
+    public boolean isConnected() {
+        return connected;
+    }
+
+    public void setConnected(boolean connected) {
+        this.connected = connected;
+    }
 }
