This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push: new 90ecc437481 CAMEL-19285: prevent Kafka client from entering an endless loop 90ecc437481 is described below commit 90ecc437481c0425c5fceb9c834cf3f6b1592b25 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Jun 29 13:00:36 2023 +0200 CAMEL-19285: prevent Kafka client from entering an endless loop When an authentication error is thrown by Kafka, this could cause an endless loop in the client. Potentially affecting the Kafka broker due to an excessive connection retries in a short period. This prevents authentication related errors to cause this. --- .../consumer/errorhandler/ReconnectErrorStrategy.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java index 77036a9aa07..6af2781f01c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.errorhandler; import org.apache.camel.component.kafka.KafkaFetchRecords; import org.apache.camel.component.kafka.PollExceptionStrategy; +import org.apache.kafka.common.errors.AuthenticationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,11 +44,19 @@ public class ReconnectErrorStrategy implements PollExceptionStrategy { @Override public void handle(long partitionLastOffset, Exception exception) { - LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy"); + if (exception instanceof AuthenticationException) { + LOG.warn("Kafka reported a non-recoverable authentication error. The client will not reconnect"); + + // disable reconnect: authentication errors are non-recoverable + recordFetcher.setReconnect(false); + recordFetcher.setConnected(false); + } else { + LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy"); - // re-connect so the consumer can try the same message again - recordFetcher.setReconnect(true); - recordFetcher.setConnected(false); + // re-connect so the consumer can try the same message again + recordFetcher.setReconnect(true); + recordFetcher.setConnected(false); + } // to close the current consumer retry = false;