Repository: camel Updated Branches: refs/heads/master c5d00cce6 -> e78476132
Search various classloaders for the configs that are classes Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e7847613 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e7847613 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e7847613 Branch: refs/heads/master Commit: e7847613240aafe72ca6c5cbc922b18e8b8ad170 Parents: c5d00cc Author: Daniel Kulp <dk...@apache.org> Authored: Fri Jun 17 10:15:26 2016 -0400 Committer: Daniel Kulp <dk...@apache.org> Committed: Fri Jun 17 10:15:26 2016 -0400 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaProducer.java | 43 ++++++++++++++++++++ 1 file changed, 43 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e7847613/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index e2f25fb..ae2f2a4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -31,9 +31,11 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.Serializer; public class KafkaProducer extends DefaultAsyncProducer { 
@@ -47,8 +49,49 @@ public class KafkaProducer extends DefaultAsyncProducer {
         this.endpoint = endpoint;
     }
 
+    // Resolves a configuration value to a Class by trying up to three class loaders in order; yields null when the value is null or no loader finds the name.
+    Class<?> loadClass(Object o, ClassLoader loader) { // o: a Class or an object whose toString() is a class name; loader: first loader tried
+        if (o == null || o instanceof Class) { // null or already a Class: nothing to look up
+            return (Class<?>)o; // returns null when o is null
+        }
+        String name = o.toString(); // any other value is treated as a class name
+        Class<?> c;
+        try {
+            c = Class.forName(name, true, loader); // attempt 1: the caller-supplied loader (class is initialized on load)
+        } catch (ClassNotFoundException e) {
+            c = null; // not found here; fall through to the next loader
+        }
+        if (c == null) {
+            try {
+                c = Class.forName(name, true, getClass().getClassLoader()); // attempt 2: the loader of this object's runtime class
+            } catch (ClassNotFoundException e) {
+                c = null;
+            }
+        }
+        if (c == null) {
+            try {
+                c = Class.forName(name, true, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); // attempt 3: the loader that loaded the Kafka client's KafkaProducer
+            } catch (ClassNotFoundException e) {
+                c = null; // only ClassNotFoundException is caught; anything else thrown by Class.forName propagates
+            }
+        }
+        return c; // null when none of the three loaders found the class
+    }
+    void replaceWithClass(Properties props, String key, ClassLoader loader, Class<?> type) { // Replaces props[key] with the Class it resolves to; NOTE(review): 'type' is never read in this body
+        Class<?> c = loadClass(props.get(key), loader); // a missing key gives null here, so props is left untouched
+        if (c != null) {
+            props.put(key, c); // store the Class object in place of the original value
+        }
+    }
+
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createProducerProperties();
+        if (endpoint.getCamelContext() != null) {
+            ClassLoader loader = endpoint.getCamelContext().getApplicationContextClassLoader(); // used as the first loader tried by loadClass
+            replaceWithClass(props, "key.serializer", loader, Serializer.class);
+            replaceWithClass(props, "value.serializer", loader, Serializer.class);
+            replaceWithClass(props, "partitioner.class", loader, Partitioner.class);
+        }
         if (endpoint.getBrokers() != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
         }