Repository: camel
Updated Branches:
  refs/heads/master c5d00cce6 -> e78476132


Search various classloaders for the configs that are classes


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e7847613
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e7847613
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e7847613

Branch: refs/heads/master
Commit: e7847613240aafe72ca6c5cbc922b18e8b8ad170
Parents: c5d00cc
Author: Daniel Kulp <dk...@apache.org>
Authored: Fri Jun 17 10:15:26 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Fri Jun 17 10:15:26 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 43 ++++++++++++++++++++
 1 file changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e7847613/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index e2f25fb..ae2f2a4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -31,9 +31,11 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.Serializer;
 
 public class KafkaProducer extends DefaultAsyncProducer {
 
@@ -47,8 +49,49 @@ public class KafkaProducer extends DefaultAsyncProducer {
         this.endpoint = endpoint;
     }
 
+    
+    Class<?> loadClass(Object o, ClassLoader loader) {
+        if (o == null || o instanceof Class) {
+            return (Class<?>)o;
+        }
+        String name = o.toString();
+        Class<?> c;
+        try {
+            c = Class.forName(name, true, loader);
+        } catch (ClassNotFoundException e) {
+            c = null;
+        }
+        if (c == null) {
+            try {
+                c = Class.forName(name, true, getClass().getClassLoader());
+            } catch (ClassNotFoundException e) {
+                c = null;
+            }
+        }
+        if (c == null) {
+            try {
+                c = Class.forName(name, true, 
org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+            } catch (ClassNotFoundException e) {
+                c = null;
+            }
+        }
+        return c;
+    }
+    void replaceWithClass(Properties props, String key,  ClassLoader loader, 
Class<?> type) {
+        Class<?> c = loadClass(props.get(key), loader);
+        if (c != null) {
+            props.put(key, c);
+        }
+    }
+    
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createProducerProperties();
+        if (endpoint.getCamelContext() != null) {
+            ClassLoader loader = 
endpoint.getCamelContext().getApplicationContextClassLoader();
+            replaceWithClass(props, "key.serializer", loader, 
Serializer.class);
+            replaceWithClass(props, "value.serializer", loader, 
Serializer.class);
+            replaceWithClass(props, "partitioner.class", loader, 
Partitioner.class);
+        }
         if (endpoint.getBrokers() != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
         }

Reply via email to