This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 073aa49 CAMEL-17244: avoid creating multiple consumers if already connected
073aa49 is described below
commit 073aa4993086891fcd73421b11106e9a35945de2
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Nov 29 10:59:11 2021 +0100
CAMEL-17244: avoid creating multiple consumers if already connected
---
.../camel/component/kafka/KafkaFetchRecords.java | 32 ++++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2cc7e59..ec51f5d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -62,7 +62,8 @@ class KafkaFetchRecords implements Runnable {
private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits =
new ConcurrentLinkedQueue<>();
private boolean retry = true;
- private boolean reconnect; // must be false at init
+ private boolean reconnect; // must be false at init (this is the policy
whether to reconnect)
+ private boolean connected; // this is the state (connected or not)
KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy
pollExceptionStrategy,
BridgeExceptionHandlerToErrorHandler bridge, String
topicName, Pattern topicPattern, String id,
@@ -84,17 +85,21 @@ class KafkaFetchRecords implements Runnable {
do {
try {
- createConsumer();
+ if (!isConnected()) {
+ createConsumer();
- initializeConsumer();
+ initializeConsumer();
+ setConnected(true);
+ }
} catch (Exception e) {
+ setConnected(false);
// ensure this is logged so users can see the problem
LOG.warn("Error creating
org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
continue;
}
startPolling();
- } while (isRetrying() || isReconnecting());
+ } while (isRetrying() || isReconnect());
LOG.info("Terminating KafkaConsumer thread: {} receiving from topic:
{}", threadId, topicName);
safeUnsubscribe();
@@ -125,7 +130,7 @@ class KafkaFetchRecords implements Runnable {
subscribe();
// set reconnect to false as the connection and resume is done at this
point
- setReconnect(false);
+ setConnected(false);
// set retry to true to continue polling
setRetry(true);
@@ -161,7 +166,7 @@ class KafkaFetchRecords implements Runnable {
KafkaRecordProcessor kafkaRecordProcessor =
buildKafkaRecordProcessor();
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
- while (isKafkaConsumerRunnable() && isRetrying() &&
!isReconnecting()) {
+ while (isKafkaConsumerRunnable() && isRetrying() &&
!isReconnect()) {
ConsumerRecords<Object, Object> allRecords =
consumer.poll(pollDuration);
processAsyncCommits();
@@ -169,7 +174,7 @@ class KafkaFetchRecords implements Runnable {
partitionLastOffset = processPolledRecords(allRecords,
kafkaRecordProcessor);
}
- if (!isReconnecting()) {
+ if (!isReconnect()) {
LOG.debug("Not reconnecting, check whether to auto-commit or
not ...");
commit();
}
@@ -259,7 +264,7 @@ class KafkaFetchRecords implements Runnable {
LOG.warn("Requesting the consumer to stop based on polling exception
strategy");
setRetry(false);
- setReconnect(false);
+ setConnected(false);
}
private void handlePollDiscard(long partitionLastOffset) {
@@ -283,6 +288,7 @@ class KafkaFetchRecords implements Runnable {
// re-connect so the consumer can try the same message again
setReconnect(true);
+ setConnected(false);
// to close the current consumer
setRetry(false);
@@ -339,6 +345,7 @@ class KafkaFetchRecords implements Runnable {
LOG.debug("We hit an error ... setting flags to force reconnect");
// force re-connect
setReconnect(true);
+ setConnected(false);
setRetry(false); // to close the current consumer
}
@@ -430,7 +437,7 @@ class KafkaFetchRecords implements Runnable {
retry = value;
}
- private boolean isReconnecting() {
+ private boolean isReconnect() {
return reconnect;
}
@@ -490,4 +497,11 @@ class KafkaFetchRecords implements Runnable {
&&
kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable();
}
+ public boolean isConnected() {
+ return connected;
+ }
+
+ public void setConnected(boolean connected) {
+ this.connected = connected;
+ }
}