Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 164512979 -> 38c75c5cc


[CAMEL-10069] Update to use ClassResolver to help search for the partitioner 
and serializers
Also pull search for Partitioner out into separate try block to allow for use 
with 0.8 kafka client (which doesn't have partitioner)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38c75c5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38c75c5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38c75c5c

Branch: refs/heads/camel-2.17.x
Commit: 38c75c5cc2ac127898aa19268901df7360d5631c
Parents: 1645129
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Jun 22 15:30:14 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Wed Jun 22 15:49:30 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    |  2 +-
 .../camel/component/kafka/KafkaProducer.java    | 70 ++++++++++++--------
 .../component/kafka/KafkaProducerFullTest.java  |  2 +-
 3 files changed, 44 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/38c75c5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index d4cfe49..04b60cc 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -90,7 +90,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        public KafkaFetchRecords(String topicName, String id, Properties 
kafkaProps) {
+        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;

http://git-wip-us.apache.org/repos/asf/camel/blob/38c75c5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index f3a2fb9..a81ec15 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -30,15 +31,20 @@ import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.ClassResolver;
+import org.apache.camel.util.CastUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaProducer extends DefaultAsyncProducer {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaProducer.class);
+    
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
     private ExecutorService workerPool;
@@ -49,36 +55,22 @@ public class KafkaProducer extends DefaultAsyncProducer {
         this.endpoint = endpoint;
     }
 
-    
-    Class<?> loadClass(Object o, ClassLoader loader) {
+    <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) {
         if (o == null || o instanceof Class) {
-            return (Class<?>)o;
+            return CastUtils.cast((Class<?>)o);
         }
         String name = o.toString();
-        Class<?> c;
-        try {
-            c = Class.forName(name, true, loader);
-        } catch (ClassNotFoundException e) {
-            c = null;
-        }
+        Class<T> c = resolver.resolveClass(name, type);
         if (c == null) {
-            try {
-                c = Class.forName(name, true, getClass().getClassLoader());
-            } catch (ClassNotFoundException e) {
-                c = null;
-            }
+            c = resolver.resolveClass(name, type, getClass().getClassLoader());
         }
         if (c == null) {
-            try {
-                c = Class.forName(name, true, 
org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
-            } catch (ClassNotFoundException e) {
-                c = null;
-            }
+            c = resolver.resolveClass(name, type, 
org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
         }
         return c;
     }
-    void replaceWithClass(Properties props, String key,  ClassLoader loader, 
Class<?> type) {
-        Class<?> c = loadClass(props.get(key), loader);
+    void replaceWithClass(Properties props, String key,  ClassResolver 
resolver, Class<?> type) {
+        Class<?> c = loadClass(props.get(key), resolver, type);
         if (c != null) {
             props.put(key, c);
         }
@@ -86,11 +78,28 @@ public class KafkaProducer extends DefaultAsyncProducer {
     
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createProducerProperties();
-        if (endpoint.getCamelContext() != null) {
-            ClassLoader loader = 
endpoint.getCamelContext().getApplicationContextClassLoader();
-            replaceWithClass(props, "key.serializer", loader, 
Serializer.class);
-            replaceWithClass(props, "value.serializer", loader, 
Serializer.class);
-            replaceWithClass(props, "partitioner.class", loader, 
Partitioner.class);
+        try {
+            if (endpoint.getCamelContext() != null) {
+                ClassResolver resolver = 
endpoint.getCamelContext().getClassResolver();
+                replaceWithClass(props, "key.serializer", resolver, 
Serializer.class);
+                replaceWithClass(props, "value.serializer", resolver, 
Serializer.class);
+                
+                try {
+                    //doesn't exist in old version of Kafka client so detect 
and only call the method if
+                    //the field/config actually exists
+                    Field f = 
ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG");
+                    if (f != null) {
+                        loadParitionerClass(resolver, props);
+                    }
+                } catch (NoSuchFieldException e) {
+                    //ignore
+                } catch (SecurityException e) {
+                    //ignore
+                }
+            }
+        } catch (Throwable t) {
+            //can ignore and Kafka itself might be able to handle it, if not, 
it will throw an exception
+            LOG.debug("Problem loading classes for Serializers", t);
         }
         if (endpoint.getBrokers() != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
@@ -98,6 +107,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return props;
     }
 
+    private void loadParitionerClass(ClassResolver resolver, Properties props) 
{
+        replaceWithClass(props, "partitioner.class", resolver, 
Partitioner.class);
+    }
+
+
     public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
         return kafkaProducer;
     }
@@ -187,7 +201,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
                 @Override
                 public void remove() {
-                    msgList.remove();                                  
+                    msgList.remove();
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/38c75c5c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index d5b65fa..30f2b13 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -99,7 +99,7 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     }
 
     @Override
-    protected RoutesBuilder createRouteBuilder() throws Exception {
+    protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {

Reply via email to