Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x a2665ae8b -> 82716bc5a


Search various classloaders for the configs that are classes


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/82716bc5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/82716bc5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/82716bc5

Branch: refs/heads/camel-2.17.x
Commit: 82716bc5a8e77803f24a21219bf6fe9e6ab5bd09
Parents: a2665ae
Author: Daniel Kulp <dk...@apache.org>
Authored: Fri Jun 17 10:15:26 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Fri Jun 17 10:16:26 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 43 ++++++++++++++++++++
 1 file changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/82716bc5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 37cc920..f3a2fb9 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -31,9 +31,11 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.Serializer;
 
 public class KafkaProducer extends DefaultAsyncProducer {
 
@@ -47,8 +49,49 @@ public class KafkaProducer extends DefaultAsyncProducer {
         this.endpoint = endpoint;
     }
 
+    
+    Class<?> loadClass(Object o, ClassLoader loader) {
+        if (o == null || o instanceof Class) {
+            return (Class<?>)o;
+        }
+        String name = o.toString();
+        Class<?> c;
+        try {
+            c = Class.forName(name, true, loader);
+        } catch (ClassNotFoundException e) {
+            c = null;
+        }
+        if (c == null) {
+            try {
+                c = Class.forName(name, true, getClass().getClassLoader());
+            } catch (ClassNotFoundException e) {
+                c = null;
+            }
+        }
+        if (c == null) {
+            try {
+                c = Class.forName(name, true, 
org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+            } catch (ClassNotFoundException e) {
+                c = null;
+            }
+        }
+        return c;
+    }
+    void replaceWithClass(Properties props, String key,  ClassLoader loader, 
Class<?> type) {
+        Class<?> c = loadClass(props.get(key), loader);
+        if (c != null) {
+            props.put(key, c);
+        }
+    }
+    
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createProducerProperties();
+        if (endpoint.getCamelContext() != null) {
+            ClassLoader loader = 
endpoint.getCamelContext().getApplicationContextClassLoader();
+            replaceWithClass(props, "key.serializer", loader, 
Serializer.class);
+            replaceWithClass(props, "value.serializer", loader, 
Serializer.class);
+            replaceWithClass(props, "partitioner.class", loader, 
Partitioner.class);
+        }
         if (endpoint.getBrokers() != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
         }

Reply via email to