This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 59103a6  Adds config option 'kafkaClientFactory' to camel-kafka (#4918)
59103a6 is described below

commit 59103a66cae683f8d41438d9ace487e5f29a8159
Author: Javier Holguera <javier.holgu...@gmail.com>
AuthorDate: Sun Jan 24 17:23:51 2021 +0000

    Adds config option 'kafkaClientFactory' to camel-kafka (#4918)
---
 .../apache/camel/catalog/docs/kafka-component.adoc |  3 +-
 .../component/kafka/KafkaComponentConfigurer.java  |  6 +++
 .../org/apache/camel/component/kafka/kafka.json    |  1 +
 .../camel-kafka/src/main/docs/kafka-component.adoc |  3 +-
 .../component/kafka/DefaultKafkaClientFactory.java | 34 +++++++++++++++++
 .../camel/component/kafka/KafkaClientFactory.java  | 44 ++++++++++++++++++++++
 .../camel/component/kafka/KafkaComponent.java      | 20 ++++++++++
 .../camel/component/kafka/KafkaConsumer.java       |  2 +-
 .../camel/component/kafka/KafkaProducer.java       |  2 +-
 .../dsl/KafkaComponentBuilderFactory.java          | 23 +++++++++++
 .../modules/ROOT/pages/kafka-component.adoc        |  3 +-
 11 files changed, 136 insertions(+), 5 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index 630520a..85c4147 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 98 options, which are listed below.
+The Kafka component supports 99 options, which are listed below.
 
 
 
@@ -118,6 +118,7 @@ The Kafka component supports 98 options, which are listed 
below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool 
for continue routing Exchange after kafka server has acknowledge the message 
that was sent to it from KafkaProducer using asynchronous non-blocking 
processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker 
pool for continue routing Exchange after kafka server has acknowledge the 
message that was sent to it from KafkaProducer using asynchronous non-blocking 
processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used 
for automatic autowiring options (the option must be marked as autowired) by 
looking up in the registry to find if there is a single instance of matching 
type, which then gets configured on the component. This can be used for 
automatic configuring JDBC data sources, JMS connection factories, AWS Clients, 
etc. | true | boolean
+| *kafkaClientFactory* (advanced) | Factory to use for creating 
org.apache.kafka.clients.consumer.KafkaConsumer and 
org.apache.kafka.clients.producer.KafkaProducer instances. This allows to 
configure a custom factory to create 
org.apache.kafka.clients.consumer.KafkaConsumer and 
org.apache.kafka.clients.producer.KafkaProducer instances with logic that 
extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be 
strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema 
registry servers to use. The format is host1:port1,host2:port2. This is known 
as schema.registry.url in the Confluent Platform documentation. This option is 
only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or 
consumers. Producer interceptors have to be classes implementing 
org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors 
have to be classes implementing 
org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use 
Producer interceptor on a consumer it will throw a class cast exception in 
runtime |  | String
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 6ad0aef..1e2481e 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -84,6 +84,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": 
getOrCreateConfiguration(target).setHeartbeatIntervalMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "interceptorclasses":
         case "interceptorClasses": 
getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "kafkaclientfactory":
+        case "kafkaClientFactory": 
target.setKafkaClientFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": 
target.setKafkaManualCommitFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); 
return true;
         case "kerberosbeforereloginmintime":
@@ -281,6 +283,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": return java.lang.Integer.class;
         case "interceptorclasses":
         case "interceptorClasses": return java.lang.String.class;
+        case "kafkaclientfactory":
+        case "kafkaClientFactory": return 
org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": return 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
         case "kerberosbeforereloginmintime":
@@ -479,6 +483,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": return 
getOrCreateConfiguration(target).getHeartbeatIntervalMs();
         case "interceptorclasses":
         case "interceptorClasses": return 
getOrCreateConfiguration(target).getInterceptorClasses();
+        case "kafkaclientfactory":
+        case "kafkaClientFactory": return target.getKafkaClientFactory();
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": return 
target.getKafkaManualCommitFactory();
         case "kerberosbeforereloginmintime":
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 4c3d0eb..79709da 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -92,6 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool 
Core Size", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "10", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Number of core threads 
for the worker pool for continue routing Exchange after  [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "20", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Maximum number of threads for the worker pool 
for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired 
Enabled", "group": "advanced", "label": "advanced", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "description": "Whether autowiring is 
enabled. This is used for automatic autowiring options (the option must be 
marked as autowired) by looking up in the registry to find if there is a single 
instance of matching type, which t [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client 
Factory", "group": "advanced", "label": "advanced", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer 
and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to 
configure a custom factory to c [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", 
"group": "advanced", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets whether synchronous processing should be 
strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry 
URL", "group": "confluent", "label": "confluent", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "URL of the Confluent Platform schema registry 
servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor 
Classes", "group": "monitoring", "label": "common,monitoring", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets interceptors for producer or consumers. 
Producer interceptors have to be classes implemen [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 630520a..85c4147 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 98 options, which are listed below.
+The Kafka component supports 99 options, which are listed below.
 
 
 
@@ -118,6 +118,7 @@ The Kafka component supports 98 options, which are listed 
below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool 
for continue routing Exchange after kafka server has acknowledge the message 
that was sent to it from KafkaProducer using asynchronous non-blocking 
processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker 
pool for continue routing Exchange after kafka server has acknowledge the 
message that was sent to it from KafkaProducer using asynchronous non-blocking 
processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used 
for automatic autowiring options (the option must be marked as autowired) by 
looking up in the registry to find if there is a single instance of matching 
type, which then gets configured on the component. This can be used for 
automatic configuring JDBC data sources, JMS connection factories, AWS Clients, 
etc. | true | boolean
+| *kafkaClientFactory* (advanced) | Factory to use for creating 
org.apache.kafka.clients.consumer.KafkaConsumer and 
org.apache.kafka.clients.producer.KafkaProducer instances. This allows to 
configure a custom factory to create 
org.apache.kafka.clients.consumer.KafkaConsumer and 
org.apache.kafka.clients.producer.KafkaProducer instances with logic that 
extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be 
strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema 
registry servers to use. The format is host1:port1,host2:port2. This is known 
as schema.registry.url in the Confluent Platform documentation. This option is 
only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or 
consumers. Producer interceptors have to be classes implementing 
org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors 
have to be classes implementing 
org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use 
Producer interceptor on a consumer it will throw a class cast exception in 
runtime |  | String
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
new file mode 100644
index 0000000..ee024bf
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+public class DefaultKafkaClientFactory implements KafkaClientFactory {
+    @Override
+    public KafkaProducer getProducer(Properties kafkaProps) {
+        return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
+    }
+
+    @Override
+    public KafkaConsumer getConsumer(Properties kafkaProps) {
+        return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
new file mode 100644
index 0000000..4ab066f
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * Factory to create a new {@link KafkaConsumer} and {@link KafkaProducer} 
instances.
+ */
+public interface KafkaClientFactory {
+
+    /**
+     * Creates a new instance of the {@link KafkaProducer} class.
+     * 
+     * @param  kafkaProps The producer configs.
+     * @return            an instance of Kafka producer.
+     */
+    KafkaProducer getProducer(Properties kafkaProps);
+
+    /**
+     * Creates a new instance of the {@link KafkaConsumer} class.
+     * 
+     * @param  kafkaProps The consumer configs.
+     * @return            an instance of Kafka consumer.
+     */
+    KafkaConsumer getConsumer(Properties kafkaProps);
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c10b1d0..e0c0732 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,6 +35,8 @@ public class KafkaComponent extends DefaultComponent 
implements SSLContextParame
     private boolean useGlobalSslContextParameters;
     @Metadata(label = "consumer,advanced")
     private KafkaManualCommitFactory kafkaManualCommitFactory = new 
DefaultKafkaManualCommitFactory();
+    @Metadata(label = "advanced")
+    private KafkaClientFactory kafkaClientFactory = new 
DefaultKafkaClientFactory();
 
     public KafkaComponent() {
     }
@@ -109,4 +111,22 @@ public class KafkaComponent extends DefaultComponent 
implements SSLContextParame
     public void setKafkaManualCommitFactory(KafkaManualCommitFactory 
kafkaManualCommitFactory) {
         this.kafkaManualCommitFactory = kafkaManualCommitFactory;
     }
+
+    public KafkaClientFactory getKafkaClientFactory() {
+        return kafkaClientFactory;
+    }
+
+    /**
+     * Factory to use for creating {@link 
org.apache.kafka.clients.consumer.KafkaConsumer} and
+     * {@link org.apache.kafka.clients.producer.KafkaProducer} instances. This 
allows to configure a custom factory to
+     * create {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
+     * {@link org.apache.kafka.clients.producer.KafkaProducer} instances with 
logic that extends the vanilla Kafka
+     * clients.
+     *
+     * @param kafkaClientFactory factory instance to use.
+     */
+    public void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) {
+        this.kafkaClientFactory = kafkaClientFactory;
+    }
+
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index fc51f58..86064a0 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -239,7 +239,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         
.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                 // this may throw an exception if something is wrong with kafka
                 // consumer
-                this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+                this.consumer = 
endpoint.getComponent().getKafkaClientFactory().getConsumer(kafkaProps);
             } finally {
                 
Thread.currentThread().setContextClassLoader(threadClassLoader);
             }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 39692d7..2cf9a34 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -112,7 +112,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 Thread.currentThread()
                         
.setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
                 LOG.trace("Creating KafkaProducer");
-                kafkaProducer = new 
org.apache.kafka.clients.producer.KafkaProducer(props);
+                kafkaProducer = 
endpoint.getComponent().getKafkaClientFactory().getProducer(props);
                 closeKafkaProducer = true;
             } finally {
                 
Thread.currentThread().setContextClassLoader(threadClassLoader);
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index ba31c5a..748b63d 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -1371,6 +1371,28 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * Factory to use for creating
+         * org.apache.kafka.clients.consumer.KafkaConsumer and
+         * org.apache.kafka.clients.producer.KafkaProducer instances. This
+         * allows to configure a custom factory to create
+         * org.apache.kafka.clients.consumer.KafkaConsumer and
+         * org.apache.kafka.clients.producer.KafkaProducer instances with logic
+         * that extends the vanilla Kafka clients.
+         * 
+         * The option is a:
+         * 
&lt;code&gt;org.apache.camel.component.kafka.KafkaClientFactory&lt;/code&gt; 
type.
+         * 
+         * Group: advanced
+         * 
+         * @param kafkaClientFactory the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder kafkaClientFactory(
+                org.apache.camel.component.kafka.KafkaClientFactory 
kafkaClientFactory) {
+            doSetProperty("kafkaClientFactory", kafkaClientFactory);
+            return this;
+        }
+        /**
          * Sets whether synchronous processing should be strictly used.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
@@ -1944,6 +1966,7 @@ public interface KafkaComponentBuilderFactory {
             case "workerPoolCoreSize": 
getOrCreateConfiguration((KafkaComponent) 
component).setWorkerPoolCoreSize((java.lang.Integer) value); return true;
             case "workerPoolMaxSize": 
getOrCreateConfiguration((KafkaComponent) 
component).setWorkerPoolMaxSize((java.lang.Integer) value); return true;
             case "autowiredEnabled": ((KafkaComponent) 
component).setAutowiredEnabled((boolean) value); return true;
+            case "kafkaClientFactory": ((KafkaComponent) 
component).setKafkaClientFactory((org.apache.camel.component.kafka.KafkaClientFactory)
 value); return true;
             case "synchronous": getOrCreateConfiguration((KafkaComponent) 
component).setSynchronous((boolean) value); return true;
             case "schemaRegistryURL": 
getOrCreateConfiguration((KafkaComponent) 
component).setSchemaRegistryURL((java.lang.String) value); return true;
             case "interceptorClasses": 
getOrCreateConfiguration((KafkaComponent) 
component).setInterceptorClasses((java.lang.String) value); return true;
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc 
b/docs/components/modules/ROOT/pages/kafka-component.adoc
index ea230e4..8c91b52 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -43,7 +43,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 98 options, which are listed below.
+The Kafka component supports 99 options, which are listed below.
 
 
 
@@ -120,6 +120,7 @@ The Kafka component supports 98 options, which are listed 
below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool 
for continue routing Exchange after kafka server has acknowledge the message 
that was sent to it from KafkaProducer using asynchronous non-blocking 
processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker 
pool for continue routing Exchange after kafka server has acknowledge the 
message that was sent to it from KafkaProducer using asynchronous non-blocking 
processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used 
for automatic autowiring options (the option must be marked as autowired) by 
looking up in the registry to find if there is a single instance of matching 
type, which then gets configured on the component. This can be used for 
automatic configuring JDBC data sources, JMS connection factories, AWS Clients, 
etc. | true | boolean
+| *kafkaClientFactory* (advanced) | Factory to use for creating 
org.apache.kafka.clients.consumer.KafkaConsumer and 
org.apache.kafka.clients.producer.KafkaProducer instances. This allows to 
configure a custom factory to create 
org.apache.kafka.clients.consumer.KafkaConsumer and 
org.apache.kafka.clients.producer.KafkaProducer instances with logic that 
extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be 
strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema 
registry servers to use. The format is host1:port1,host2:port2. This is known 
as schema.registry.url in the Confluent Platform documentation. This option is 
only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or 
consumers. Producer interceptors have to be classes implementing 
org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors 
have to be classes implementing 
org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use 
Producer interceptor on a consumer it will throw a class cast exception in 
runtime |  | String

Reply via email to