This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
     new 34442f1  CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)
34442f1 is described below

commit 34442f13e7c8e5367643eb47a1f6034de57e45fa
Author: jenskordowski <10864787+jenskordow...@users.noreply.github.com>
AuthorDate: Wed Aug 4 14:56:53 2021 +0200

    CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)

    Co-authored-by: Jens Kordowski <jens.kordow...@sap.com>
---
 .../main/java/org/apache/camel/component/kafka/KafkaConsumer.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 05a734b..331ba9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -240,6 +240,8 @@ public class KafkaConsumer extends DefaultConsumer {
 first = false;
 if (!isRunAllowed() || isStoppingOrStopped() || isSuspendingOrSuspended()) {
+    LOG.debug("Closing consumer {}", threadId);
+    IOHelper.close(consumer);
     return;
 }
@@ -277,6 +279,8 @@ public class KafkaConsumer extends DefaultConsumer {
     doReconnectRun();
     // set reconnect to false as its done now
     reconnect.set(false);
+    // set retry to true to continue polling
+    retry.set(true);
 }
 // polling
 doPollRun(retry, reconnect);
@@ -502,6 +506,7 @@ public class KafkaConsumer extends DefaultConsumer {
     }
     // re-connect so the consumer can try the same message again
     reconnect.set(true);
+    retry.set(false); // to close the current consumer
 } else if (PollOnError.ERROR_HANDLER == onError) {
     // use bridge error handler to route with exception
     bridge.handleException(e);