This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 333e6f1 CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs 333e6f1 is described below commit 333e6f13a744581b624091278ac75161716e0d43 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 9 09:30:12 2021 +0100 CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs --- .../camel/catalog/components/vertx-kafka.json | 3 +- .../camel/catalog/docs/vertx-kafka-component.adoc | 5 +- .../vertx/kafka/VertxKafkaComponentConfigurer.java | 6 +- .../vertx/kafka/VertxKafkaEndpointConfigurer.java | 6 - .../vertx/kafka/VertxKafkaEndpointUriFactory.java | 3 +- .../camel/component/vertx/kafka/vertx-kafka.json | 3 +- .../src/main/docs/vertx-kafka-component.adoc | 5 +- .../kafka/DefaultVertxKafkaClientFactory.java | 14 +++ .../vertx/kafka/VertxKafkaClientFactory.java | 29 ++++- .../component/vertx/kafka/VertxKafkaComponent.java | 23 ++-- .../component/vertx/kafka/VertxKafkaConsumer.java | 7 +- .../component/vertx/kafka/VertxKafkaProducer.java | 11 +- .../configuration/BaseVertxKafkaConfiguration.java | 19 ---- .../dsl/VertxKafkaComponentBuilderFactory.java | 2 +- .../dsl/VertxKafkaEndpointBuilderFactory.java | 126 --------------------- .../modules/ROOT/pages/vertx-kafka-component.adoc | 5 +- 16 files changed, 86 insertions(+), 181 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-kafka.json index d08bb98..47349b0 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-kafka.json @@ -89,7 +89,7 @@ "valueSerializer": { "kind": "property", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Serializer c [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] "vertx": { "kind": "property", "displayName": "Vertx", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.vertx.core.Vertx", "deprecated": false, "autowired": true, "secret": false, "description": "To use an existing vertx instead of creating a new instance" }, - "vertxKafkaClientFactory": { "kind": "property", "displayName": "Vertx Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Factory to use for cre [...] + "vertxKafkaClientFactory": { "kind": "property", "displayName": "Vertx Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a cust [...] "vertxOptions": { "kind": "property", "displayName": "Vertx Options", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.vertx.core.VertxOptions", "deprecated": false, "autowired": false, "secret": false, "description": "To provide a custom set of vertx options for configuring vertx" }, "saslClientCallbackHandlerClass": { "kind": "property", "displayName": "Sasl Client Callback Handler Class", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The fully qualified name of a SASL client call [...] "saslJaasConfig": { "kind": "property", "displayName": "Sasl Jaas Config", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "JAAS login context parameters for SASL connections in the format used by JAAS co [...] @@ -195,7 +195,6 @@ "transactionalId": { "kind": "parameter", "displayName": "Transactional Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The TransactionalId to use for transactional delivery. This enables reliability seman [...] "transactionTimeoutMs": { "kind": "parameter", "displayName": "Transaction Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1m", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time in ms that the transaction coordinat [...] "valueSerializer": { "kind": "parameter", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Serializer [...] - "vertxKafkaClientFactory": { "kind": "parameter", "displayName": "Vertx Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Factory to use for cr [...] "saslClientCallbackHandlerClass": { "kind": "parameter", "displayName": "Sasl Client Callback Handler Class", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The fully qualified name of a SASL client cal [...] "saslJaasConfig": { "kind": "parameter", "displayName": "Sasl Jaas Config", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "JAAS login context parameters for SASL connections in the format used by JAAS c [...] "saslKerberosKinitCmd": { "kind": "parameter", "displayName": "Sasl Kerberos Kinit Cmd", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "\/usr\/bin\/kinit", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Kerberos kinit command path." }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc index a02900a..920b545 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc @@ -74,7 +74,7 @@ with the following path and query parameters: |=== -=== Query Parameters (102 parameters): +=== Query Parameters (101 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -146,7 +146,6 @@ with the following path and query parameters: | *transactionalId* (producer) | The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. If a TransactionalId is configured, enable.idempotence is implied. By default the TransactionI [...] | *transactionTimeoutMs* (producer) | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error. | 1m | int | *valueSerializer* (producer) | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. | org.apache.kafka.common.serialization.StringSerializer | String -| *vertxKafkaClientFactory* (advanced) | Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory | *saslClientCallbackHandlerClass* (security) | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | | String | *saslJaasConfig* (security) | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue);'. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; | | String | *saslKerberosKinitCmd* (security) | Kerberos kinit command path. | /usr/bin/kinit | String @@ -261,7 +260,7 @@ The Vert.x Kafka component supports 104 options, which are listed below. | *valueSerializer* (producer) | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. | org.apache.kafka.common.serialization.StringSerializer | String | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | *vertx* (advanced) | *Autowired* To use an existing vertx instead of creating a new instance | | Vertx -| *vertxKafkaClientFactory* (advanced) | Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory +| *vertxKafkaClientFactory* (advanced) | *Autowired* Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory | *vertxOptions* (advanced) | To provide a custom set of vertx options for configuring vertx | | VertxOptions | *saslClientCallbackHandlerClass* (security) | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | | String | *saslJaasConfig* (security) | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue);'. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; | | String diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java index 4abc928..100966b 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java @@ -229,7 +229,7 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp case "valueSerializer": getOrCreateConfiguration(target).setValueSerializer(property(camelContext, java.lang.String.class, value)); return true; case "vertx": target.setVertx(property(camelContext, io.vertx.core.Vertx.class, value)); return true; case "vertxkafkaclientfactory": - case "vertxKafkaClientFactory": getOrCreateConfiguration(target).setVertxKafkaClientFactory(property(camelContext, org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory.class, value)); return true; + case "vertxKafkaClientFactory": target.setVertxKafkaClientFactory(property(camelContext, org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory.class, value)); return true; case "vertxoptions": case "vertxOptions": target.setVertxOptions(property(camelContext, io.vertx.core.VertxOptions.class, value)); return true; default: return false; @@ -238,7 +238,7 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp @Override public String[] getAutowiredNames() { - return new String[]{"vertx"}; + return new String[]{"vertx","vertxKafkaClientFactory"}; } @Override @@ -657,7 +657,7 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp case "valueSerializer": return getOrCreateConfiguration(target).getValueSerializer(); case "vertx": return target.getVertx(); case "vertxkafkaclientfactory": - case "vertxKafkaClientFactory": return getOrCreateConfiguration(target).getVertxKafkaClientFactory(); + case "vertxKafkaClientFactory": return target.getVertxKafkaClientFactory(); case "vertxoptions": case "vertxOptions": return target.getVertxOptions(); default: return null; diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java index 8811243..3b9e546 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java @@ -221,8 +221,6 @@ public class VertxKafkaEndpointConfigurer extends PropertyConfigurerSupport impl case "valueDeserializer": target.getConfiguration().setValueDeserializer(property(camelContext, java.lang.String.class, value)); return true; case "valueserializer": case "valueSerializer": target.getConfiguration().setValueSerializer(property(camelContext, java.lang.String.class, value)); return true; - case "vertxkafkaclientfactory": - case "vertxKafkaClientFactory": target.getConfiguration().setVertxKafkaClientFactory(property(camelContext, org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory.class, value)); return true; default: return false; } } @@ -430,8 +428,6 @@ public class VertxKafkaEndpointConfigurer extends PropertyConfigurerSupport impl case "valueDeserializer": return java.lang.String.class; case "valueserializer": case "valueSerializer": return java.lang.String.class; - case "vertxkafkaclientfactory": - case "vertxKafkaClientFactory": return org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory.class; default: return null; } } @@ -640,8 +636,6 @@ public class VertxKafkaEndpointConfigurer extends PropertyConfigurerSupport impl case "valueDeserializer": return target.getConfiguration().getValueDeserializer(); case "valueserializer": case "valueSerializer": return target.getConfiguration().getValueSerializer(); - case "vertxkafkaclientfactory": - case "vertxKafkaClientFactory": return target.getConfiguration().getVertxKafkaClientFactory(); default: return null; } } diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java index 4a7c482..ebf346d 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java @@ -20,7 +20,7 @@ public class VertxKafkaEndpointUriFactory extends org.apache.camel.support.compo private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(103); + Set<String> props = new HashSet<>(102); props.add("receiveBufferBytes"); props.add("saslLoginRefreshWindowFactor"); props.add("socketConnectionSetupTimeoutMs"); @@ -71,7 +71,6 @@ public class VertxKafkaEndpointUriFactory extends org.apache.camel.support.compo props.add("seekToPosition"); props.add("saslKerberosMinTimeBeforeRelogin"); props.add("sslKeystoreCertificateChain"); - props.add("vertxKafkaClientFactory"); props.add("maxPollIntervalMs"); props.add("reconnectBackoffMs"); props.add("groupId"); diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json index d08bb98..47349b0 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json @@ -89,7 +89,7 @@ "valueSerializer": { "kind": "property", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Serializer c [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] "vertx": { "kind": "property", "displayName": "Vertx", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.vertx.core.Vertx", "deprecated": false, "autowired": true, "secret": false, "description": "To use an existing vertx instead of creating a new instance" }, - "vertxKafkaClientFactory": { "kind": "property", "displayName": "Vertx Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Factory to use for cre [...] + "vertxKafkaClientFactory": { "kind": "property", "displayName": "Vertx Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a cust [...] "vertxOptions": { "kind": "property", "displayName": "Vertx Options", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.vertx.core.VertxOptions", "deprecated": false, "autowired": false, "secret": false, "description": "To provide a custom set of vertx options for configuring vertx" }, "saslClientCallbackHandlerClass": { "kind": "property", "displayName": "Sasl Client Callback Handler Class", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The fully qualified name of a SASL client call [...] "saslJaasConfig": { "kind": "property", "displayName": "Sasl Jaas Config", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "JAAS login context parameters for SASL connections in the format used by JAAS co [...] @@ -195,7 +195,6 @@ "transactionalId": { "kind": "parameter", "displayName": "Transactional Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The TransactionalId to use for transactional delivery. This enables reliability seman [...] "transactionTimeoutMs": { "kind": "parameter", "displayName": "Transaction Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1m", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time in ms that the transaction coordinat [...] "valueSerializer": { "kind": "parameter", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Serializer [...] - "vertxKafkaClientFactory": { "kind": "parameter", "displayName": "Vertx Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Factory to use for cr [...] "saslClientCallbackHandlerClass": { "kind": "parameter", "displayName": "Sasl Client Callback Handler Class", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The fully qualified name of a SASL client cal [...] "saslJaasConfig": { "kind": "parameter", "displayName": "Sasl Jaas Config", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "JAAS login context parameters for SASL connections in the format used by JAAS c [...] "saslKerberosKinitCmd": { "kind": "parameter", "displayName": "Sasl Kerberos Kinit Cmd", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "\/usr\/bin\/kinit", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Kerberos kinit command path." }, diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc index a02900a..920b545 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc @@ -74,7 +74,7 @@ with the following path and query parameters: |=== -=== Query Parameters (102 parameters): +=== Query Parameters (101 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -146,7 +146,6 @@ with the following path and query parameters: | *transactionalId* (producer) | The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. If a TransactionalId is configured, enable.idempotence is implied. By default the TransactionI [...] | *transactionTimeoutMs* (producer) | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error. | 1m | int | *valueSerializer* (producer) | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. | org.apache.kafka.common.serialization.StringSerializer | String -| *vertxKafkaClientFactory* (advanced) | Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory | *saslClientCallbackHandlerClass* (security) | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | | String | *saslJaasConfig* (security) | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue);'. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; | | String | *saslKerberosKinitCmd* (security) | Kerberos kinit command path. | /usr/bin/kinit | String @@ -261,7 +260,7 @@ The Vert.x Kafka component supports 104 options, which are listed below. | *valueSerializer* (producer) | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. | org.apache.kafka.common.serialization.StringSerializer | String | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | *vertx* (advanced) | *Autowired* To use an existing vertx instead of creating a new instance | | Vertx -| *vertxKafkaClientFactory* (advanced) | Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory +| *vertxKafkaClientFactory* (advanced) | *Autowired* Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory | *vertxOptions* (advanced) | To provide a custom set of vertx options for configuring vertx | | VertxOptions | *saslClientCallbackHandlerClass* (security) | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | | String | *saslJaasConfig* (security) | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue);'. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; | | String diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/DefaultVertxKafkaClientFactory.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/DefaultVertxKafkaClientFactory.java index 7bf8b8f..a95d6a0 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/DefaultVertxKafkaClientFactory.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/DefaultVertxKafkaClientFactory.java @@ -21,6 +21,8 @@ import java.util.Properties; import io.vertx.core.Vertx; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.producer.KafkaProducer; +import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration; +import org.apache.camel.util.ObjectHelper; /** * Default implementation for {@Link VertxKafkaClientFactory} interface. @@ -28,6 +30,7 @@ import io.vertx.kafka.client.producer.KafkaProducer; * Creates default VertX {@Link KafkaConsumer} and {@Link KafkaProducer} instances. */ public class DefaultVertxKafkaClientFactory implements VertxKafkaClientFactory { + @Override public <K, V> KafkaConsumer<K, V> getVertxKafkaConsumer(Vertx vertx, Properties config) { return KafkaConsumer.create(vertx, config); @@ -37,4 +40,15 @@ public class DefaultVertxKafkaClientFactory implements VertxKafkaClientFactory { public <K, V> KafkaProducer<K, V> getVertxKafkaProducer(Vertx vertx, Properties config) { return KafkaProducer.create(vertx, config); } + + @Override + public String getBootstrapBrokers(VertxKafkaConfiguration configuration) { + // broker urls is mandatory in this implementation + String brokers = configuration.getBootstrapServers(); + if (ObjectHelper.isEmpty(brokers)) { + throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the BootstrapServers option."); + } + return brokers; + } + } diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaClientFactory.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaClientFactory.java index 98b2693..5be7360 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaClientFactory.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaClientFactory.java @@ -21,13 +21,40 @@ import java.util.Properties; import io.vertx.core.Vertx; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.producer.KafkaProducer; +import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration; /** - * Defines the factory that this component uses to create {@Link KafkaConsumer} and {@Link KafkaProducer} instances. + * Defines the factory that this component uses to create vertx based {@Link KafkaConsumer} and {@Link KafkaProducer} + * instances. */ public interface VertxKafkaClientFactory { + /** + * Creates a new instance of the {@link org.apache.kafka.clients.consumer.KafkaConsumer} class. + * + * @param vertx vertx instance + * @param config The consumer configs. + * @return an instance of Kafka consumer. + */ <K, V> KafkaConsumer<K, V> getVertxKafkaConsumer(Vertx vertx, Properties config); + /** + * Creates a new instance of the {@link org.apache.kafka.clients.producer.KafkaProducer} class. + * + * @param vertx vertx instance + * @param config The producer configs. + * @return an instance of Kafka producer. + */ <K, V> KafkaProducer<K, V> getVertxKafkaProducer(Vertx vertx, Properties config); + + /** + * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers + * or a VIP pointing to a subset of brokers. + * <p/> + * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation. + * + * @param configuration the configuration + */ + String getBootstrapBrokers(VertxKafkaConfiguration configuration); + } diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java index 1f3e4a9..1616300 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java @@ -40,6 +40,8 @@ public class VertxKafkaComponent extends DefaultComponent { private Vertx vertx; @Metadata(label = "advanced") private VertxOptions vertxOptions; + @Metadata(label = "advanced", autowired = true) + private VertxKafkaClientFactory vertxKafkaClientFactory = new DefaultVertxKafkaClientFactory(); public VertxKafkaComponent() { } @@ -50,14 +52,12 @@ public class VertxKafkaComponent extends DefaultComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - if (ObjectHelper.isEmpty(remaining)) { throw new IllegalArgumentException("Topic must be configured on endpoint using syntax kafka:topic"); } final VertxKafkaConfiguration configuration = this.configuration != null ? this.configuration.copy() : new VertxKafkaConfiguration(); - configuration.setTopic(remaining); final VertxKafkaEndpoint endpoint = new VertxKafkaEndpoint(uri, this, configuration); @@ -73,8 +73,6 @@ public class VertxKafkaComponent extends DefaultComponent { setProperties(endpoint, parameters); - validateConfigurations(configuration); - return endpoint; } @@ -96,6 +94,7 @@ public class VertxKafkaComponent extends DefaultComponent { protected void doStop() throws Exception { if (managedVertx && vertx != null) { vertx.close(); + vertx = null; } super.doStop(); @@ -134,9 +133,17 @@ public class VertxKafkaComponent extends DefaultComponent { this.vertxOptions = vertxOptions; } - private void validateConfigurations(final VertxKafkaConfiguration configuration) { - if (ObjectHelper.isEmpty(configuration.getBootstrapServers())) { - throw new IllegalArgumentException("Kafka bootstrap servers must be configured in 'bootstrapServers' option."); - } + public VertxKafkaClientFactory getVertxKafkaClientFactory() { + return vertxKafkaClientFactory; } + + /** + * Factory to use for creating {@Link io.vertx.kafka.client.consumer.KafkaConsumer} and + * {@Link io.vertx.kafka.client.consumer.KafkaProducer} instances. This allows to configure a custom factory to + * create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. + */ + public void setVertxKafkaClientFactory(VertxKafkaClientFactory vertxKafkaClientFactory) { + this.vertxKafkaClientFactory = vertxKafkaClientFactory; + } + } diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java index 4b8dfe4..a857e44 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java @@ -46,8 +46,13 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable { protected void doStart() throws Exception { super.doStart(); + String brokers = getEndpoint().getComponent().getVertxKafkaClientFactory().getBootstrapBrokers(getConfiguration()); + if (brokers != null) { + LOG.debug("Creating KafkaConsumer connecting to BootstrapBrokers: {}", brokers); + } + // create the consumer client - kafkaConsumer = getConfiguration().getVertxKafkaClientFactory() + kafkaConsumer = getEndpoint().getComponent().getVertxKafkaClientFactory() .getVertxKafkaConsumer(getEndpoint().getVertx(), getConfiguration().createConsumerConfiguration()); // create the consumer operation diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaProducer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaProducer.java index 7f1b9ba..a868d2e 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaProducer.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaProducer.java @@ -22,9 +22,13 @@ import org.apache.camel.Exchange; import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration; import org.apache.camel.component.vertx.kafka.operations.VertxKafkaProducerOperations; import org.apache.camel.support.DefaultAsyncProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class VertxKafkaProducer extends DefaultAsyncProducer { + private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaProducer.class); + private KafkaProducer<Object, Object> kafkaProducer; private VertxKafkaProducerOperations producerOperations; @@ -34,8 +38,13 @@ public class VertxKafkaProducer extends DefaultAsyncProducer { @Override protected void doStart() { + String brokers = getEndpoint().getComponent().getVertxKafkaClientFactory().getBootstrapBrokers(getConfiguration()); + if (brokers != null) { + LOG.debug("Creating KafkaConsumer connecting to BootstrapBrokers: {}", brokers); + } + // create kafka client - kafkaProducer = getConfiguration().getVertxKafkaClientFactory() + kafkaProducer = getEndpoint().getComponent().getVertxKafkaClientFactory() .getVertxKafkaProducer(getEndpoint().getVertx(), getConfiguration().createProducerConfiguration()); // create our operations diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java index 5c6d16e..90ca519 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.vertx.kafka.configuration; -import org.apache.camel.component.vertx.kafka.DefaultVertxKafkaClientFactory; -import org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory; import org.apache.camel.component.vertx.kafka.VertxKafkaHeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; @@ -28,9 +26,6 @@ public abstract class BaseVertxKafkaConfiguration implements HeaderFilterStrateg @UriParam(label = "common") private HeaderFilterStrategy headerFilterStrategy = new VertxKafkaHeaderFilterStrategy(); - @UriParam(label = "advanced") - private VertxKafkaClientFactory vertxKafkaClientFactory = new DefaultVertxKafkaClientFactory(); - /** * To use a custom HeaderFilterStrategy to filter header to and from Camel message. */ @@ -42,18 +37,4 @@ public abstract class BaseVertxKafkaConfiguration implements HeaderFilterStrateg this.headerFilterStrategy = headerFilterStrategy; } - public VertxKafkaClientFactory getVertxKafkaClientFactory() { - return vertxKafkaClientFactory; - } - - /** - * Factory to use for creating {@Link io.vertx.kafka.client.consumer.KafkaConsumer} and - * {@Link io.vertx.kafka.client.consumer.KafkaProducer} instances. This allows to configure a custom factory to - * create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. - * - * @param vertxKafkaClientFactory factory instance to use. - */ - public void setVertxKafkaClientFactory(VertxKafkaClientFactory vertxKafkaClientFactory) { - this.vertxKafkaClientFactory = vertxKafkaClientFactory; - } } diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java index 902b338..4b66285 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java @@ -2167,7 +2167,7 @@ public interface VertxKafkaComponentBuilderFactory { case "valueSerializer": getOrCreateConfiguration((VertxKafkaComponent) component).setValueSerializer((java.lang.String) value); return true; case "autowiredEnabled": ((VertxKafkaComponent) component).setAutowiredEnabled((boolean) value); return true; case "vertx": ((VertxKafkaComponent) component).setVertx((io.vertx.core.Vertx) value); return true; - case "vertxKafkaClientFactory": getOrCreateConfiguration((VertxKafkaComponent) component).setVertxKafkaClientFactory((org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory) value); return true; + case "vertxKafkaClientFactory": ((VertxKafkaComponent) component).setVertxKafkaClientFactory((org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory) value); return true; case "vertxOptions": ((VertxKafkaComponent) component).setVertxOptions((io.vertx.core.VertxOptions) value); return true; case "saslClientCallbackHandlerClass": getOrCreateConfiguration((VertxKafkaComponent) component).setSaslClientCallbackHandlerClass((java.lang.String) value); return true; case "saslJaasConfig": getOrCreateConfiguration((VertxKafkaComponent) component).setSaslJaasConfig((java.lang.String) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java index 0800d71..c634c82 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java @@ -2405,48 +2405,6 @@ public interface VertxKafkaEndpointBuilderFactory { doSetProperty("exchangePattern", exchangePattern); return this; } - /** - * Factory to use for creating - * io.vertx.kafka.client.consumer.KafkaConsumer and - * io.vertx.kafka.client.consumer.KafkaProducer instances. This allows - * to configure a custom factory to create custom KafkaConsumer and - * KafkaProducer instances with logic that extends the vanilla VertX - * Kafka clients. - * - * The option is a: - * <code>org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory</code> type. - * - * Group: advanced - * - * @param vertxKafkaClientFactory the value to set - * @return the dsl builder - */ - default AdvancedVertxKafkaEndpointConsumerBuilder vertxKafkaClientFactory( - Object vertxKafkaClientFactory) { - doSetProperty("vertxKafkaClientFactory", vertxKafkaClientFactory); - return this; - } - /** - * Factory to use for creating - * io.vertx.kafka.client.consumer.KafkaConsumer and - * io.vertx.kafka.client.consumer.KafkaProducer instances. This allows - * to configure a custom factory to create custom KafkaConsumer and - * KafkaProducer instances with logic that extends the vanilla VertX - * Kafka clients. - * - * The option will be converted to a - * <code>org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory</code> type. - * - * Group: advanced - * - * @param vertxKafkaClientFactory the value to set - * @return the dsl builder - */ - default AdvancedVertxKafkaEndpointConsumerBuilder vertxKafkaClientFactory( - String vertxKafkaClientFactory) { - doSetProperty("vertxKafkaClientFactory", vertxKafkaClientFactory); - return this; - } } /** @@ -4619,48 +4577,6 @@ public interface VertxKafkaEndpointBuilderFactory { default VertxKafkaEndpointProducerBuilder basic() { return (VertxKafkaEndpointProducerBuilder) this; } - /** - * Factory to use for creating - * io.vertx.kafka.client.consumer.KafkaConsumer and - * io.vertx.kafka.client.consumer.KafkaProducer instances. This allows - * to configure a custom factory to create custom KafkaConsumer and - * KafkaProducer instances with logic that extends the vanilla VertX - * Kafka clients. - * - * The option is a: - * <code>org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory</code> type. - * - * Group: advanced - * - * @param vertxKafkaClientFactory the value to set - * @return the dsl builder - */ - default AdvancedVertxKafkaEndpointProducerBuilder vertxKafkaClientFactory( - Object vertxKafkaClientFactory) { - doSetProperty("vertxKafkaClientFactory", vertxKafkaClientFactory); - return this; - } - /** - * Factory to use for creating - * io.vertx.kafka.client.consumer.KafkaConsumer and - * io.vertx.kafka.client.consumer.KafkaProducer instances. This allows - * to configure a custom factory to create custom KafkaConsumer and - * KafkaProducer instances with logic that extends the vanilla VertX - * Kafka clients. - * - * The option will be converted to a - * <code>org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory</code> type. - * - * Group: advanced - * - * @param vertxKafkaClientFactory the value to set - * @return the dsl builder - */ - default AdvancedVertxKafkaEndpointProducerBuilder vertxKafkaClientFactory( - String vertxKafkaClientFactory) { - doSetProperty("vertxKafkaClientFactory", vertxKafkaClientFactory); - return this; - } } /** @@ -6118,48 +6034,6 @@ public interface VertxKafkaEndpointBuilderFactory { default VertxKafkaEndpointBuilder basic() { return (VertxKafkaEndpointBuilder) this; } - /** - * Factory to use for creating - * io.vertx.kafka.client.consumer.KafkaConsumer and - * io.vertx.kafka.client.consumer.KafkaProducer instances. This allows - * to configure a custom factory to create custom KafkaConsumer and - * KafkaProducer instances with logic that extends the vanilla VertX - * Kafka clients. - * - * The option is a: - * <code>org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory</code> type. - * - * Group: advanced - * - * @param vertxKafkaClientFactory the value to set - * @return the dsl builder - */ - default AdvancedVertxKafkaEndpointBuilder vertxKafkaClientFactory( - Object vertxKafkaClientFactory) { - doSetProperty("vertxKafkaClientFactory", vertxKafkaClientFactory); - return this; - } - /** - * Factory to use for creating - * io.vertx.kafka.client.consumer.KafkaConsumer and - * io.vertx.kafka.client.consumer.KafkaProducer instances. This allows - * to configure a custom factory to create custom KafkaConsumer and - * KafkaProducer instances with logic that extends the vanilla VertX - * Kafka clients. - * - * The option will be converted to a - * <code>org.apache.camel.component.vertx.kafka.VertxKafkaClientFactory</code> type. - * - * Group: advanced - * - * @param vertxKafkaClientFactory the value to set - * @return the dsl builder - */ - default AdvancedVertxKafkaEndpointBuilder vertxKafkaClientFactory( - String vertxKafkaClientFactory) { - doSetProperty("vertxKafkaClientFactory", vertxKafkaClientFactory); - return this; - } } public interface VertxKafkaBuilders { diff --git a/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc b/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc index b3452ce..d8b357c 100644 --- a/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc +++ b/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc @@ -76,7 +76,7 @@ with the following path and query parameters: |=== -=== Query Parameters (102 parameters): +=== Query Parameters (101 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -148,7 +148,6 @@ with the following path and query parameters: | *transactionalId* (producer) | The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. If a TransactionalId is configured, enable.idempotence is implied. By default the TransactionI [...] | *transactionTimeoutMs* (producer) | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error. | 1m | int | *valueSerializer* (producer) | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. | org.apache.kafka.common.serialization.StringSerializer | String -| *vertxKafkaClientFactory* (advanced) | Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory | *saslClientCallbackHandlerClass* (security) | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | | String | *saslJaasConfig* (security) | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue);'. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; | | String | *saslKerberosKinitCmd* (security) | Kerberos kinit command path. | /usr/bin/kinit | String @@ -263,7 +262,7 @@ The Vert.x Kafka component supports 104 options, which are listed below. | *valueSerializer* (producer) | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. | org.apache.kafka.common.serialization.StringSerializer | String | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean | *vertx* (advanced) | *Autowired* To use an existing vertx instead of creating a new instance | | Vertx -| *vertxKafkaClientFactory* (advanced) | Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory +| *vertxKafkaClientFactory* (advanced) | *Autowired* Factory to use for creating io.vertx.kafka.client.consumer.KafkaConsumer and io.vertx.kafka.client.consumer.KafkaProducer instances. This allows to configure a custom factory to create custom KafkaConsumer and KafkaProducer instances with logic that extends the vanilla VertX Kafka clients. | | VertxKafkaClientFactory | *vertxOptions* (advanced) | To provide a custom set of vertx options for configuring vertx | | VertxOptions | *saslClientCallbackHandlerClass* (security) | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | | String | *saslJaasConfig* (security) | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: 'loginModuleClass controlFlag (optionName=optionValue);'. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; | | String