This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 59103a6 Adds config option 'kafkaClientFactory' to camel-kafka (#4918) 59103a6 is described below commit 59103a66cae683f8d41438d9ace487e5f29a8159 Author: Javier Holguera <javier.holgu...@gmail.com> AuthorDate: Sun Jan 24 17:23:51 2021 +0000 Adds config option 'kafkaClientFactory' to camel-kafka (#4918) --- .../apache/camel/catalog/docs/kafka-component.adoc | 3 +- .../component/kafka/KafkaComponentConfigurer.java | 6 +++ .../org/apache/camel/component/kafka/kafka.json | 1 + .../camel-kafka/src/main/docs/kafka-component.adoc | 3 +- .../component/kafka/DefaultKafkaClientFactory.java | 34 +++++++++++++++++ .../camel/component/kafka/KafkaClientFactory.java | 44 ++++++++++++++++++++++ .../camel/component/kafka/KafkaComponent.java | 20 ++++++++++ .../camel/component/kafka/KafkaConsumer.java | 2 +- .../camel/component/kafka/KafkaProducer.java | 2 +- .../dsl/KafkaComponentBuilderFactory.java | 23 +++++++++++ .../modules/ROOT/pages/kafka-component.adoc | 3 +- 11 files changed, 136 insertions(+), 5 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc index 630520a..85c4147 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc @@ -41,7 +41,7 @@ kafka:topic[?options] // component options: START -The Kafka component supports 98 options, which are listed below. +The Kafka component supports 99 options, which are listed below. @@ -118,6 +118,7 @@ The Kafka component supports 98 options, which are listed below. | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean +| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index 6ad0aef..1e2481e 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -84,6 +84,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "heartbeatIntervalMs": getOrCreateConfiguration(target).setHeartbeatIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true; case "interceptorclasses": case "interceptorClasses": getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true; + case "kafkaclientfactory": + case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true; case "kafkamanualcommitfactory": case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); return true; case "kerberosbeforereloginmintime": @@ -281,6 +283,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "heartbeatIntervalMs": return java.lang.Integer.class; case "interceptorclasses": case "interceptorClasses": return java.lang.String.class; + case "kafkaclientfactory": + case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class; case "kafkamanualcommitfactory": case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.KafkaManualCommitFactory.class; case "kerberosbeforereloginmintime": @@ -479,6 +483,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "heartbeatIntervalMs": return getOrCreateConfiguration(target).getHeartbeatIntervalMs(); case "interceptorclasses": case "interceptorClasses": return getOrCreateConfiguration(target).getInterceptorClasses(); + case "kafkaclientfactory": + case "kafkaClientFactory": return target.getKafkaClientFactory(); case "kafkamanualcommitfactory": case "kafkaManualCommitFactory": return target.getKafkaManualCommitFactory(); case "kerberosbeforereloginmintime": diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 4c3d0eb..79709da 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -92,6 +92,7 @@ "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...] "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] + "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...] "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" }, "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...] "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...] diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 630520a..85c4147 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -41,7 +41,7 @@ kafka:topic[?options] // component options: START -The Kafka component supports 98 options, which are listed below. +The Kafka component supports 99 options, which are listed below. @@ -118,6 +118,7 @@ The Kafka component supports 98 options, which are listed below. | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean +| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java new file mode 100644 index 0000000..ee024bf --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +public class DefaultKafkaClientFactory implements KafkaClientFactory { + @Override + public KafkaProducer getProducer(Properties kafkaProps) { + return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps); + } + + @Override + public KafkaConsumer getConsumer(Properties kafkaProps) { + return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java new file mode 100644 index 0000000..4ab066f --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +/** + * Factory to create a new {@link KafkaConsumer} and {@link KafkaProducer} instances. + */ +public interface KafkaClientFactory { + + /** + * Creates a new instance of the {@link KafkaProducer} class. + * + * @param kafkaProps The producer configs. + * @return an instance of Kafka producer. + */ + KafkaProducer getProducer(Properties kafkaProps); + + /** + * Creates a new instance of the {@link KafkaConsumer} class. + * + * @param kafkaProps The consumer configs. + * @return an instance of Kafka consumer. + */ + KafkaConsumer getConsumer(Properties kafkaProps); +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index c10b1d0..e0c0732 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -35,6 +35,8 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame private boolean useGlobalSslContextParameters; @Metadata(label = "consumer,advanced") private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory(); + @Metadata(label = "advanced") + private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory(); public KafkaComponent() { } @@ -109,4 +111,22 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame public void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory) { this.kafkaManualCommitFactory = kafkaManualCommitFactory; } + + public KafkaClientFactory getKafkaClientFactory() { + return kafkaClientFactory; + } + + /** + * Factory to use for creating {@link org.apache.kafka.clients.consumer.KafkaConsumer} and + * {@link org.apache.kafka.clients.producer.KafkaProducer} instances. This allows to configure a custom factory to + * create {@link org.apache.kafka.clients.consumer.KafkaConsumer} and + * {@link org.apache.kafka.clients.producer.KafkaProducer} instances with logic that extends the vanilla Kafka + * clients. + * + * @param kafkaClientFactory factory instance to use. + */ + public void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) { + this.kafkaClientFactory = kafkaClientFactory; + } + } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index fc51f58..86064a0 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -239,7 +239,7 @@ public class KafkaConsumer extends DefaultConsumer { .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); // this may throw an exception if something is wrong with kafka // consumer - this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); + this.consumer = endpoint.getComponent().getKafkaClientFactory().getConsumer(kafkaProps); } finally { Thread.currentThread().setContextClassLoader(threadClassLoader); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 39692d7..2cf9a34 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -112,7 +112,7 @@ public class KafkaProducer extends DefaultAsyncProducer { Thread.currentThread() .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); LOG.trace("Creating KafkaProducer"); - kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props); + kafkaProducer = endpoint.getComponent().getKafkaClientFactory().getProducer(props); closeKafkaProducer = true; } finally { Thread.currentThread().setContextClassLoader(threadClassLoader); diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index ba31c5a..748b63d 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -1371,6 +1371,28 @@ public interface KafkaComponentBuilderFactory { return this; } /** + * Factory to use for creating + * org.apache.kafka.clients.consumer.KafkaConsumer and + * org.apache.kafka.clients.producer.KafkaProducer instances. This + * allows to configure a custom factory to create + * org.apache.kafka.clients.consumer.KafkaConsumer and + * org.apache.kafka.clients.producer.KafkaProducer instances with logic + * that extends the vanilla Kafka clients. + * + * The option is a: + * <code>org.apache.camel.component.kafka.KafkaClientFactory</code> type. + * + * Group: advanced + * + * @param kafkaClientFactory the value to set + * @return the dsl builder + */ + default KafkaComponentBuilder kafkaClientFactory( + org.apache.camel.component.kafka.KafkaClientFactory kafkaClientFactory) { + doSetProperty("kafkaClientFactory", kafkaClientFactory); + return this; + } + /** * Sets whether synchronous processing should be strictly used. * * The option is a: <code>boolean</code> type. @@ -1944,6 +1966,7 @@ public interface KafkaComponentBuilderFactory { case "workerPoolCoreSize": getOrCreateConfiguration((KafkaComponent) component).setWorkerPoolCoreSize((java.lang.Integer) value); return true; case "workerPoolMaxSize": getOrCreateConfiguration((KafkaComponent) component).setWorkerPoolMaxSize((java.lang.Integer) value); return true; case "autowiredEnabled": ((KafkaComponent) component).setAutowiredEnabled((boolean) value); return true; + case "kafkaClientFactory": ((KafkaComponent) component).setKafkaClientFactory((org.apache.camel.component.kafka.KafkaClientFactory) value); return true; case "synchronous": getOrCreateConfiguration((KafkaComponent) component).setSynchronous((boolean) value); return true; case "schemaRegistryURL": getOrCreateConfiguration((KafkaComponent) component).setSchemaRegistryURL((java.lang.String) value); return true; case "interceptorClasses": getOrCreateConfiguration((KafkaComponent) component).setInterceptorClasses((java.lang.String) value); return true; diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc index ea230e4..8c91b52 100644 --- a/docs/components/modules/ROOT/pages/kafka-component.adoc +++ b/docs/components/modules/ROOT/pages/kafka-component.adoc @@ -43,7 +43,7 @@ kafka:topic[?options] // component options: START -The Kafka component supports 98 options, which are listed below. +The Kafka component supports 99 options, which are listed below. @@ -120,6 +120,7 @@ The Kafka component supports 98 options, which are listed below. | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean +| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String