This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a366f69fde4959546d00823e7eea6d3aa0492971
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 9 07:47:26 2021 +0100

    CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
---
 .../org/apache/camel/catalog/components/kafka.json        |  2 +-
 .../org/apache/camel/catalog/docs/kafka-component.adoc    |  2 +-
 .../camel/component/kafka/KafkaComponentConfigurer.java   |  5 +++++
 .../resources/org/apache/camel/component/kafka/kafka.json |  2 +-
 components/camel-kafka/src/main/docs/kafka-component.adoc |  2 +-
 .../camel/component/kafka/DefaultKafkaClientFactory.java  | 12 ++++++++++++
 .../apache/camel/component/kafka/KafkaClientFactory.java  | 10 ++++++++++
 .../org/apache/camel/component/kafka/KafkaComponent.java  |  8 ++------
 .../org/apache/camel/component/kafka/KafkaConsumer.java   | 14 +++-----------
 .../org/apache/camel/component/kafka/KafkaProducer.java   |  7 +++----
 .../apache/camel/component/kafka/KafkaConsumerTest.java   |  6 +++++-
 .../component/dsl/KafkaComponentBuilderFactory.java       |  4 +---
 docs/components/modules/ROOT/pages/kafka-component.adoc   |  2 +-
 13 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 79709da..55a1240 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -92,7 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
-    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 1e2481e..e44e7a1 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -225,6 +225,11 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
     }
 
     @Override
+    public String[] getAutowiredNames() {
+        return new String[]{"kafkaClientFactory"};
+    }
+
+    @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "additionalproperties":
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 79709da..55a1240 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -92,7 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
-    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index ee024bf..425788e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -18,10 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
 public class DefaultKafkaClientFactory implements KafkaClientFactory {
+
     @Override
     public KafkaProducer getProducer(Properties kafkaProps) {
         return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
@@ -31,4 +33,14 @@ public class DefaultKafkaClientFactory implements KafkaClientFactory {
     public KafkaConsumer getConsumer(Properties kafkaProps) {
         return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
     }
+
+    @Override
+    public String getBrokers(KafkaConfiguration configuration) {
+        // broker urls is mandatory in this implementation
+        String brokers = configuration.getBrokers();
+        if (ObjectHelper.isEmpty(brokers)) {
+            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        }
+        return brokers;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index 4ab066f..d7de678 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -41,4 +41,14 @@ public interface KafkaClientFactory {
      * @return an instance of Kafka consumer.
      */
     KafkaConsumer getConsumer(Properties kafkaProps);
+
+    /**
+     * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
+     * or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
+     *
+     * @param configuration the configuration
+     */
+    String getBrokers(KafkaConfiguration configuration);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index e0c0732..205f7a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,7 +35,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private boolean useGlobalSslContextParameters;
     @Metadata(label = "consumer,advanced")
     private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
-    @Metadata(label = "advanced")
+    @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
 
     public KafkaComponent() {
@@ -119,11 +119,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     /**
      * Factory to use for creating {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
      * {@link org.apache.kafka.clients.producer.KafkaProducer} instances. This allows to configure a custom factory to
-     * create {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
-     * {@link org.apache.kafka.clients.producer.KafkaProducer} instances with logic that extends the vanilla Kafka
-     * clients.
-     *
-     * @param kafkaClientFactory factory instance to use.
+     * create instances with logic that extends the vanilla Kafka clients.
      */
     public void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) {
         this.kafkaClientFactory = kafkaClientFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index eae650e..b911881 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -41,7 +41,6 @@ import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +70,6 @@ public class KafkaConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
-
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (ObjectHelper.isEmpty(brokers)) {
-            throw new IllegalArgumentException("Brokers must be configured");
-        }
     }
 
     @Override
@@ -87,13 +81,11 @@ public class KafkaConsumer extends DefaultConsumer {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        if (brokers != null) {
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-
         if (endpoint.getConfiguration().getGroupId() != null) {
             String groupId = endpoint.getConfiguration().getGroupId();
             props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2cf9a34..a8ef383 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,11 +70,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        if (brokers != null) {
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 
         return props;
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index aa6b6f5..e0a397d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -20,12 +20,14 @@ import org.apache.camel.Processor;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class KafkaConsumerTest {
 
     private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+    private KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
     private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
@@ -35,8 +37,10 @@ public class KafkaConsumerTest {
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
+        when(component.getKafkaClientFactory()).thenReturn(clientFactory);
+        when(clientFactory.getBrokers(any())).thenReturn(null);
         assertThrows(IllegalArgumentException.class,
-                () -> new KafkaConsumer(endpoint, processor));
+                () -> new KafkaConsumer(endpoint, processor).getProps());
     }
 
     @Test
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 748b63d..9a8c878 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -1374,9 +1374,7 @@ public interface KafkaComponentBuilderFactory {
          * Factory to use for creating
          * org.apache.kafka.clients.consumer.KafkaConsumer and
          * org.apache.kafka.clients.producer.KafkaProducer instances. This
-         * allows to configure a custom factory to create
-         * org.apache.kafka.clients.consumer.KafkaConsumer and
-         * org.apache.kafka.clients.producer.KafkaProducer instances with logic
+         * allows to configure a custom factory to create instances with logic
          * that extends the vanilla Kafka clients.
          *
         * The option is a:
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index 8c91b52..c74f74c 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -120,7 +120,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String
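
Editor's note (not part of the commit): the new KafkaClientFactory#getBrokers(KafkaConfiguration) method in the diff above is the extension point that lets a factory supply the bootstrap servers itself, so the brokers option no longer has to be set on the endpoint. Below is a minimal sketch of such a custom factory; the class name and the BrokerDiscovery lookup are hypothetical and only illustrate the contract the diff defines.

    import java.util.Properties;

    import org.apache.camel.component.kafka.KafkaClientFactory;
    import org.apache.camel.component.kafka.KafkaConfiguration;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class DiscoveringKafkaClientFactory implements KafkaClientFactory {

        // hypothetical service that knows how to look up the broker list
        private final BrokerDiscovery discovery;

        public DiscoveringKafkaClientFactory(BrokerDiscovery discovery) {
            this.discovery = discovery;
        }

        @Override
        public KafkaProducer getProducer(Properties kafkaProps) {
            // same behaviour as DefaultKafkaClientFactory
            return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
        }

        @Override
        public KafkaConsumer getConsumer(Properties kafkaProps) {
            // same behaviour as DefaultKafkaClientFactory
            return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
        }

        @Override
        public String getBrokers(KafkaConfiguration configuration) {
            // prefer explicitly configured brokers, otherwise resolve them dynamically;
            // a non-null return value is put into bootstrap.servers by KafkaConsumer/KafkaProducer
            String brokers = configuration.getBrokers();
            return brokers != null ? brokers : discovery.resolveBootstrapServers();
        }

        /** Hypothetical abstraction over whatever supplies the host1:port1,host2:port2 list. */
        public interface BrokerDiscovery {
            String resolveBootstrapServers();
        }
    }

Because the kafkaClientFactory option is now marked as autowired, binding a single instance of such a factory in the registry is enough for the component to pick it up; alternatively it can be set explicitly via KafkaComponent#setKafkaClientFactory. The DefaultKafkaClientFactory keeps the old behaviour and still throws IllegalArgumentException when the brokers option is empty.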