Repository: camel Updated Branches: refs/heads/camel-2.17.x 164512979 -> 38c75c5cc
[CAMEL-10069] Update to use ClassResolver to help search for the partitioner and serializers Also pull search for Partitioner out into separate try block to allow for use with 0.8 kafka client (which doesn't have partitioner) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38c75c5c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38c75c5c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38c75c5c Branch: refs/heads/camel-2.17.x Commit: 38c75c5cc2ac127898aa19268901df7360d5631c Parents: 1645129 Author: Daniel Kulp <dk...@apache.org> Authored: Wed Jun 22 15:30:14 2016 -0400 Committer: Daniel Kulp <dk...@apache.org> Committed: Wed Jun 22 15:49:30 2016 -0400 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 2 +- .../camel/component/kafka/KafkaProducer.java | 70 ++++++++++++-------- .../component/kafka/KafkaProducerFullTest.java | 2 +- 3 files changed, 44 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/38c75c5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index d4cfe49..04b60cc 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -90,7 +90,7 @@ public class KafkaConsumer extends DefaultConsumer { private final String threadId; private final Properties kafkaProps; - public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) { + KafkaFetchRecords(String topicName, String id, Properties kafkaProps) { this.topicName = topicName; this.threadId = topicName + "-" + "Thread " + id; this.kafkaProps = kafkaProps; http://git-wip-us.apache.org/repos/asf/camel/blob/38c75c5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index f3a2fb9..a81ec15 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.lang.reflect.Field; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -30,15 +31,20 @@ import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.spi.ClassResolver; +import org.apache.camel.util.CastUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaProducer extends DefaultAsyncProducer { - + private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class); + private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; private ExecutorService workerPool; @@ -49,36 +55,22 @@ public class KafkaProducer extends DefaultAsyncProducer { this.endpoint = endpoint; } - - Class<?> loadClass(Object o, ClassLoader loader) { + <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) { if (o == null || o instanceof Class) { - return (Class<?>)o; + return CastUtils.cast((Class<?>)o); } String name = o.toString(); - Class<?> c; - try { - c = Class.forName(name, true, loader); - } catch (ClassNotFoundException e) { - c = null; - } + Class<T> c = resolver.resolveClass(name, type); if (c == null) { - try { - c = Class.forName(name, true, getClass().getClassLoader()); - } catch (ClassNotFoundException e) { - c = null; - } + c = resolver.resolveClass(name, type, getClass().getClassLoader()); } if (c == null) { - try { - c = Class.forName(name, true, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); - } catch (ClassNotFoundException e) { - c = null; - } + c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); } return c; } - void replaceWithClass(Properties props, String key, ClassLoader loader, Class<?> type) { - Class<?> c = loadClass(props.get(key), loader); + void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type) { + Class<?> c = loadClass(props.get(key), resolver, type); if (c != null) { props.put(key, c); } @@ -86,11 +78,28 @@ public class KafkaProducer extends DefaultAsyncProducer { Properties getProps() { Properties props = endpoint.getConfiguration().createProducerProperties(); - if (endpoint.getCamelContext() != null) { - ClassLoader loader = endpoint.getCamelContext().getApplicationContextClassLoader(); - replaceWithClass(props, "key.serializer", loader, Serializer.class); - replaceWithClass(props, "value.serializer", loader, Serializer.class); - replaceWithClass(props, "partitioner.class", loader, Partitioner.class); + try { + if (endpoint.getCamelContext() != null) { + ClassResolver resolver = endpoint.getCamelContext().getClassResolver(); + replaceWithClass(props, "key.serializer", resolver, Serializer.class); + replaceWithClass(props, "value.serializer", resolver, Serializer.class); + + try { + //doesn't exist in old version of Kafka client so detect and only call the method if + //the field/config actually exists + Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG"); + if (f != null) { + loadParitionerClass(resolver, props); + } + } catch (NoSuchFieldException e) { + //ignore + } catch (SecurityException e) { + //ignore + } + } + } catch (Throwable t) { + //can ignore and Kafka itself might be able to handle it, if not, it will throw an exception + LOG.debug("Problem loading classes for Serializers", t); } if (endpoint.getBrokers() != null) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); @@ -98,6 +107,11 @@ public class KafkaProducer extends DefaultAsyncProducer { return props; } + private void loadParitionerClass(ClassResolver resolver, Properties props) { + replaceWithClass(props, "partitioner.class", resolver, Partitioner.class); + } + + public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() { return kafkaProducer; } @@ -187,7 +201,7 @@ public class KafkaProducer extends DefaultAsyncProducer { @Override public void remove() { - msgList.remove(); + msgList.remove(); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/38c75c5c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index d5b65fa..30f2b13 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -99,7 +99,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { } @Override - protected RoutesBuilder createRouteBuilder() throws Exception { + protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception {