Repository: camel Updated Branches: refs/heads/camel-2.17.x 1d3e4a2d6 -> fe35f0034
[CAMEL-10087, CAMEL-10069] Port 10069 to consumer (as much as can be), set consumer TCCL to null since not all can be worked around. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe35f003 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe35f003 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe35f003 Branch: refs/heads/camel-2.17.x Commit: fe35f0034518c99163273cf05e1cfdaf213ba19f Parents: 1d3e4a2 Author: Daniel Kulp <dk...@apache.org> Authored: Sat Jun 25 08:50:29 2016 -0400 Committer: Daniel Kulp <dk...@apache.org> Committed: Sat Jun 25 09:17:40 2016 -0400 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 10 ++- .../camel/component/kafka/KafkaEndpoint.java | 64 +++++++++++++++++++- .../camel/component/kafka/KafkaProducer.java | 57 +---------------- 3 files changed, 73 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fe35f003/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 04b60cc..daae07a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -53,6 +53,7 @@ public class KafkaConsumer extends DefaultConsumer { Properties getProps() { Properties props = endpoint.getConfiguration().createConsumerProperties(); + endpoint.updateClassProperties(props); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId()); return props; @@ -94,7 +95,14 @@ public class KafkaConsumer extends DefaultConsumer { this.topicName = topicName; this.threadId = topicName + "-" + "Thread " + id; this.kafkaProps = kafkaProps; - this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); + + ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(null); + this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); + } finally { + Thread.currentThread().setContextClassLoader(threadClassLoader); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/fe35f003/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 7303c44..4116800 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.lang.reflect.Field; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -27,16 +28,24 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.SynchronousDelegateProducer; +import org.apache.camel.spi.ClassResolver; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.apache.camel.util.CastUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The kafka component allows messages to be sent to (or consumed from) Apache Kafka brokers. */ @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging") public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { - + private static final Logger LOG = LoggerFactory.getLogger(KafkaEndpoint.class); + @UriParam private KafkaConfiguration configuration = new KafkaConfiguration(); @UriParam @@ -91,6 +100,59 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return true; } + + private void loadParitionerClass(ClassResolver resolver, Properties props) { + replaceWithClass(props, "partitioner.class", resolver, Partitioner.class); + } + <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) { + if (o == null || o instanceof Class) { + return CastUtils.cast((Class<?>)o); + } + String name = o.toString(); + Class<T> c = resolver.resolveClass(name, type); + if (c == null) { + c = resolver.resolveClass(name, type, getClass().getClassLoader()); + } + if (c == null) { + c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); + } + return c; + } + void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type) { + Class<?> c = loadClass(props.get(key), resolver, type); + if (c != null) { + props.put(key, c); + } + } + + public void updateClassProperties(Properties props) { + try { + if (getCamelContext() != null) { + ClassResolver resolver = getCamelContext().getClassResolver(); + replaceWithClass(props, "key.serializer", resolver, Serializer.class); + replaceWithClass(props, "value.serializer", resolver, Serializer.class); + + try { + //doesn't exist in old version of Kafka client so detect and only call the method if + //the field/config actually exists + Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG"); + if (f != null) { + loadParitionerClass(resolver, props); + } + } catch (NoSuchFieldException e) { + //ignore + } catch (SecurityException e) { + //ignore + } + //doesn't work as it needs to be List<String> :( + //replaceWithClass(props, "partition.assignment.strategy", resolver, PartitionAssignor.class); + } + } catch (Throwable t) { + //can ignore and Kafka itself might be able to handle it, if not, it will throw an exception + LOG.debug("Problem loading classes for Serializers", t); + } + } + public ExecutorService createExecutor() { return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams()); } http://git-wip-us.apache.org/repos/asf/camel/blob/fe35f003/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index a81ec15..b1a496c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.kafka; -import java.lang.reflect.Field; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -31,19 +30,12 @@ import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.spi.ClassResolver; -import org.apache.camel.util.CastUtils; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KafkaProducer extends DefaultAsyncProducer { - private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class); private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; @@ -54,63 +46,16 @@ public class KafkaProducer extends DefaultAsyncProducer { super(endpoint); this.endpoint = endpoint; } - - <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) { - if (o == null || o instanceof Class) { - return CastUtils.cast((Class<?>)o); - } - String name = o.toString(); - Class<T> c = resolver.resolveClass(name, type); - if (c == null) { - c = resolver.resolveClass(name, type, getClass().getClassLoader()); - } - if (c == null) { - c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); - } - return c; - } - void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type) { - Class<?> c = loadClass(props.get(key), resolver, type); - if (c != null) { - props.put(key, c); - } - } Properties getProps() { Properties props = endpoint.getConfiguration().createProducerProperties(); - try { - if (endpoint.getCamelContext() != null) { - ClassResolver resolver = endpoint.getCamelContext().getClassResolver(); - replaceWithClass(props, "key.serializer", resolver, Serializer.class); - replaceWithClass(props, "value.serializer", resolver, Serializer.class); - - try { - //doesn't exist in old version of Kafka client so detect and only call the method if - //the field/config actually exists - Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG"); - if (f != null) { - loadParitionerClass(resolver, props); - } - } catch (NoSuchFieldException e) { - //ignore - } catch (SecurityException e) { - //ignore - } - } - } catch (Throwable t) { - //can ignore and Kafka itself might be able to handle it, if not, it will throw an exception - LOG.debug("Problem loading classes for Serializers", t); - } + endpoint.updateClassProperties(props); if (endpoint.getBrokers() != null) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); } return props; } - private void loadParitionerClass(ClassResolver resolver, Properties props) { - replaceWithClass(props, "partitioner.class", resolver, Partitioner.class); - } - public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() { return kafkaProducer;