This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch 14980
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c123f7c98a038838f4ef9b26a9ddf7193783d80e
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Mar 20 15:37:04 2021 +0100

    CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable 
exceptions. Other severe exceptions should be propagated.
---
 .../apache/camel/catalog/docs/kafka-component.adoc |  3 +-
 .../component/kafka/KafkaComponentConfigurer.java  |  6 ++++
 .../org/apache/camel/component/kafka/kafka.json    |  1 +
 .../camel-kafka/src/main/docs/kafka-component.adoc |  3 +-
 ...ultKafkaConsumerReconnectExceptionStrategy.java | 34 +++++++++++++++++++++
 .../camel/component/kafka/KafkaComponent.java      | 15 ++++++++++
 .../camel/component/kafka/KafkaConsumer.java       | 35 +++++++++++++++-------
 .../KafkaConsumerReconnectExceptionStrategy.java   | 34 +++++++++++++++++++++
 .../dsl/KafkaComponentBuilderFactory.java          | 18 +++++++++++
 .../modules/ROOT/pages/kafka-component.adoc        |  3 +-
 10 files changed, 138 insertions(+), 14 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index cfaf481..02f20fb 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 100 options, which are listed below.
 
 
 
@@ -84,6 +84,7 @@ The Kafka component supports 99 options, which are listed 
below.
 | *specificAvroReader* (consumer) | This enables the use of a specific Avro 
reader for use with the Confluent Platform schema registry and the 
io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only 
available in the Confluent Platform (not standard Apache Kafka) | false | 
boolean
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic number of topics matching 
the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that 
implements the Deserializer interface. | 
org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaConsumerReconnect{zwsp}ExceptionStrategy* (consumer) | To use a custom 
strategy with the consumer to control how to handle exceptions thrown from the 
Kafka broker while polling messages. |  | 
KafkaConsumerReconnectExceptionStrategy
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating 
KafkaManualCommit instances. This allows to plugin a custom factory to create 
custom KafkaManualCommit instances in case special logic is needed when doing 
manual commits that deviates from the default implementation that comes out of 
the box. |  | KafkaManualCommitFactory
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can 
use to buffer records waiting to be sent to the server. If records are sent 
faster than they can be delivered to the server the producer will either block 
or throw an exception based on the preference specified by 
block.on.buffer.full.This setting should correspond roughly to the total memory 
the producer will use, but is not a hard bound since not all memory the 
producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the 
compression codec for all data generated by this producer. Valid values are 
none, gzip and snappy. There are 4 enums and the value can be one of: none, 
gzip, snappy, lz4 | none | String
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index e44e7a1..70a67bd 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -86,6 +86,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "interceptorClasses": 
getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, 
java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": 
target.setKafkaClientFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
+        case "kafkaconsumerreconnectexceptionstrategy":
+        case "kafkaConsumerReconnectExceptionStrategy": 
target.setKafkaConsumerReconnectExceptionStrategy(property(camelContext, 
org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy.class, 
value)); return true;
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": 
target.setKafkaManualCommitFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); 
return true;
         case "kerberosbeforereloginmintime":
@@ -290,6 +292,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "interceptorClasses": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return 
org.apache.camel.component.kafka.KafkaClientFactory.class;
+        case "kafkaconsumerreconnectexceptionstrategy":
+        case "kafkaConsumerReconnectExceptionStrategy": return 
org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy.class;
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": return 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
         case "kerberosbeforereloginmintime":
@@ -490,6 +494,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "interceptorClasses": return 
getOrCreateConfiguration(target).getInterceptorClasses();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
+        case "kafkaconsumerreconnectexceptionstrategy":
+        case "kafkaConsumerReconnectExceptionStrategy": return 
target.getKafkaConsumerReconnectExceptionStrategy();
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": return 
target.getKafkaManualCommitFactory();
         case "kerberosbeforereloginmintime":
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index a23b6e7..c48fbca 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -58,6 +58,7 @@
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic num [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value th [...]
+    "kafkaConsumerReconnectExceptionStrategy": { "kind": "property", 
"displayName": "Kafka Consumer Reconnect Exception Strategy", "group": 
"consumer (advanced)", "label": "consumer,advanced", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
use a custom strategy with the consumer to control how to handle exceptions 
thrown from the Kafka [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": 
false, "autowired": false, "secret": false, "description": "Factory to use for 
creating KafkaManualCommit instances. This allows to plugin a custom factory to 
create custom KafkaManualCommit instances in ca [...]
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify the [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cfaf481..02f20fb 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 100 options, which are listed below.
 
 
 
@@ -84,6 +84,7 @@ The Kafka component supports 99 options, which are listed 
below.
 | *specificAvroReader* (consumer) | This enables the use of a specific Avro 
reader for use with the Confluent Platform schema registry and the 
io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only 
available in the Confluent Platform (not standard Apache Kafka) | false | 
boolean
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic number of topics matching 
the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that 
implements the Deserializer interface. | 
org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaConsumerReconnect{zwsp}ExceptionStrategy* (consumer) | To use a custom 
strategy with the consumer to control how to handle exceptions thrown from the 
Kafka broker while polling messages. |  | 
KafkaConsumerReconnectExceptionStrategy
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating 
KafkaManualCommit instances. This allows to plugin a custom factory to create 
custom KafkaManualCommit instances in case special logic is needed when doing 
manual commits that deviates from the default implementation that comes out of 
the box. |  | KafkaManualCommitFactory
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can 
use to buffer records waiting to be sent to the server. If records are sent 
faster than they can be delivered to the server the producer will either block 
or throw an exception based on the preference specified by 
block.on.buffer.full.This setting should correspond roughly to the total memory 
the producer will use, but is not a hard bound since not all memory the 
producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the 
compression codec for all data generated by this producer. Valid values are 
none, gzip and snappy. There are 4 enums and the value can be one of: none, 
gzip, snappy, lz4 | none | String
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaConsumerReconnectExceptionStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaConsumerReconnectExceptionStrategy.java
new file mode 100644
index 0000000..65235b2
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaConsumerReconnectExceptionStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+
+public class DefaultKafkaConsumerReconnectExceptionStrategy implements 
KafkaConsumerReconnectExceptionStrategy {
+
+    @Override
+    public boolean reconnect(Exception exception) {
+        // only reconnect on exceptions that indicate they are recoverable, or if some 
external thread is waking up the kafka consumer
+        if (exception instanceof RetriableException || exception instanceof 
WakeupException) {
+            return true;
+        }
+
+        // cannot recover so let Camel exception handler deal with it
+        return false;
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 205f7a6..a25c857 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -37,6 +37,9 @@ public class KafkaComponent extends DefaultComponent 
implements SSLContextParame
     private KafkaManualCommitFactory kafkaManualCommitFactory = new 
DefaultKafkaManualCommitFactory();
     @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory = new 
DefaultKafkaClientFactory();
+    @Metadata(label = "consumer,advanced")
+    private KafkaConsumerReconnectExceptionStrategy 
kafkaConsumerReconnectExceptionStrategy
+            = new DefaultKafkaConsumerReconnectExceptionStrategy();
 
     public KafkaComponent() {
     }
@@ -125,4 +128,16 @@ public class KafkaComponent extends DefaultComponent 
implements SSLContextParame
         this.kafkaClientFactory = kafkaClientFactory;
     }
 
+    public KafkaConsumerReconnectExceptionStrategy 
getKafkaConsumerReconnectExceptionStrategy() {
+        return kafkaConsumerReconnectExceptionStrategy;
+    }
+
+    /**
+     * To use a custom strategy with the consumer to control how to handle 
exceptions thrown from the Kafka broker while
+     * polling messages.
+     */
+    public void setKafkaConsumerReconnectExceptionStrategy(
+            KafkaConsumerReconnectExceptionStrategy 
kafkaConsumerReconnectExceptionStrategy) {
+        this.kafkaConsumerReconnectExceptionStrategy = 
kafkaConsumerReconnectExceptionStrategy;
+    }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index b6a146d..c2bc7b3 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -47,7 +47,6 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.header.Header;
@@ -417,22 +416,36 @@ public class KafkaConsumer extends DefaultConsumer {
                 LOG.info("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
-            } catch (KafkaException e) {
-                // some kind of error in kafka, it may happen during
-                // unsubscribing or during normal processing
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Exception caught while polling " + threadId + " 
from kafka topic " + topicName
+                              + ". Deciding what to do.",
+                            e);
+                }
                 if (unsubscribing) {
+                    // some kind of error in kafka, it may happen during 
unsubscribing
                     getExceptionHandler().handleException("Error unsubscribing 
" + threadId + " from kafka topic " + topicName,
                             e);
                 } else {
-                    LOG.debug("KafkaException consuming {} from topic {} 
causedby {}. Will attempt to re-connect on next run",
-                            threadId, topicName, e.getMessage());
-                    reConnect = true;
+                    boolean retry = 
getEndpoint().getComponent().getKafkaConsumerReconnectExceptionStrategy().reconnect(e);
+                    if (retry) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "KafkaException consuming {} from topic {} 
causedby {}. Will attempt to re-connect on next run",
+                                    threadId, topicName, e.getMessage());
+                        }
+                        reConnect = true;
+                    } else {
+                        getExceptionHandler().handleException("Error consuming 
" + threadId + " from kafka topic " + topicName,
+                                e);
+                    }
                 }
-            } catch (Exception e) {
-                getExceptionHandler().handleException("Error consuming " + 
threadId + " from kafka topic", e);
             } finally {
-                LOG.debug("Closing {}", threadId);
-                IOHelper.close(consumer);
+                // only close if not re-connecting
+                if (!reConnect) {
+                    LOG.debug("Closing {}", threadId);
+                    IOHelper.close(consumer);
+                }
             }
 
             return reConnect;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerReconnectExceptionStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerReconnectExceptionStrategy.java
new file mode 100644
index 0000000..676e07d
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerReconnectExceptionStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+/**
+ * Strategy to decide, when a Kafka exception was thrown during polling, how to 
handle it: either by re-connecting with
+ * a new session and retrying the poll, or by letting Camel {@link 
org.apache.camel.spi.ExceptionHandler} handle the
+ * exception.
+ */
+public interface KafkaConsumerReconnectExceptionStrategy {
+
+    /**
+     * Whether to reconnect or let Camel {@link 
org.apache.camel.spi.ExceptionHandler} handle the exception.
+     *
+     * @param  exception the caused exception which typically would be a 
{@link org.apache.kafka.common.KafkaException}
+     * @return           true to re-connect, false to let Camel {@link 
org.apache.camel.spi.ExceptionHandler} handle the
+     *                   exception
+     */
+    boolean reconnect(Exception exception);
+}
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 9a8c878..770335a 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -706,6 +706,23 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * To use a custom strategy with the consumer to control how to handle
+         * exceptions thrown from the Kafka broker while polling messages.
+         * 
+         * The option is a:
+         * 
&lt;code&gt;org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy&lt;/code&gt;
 type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param kafkaConsumerReconnectExceptionStrategy the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder kafkaConsumerReconnectExceptionStrategy(
+                
org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy 
kafkaConsumerReconnectExceptionStrategy) {
+            doSetProperty("kafkaConsumerReconnectExceptionStrategy", 
kafkaConsumerReconnectExceptionStrategy);
+            return this;
+        }
+        /**
          * Factory to use for creating KafkaManualCommit instances. This allows
          * to plugin a custom factory to create custom KafkaManualCommit
          * instances in case special logic is needed when doing manual commits
@@ -1930,6 +1947,7 @@ public interface KafkaComponentBuilderFactory {
             case "specificAvroReader": 
getOrCreateConfiguration((KafkaComponent) 
component).setSpecificAvroReader((boolean) value); return true;
             case "topicIsPattern": getOrCreateConfiguration((KafkaComponent) 
component).setTopicIsPattern((boolean) value); return true;
             case "valueDeserializer": 
getOrCreateConfiguration((KafkaComponent) 
component).setValueDeserializer((java.lang.String) value); return true;
+            case "kafkaConsumerReconnectExceptionStrategy": ((KafkaComponent) 
component).setKafkaConsumerReconnectExceptionStrategy((org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy)
 value); return true;
             case "kafkaManualCommitFactory": ((KafkaComponent) 
component).setKafkaManualCommitFactory((org.apache.camel.component.kafka.KafkaManualCommitFactory)
 value); return true;
             case "bufferMemorySize": getOrCreateConfiguration((KafkaComponent) 
component).setBufferMemorySize((java.lang.Integer) value); return true;
             case "compressionCodec": getOrCreateConfiguration((KafkaComponent) 
component).setCompressionCodec((java.lang.String) value); return true;
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc 
b/docs/components/modules/ROOT/pages/kafka-component.adoc
index c74f74c..dd5d130 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -43,7 +43,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 100 options, which are listed below.
 
 
 
@@ -86,6 +86,7 @@ The Kafka component supports 99 options, which are listed 
below.
 | *specificAvroReader* (consumer) | This enables the use of a specific Avro 
reader for use with the Confluent Platform schema registry and the 
io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only 
available in the Confluent Platform (not standard Apache Kafka) | false | 
boolean
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic number of topics matching 
the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that 
implements the Deserializer interface. | 
org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaConsumerReconnect{zwsp}ExceptionStrategy* (consumer) | To use a custom 
strategy with the consumer to control how to handle exceptions thrown from the 
Kafka broker while polling messages. |  | 
KafkaConsumerReconnectExceptionStrategy
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating 
KafkaManualCommit instances. This allows to plugin a custom factory to create 
custom KafkaManualCommit instances in case special logic is needed when doing 
manual commits that deviates from the default implementation that comes out of 
the box. |  | KafkaManualCommitFactory
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can 
use to buffer records waiting to be sent to the server. If records are sent 
faster than they can be delivered to the server the producer will either block 
or throw an exception based on the preference specified by 
block.on.buffer.full.This setting should correspond roughly to the total memory 
the producer will use, but is not a hard bound since not all memory the 
producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the 
compression codec for all data generated by this producer. Valid values are 
none, gzip and snappy. There are 4 enums and the value can be one of: none, 
gzip, snappy, lz4 | none | String

Reply via email to