Repository: camel
Updated Branches:
  refs/heads/master 67b17a5bd -> 349e7a5c9


[CAMEL-10087, CAMEL-10069] Port the CAMEL-10069 changes to the consumer (as far as possible), and set the 
consumer's thread context class loader (TCCL) to null, since not every case can be worked around.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/349e7a5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/349e7a5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/349e7a5c

Branch: refs/heads/master
Commit: 349e7a5c987838a2a21c9229eea72b739cdcb06e
Parents: 67b17a5
Author: Daniel Kulp <dk...@apache.org>
Authored: Sat Jun 25 08:50:29 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Sat Jun 25 08:50:29 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 10 ++-
 .../camel/component/kafka/KafkaEndpoint.java    | 64 +++++++++++++++++++-
 .../camel/component/kafka/KafkaProducer.java    | 57 +----------------
 3 files changed, 73 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/349e7a5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8b995ff..3c6e18c 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -57,6 +57,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createConsumerProperties();
+        endpoint.updateClassProperties(props);
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
         props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId());
         return props;
@@ -98,7 +99,14 @@ public class KafkaConsumer extends DefaultConsumer {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
-            this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+            
+            ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
+            try {
+                Thread.currentThread().setContextClassLoader(null);
+                this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+            } finally {
+                
Thread.currentThread().setContextClassLoader(threadClassLoader);
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/349e7a5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index ec75c4b..f640bdb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -27,16 +28,24 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.SynchronousDelegateProducer;
+import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.util.CastUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The kafka component allows messages to be sent to (or consumed from) Apache 
Kafka brokers.
  */
 @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", 
consumerClass = KafkaConsumer.class, label = "messaging")
 public class KafkaEndpoint extends DefaultEndpoint implements 
MultipleConsumersSupport {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaEndpoint.class);
+    
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @UriParam
@@ -91,6 +100,59 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
         return true;
     }
 
+    
+    private void loadParitionerClass(ClassResolver resolver, Properties props) 
{
+        replaceWithClass(props, "partitioner.class", resolver, 
Partitioner.class);
+    }
+    <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) {
+        if (o == null || o instanceof Class) {
+            return CastUtils.cast((Class<?>)o);
+        }
+        String name = o.toString();
+        Class<T> c = resolver.resolveClass(name, type);
+        if (c == null) {
+            c = resolver.resolveClass(name, type, getClass().getClassLoader());
+        }
+        if (c == null) {
+            c = resolver.resolveClass(name, type, 
org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+        }
+        return c;
+    }
+    void replaceWithClass(Properties props, String key,  ClassResolver 
resolver, Class<?> type) {
+        Class<?> c = loadClass(props.get(key), resolver, type);
+        if (c != null) {
+            props.put(key, c);
+        }
+    }
+
+    public void updateClassProperties(Properties props) {
+        try {
+            if (getCamelContext() != null) {
+                ClassResolver resolver = getCamelContext().getClassResolver();
+                replaceWithClass(props, "key.serializer", resolver, 
Serializer.class);
+                replaceWithClass(props, "value.serializer", resolver, 
Serializer.class);
+                
+                try {
+                    //doesn't exist in old version of Kafka client so detect 
and only call the method if
+                    //the field/config actually exists
+                    Field f = 
ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG");
+                    if (f != null) {
+                        loadParitionerClass(resolver, props);
+                    }
+                } catch (NoSuchFieldException e) {
+                    //ignore
+                } catch (SecurityException e) {
+                    //ignore
+                }
+                //doesn't work as it needs to be List<String>  :(
+                //replaceWithClass(props, "partition.assignment.strategy", 
resolver, PartitionAssignor.class);
+            }
+        } catch (Throwable t) {
+            //can ignore and Kafka itself might be able to handle it, if not, 
it will throw an exception
+            LOG.debug("Problem loading classes for Serializers", t);
+        }
+    }
+    
     public ExecutorService createExecutor() {
         return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"KafkaConsumer[" + configuration.getTopic() + "]", 
configuration.getConsumerStreams());
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/349e7a5c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a81ec15..b1a496c 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.kafka;
 
-import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -31,19 +30,12 @@ import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.spi.ClassResolver;
-import org.apache.camel.util.CastUtils;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KafkaProducer extends DefaultAsyncProducer {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaProducer.class);
     
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
@@ -54,63 +46,16 @@ public class KafkaProducer extends DefaultAsyncProducer {
         super(endpoint);
         this.endpoint = endpoint;
     }
-
-    <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) {
-        if (o == null || o instanceof Class) {
-            return CastUtils.cast((Class<?>)o);
-        }
-        String name = o.toString();
-        Class<T> c = resolver.resolveClass(name, type);
-        if (c == null) {
-            c = resolver.resolveClass(name, type, getClass().getClassLoader());
-        }
-        if (c == null) {
-            c = resolver.resolveClass(name, type, 
org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
-        }
-        return c;
-    }
-    void replaceWithClass(Properties props, String key,  ClassResolver 
resolver, Class<?> type) {
-        Class<?> c = loadClass(props.get(key), resolver, type);
-        if (c != null) {
-            props.put(key, c);
-        }
-    }
     
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createProducerProperties();
-        try {
-            if (endpoint.getCamelContext() != null) {
-                ClassResolver resolver = 
endpoint.getCamelContext().getClassResolver();
-                replaceWithClass(props, "key.serializer", resolver, 
Serializer.class);
-                replaceWithClass(props, "value.serializer", resolver, 
Serializer.class);
-                
-                try {
-                    //doesn't exist in old version of Kafka client so detect 
and only call the method if
-                    //the field/config actually exists
-                    Field f = 
ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG");
-                    if (f != null) {
-                        loadParitionerClass(resolver, props);
-                    }
-                } catch (NoSuchFieldException e) {
-                    //ignore
-                } catch (SecurityException e) {
-                    //ignore
-                }
-            }
-        } catch (Throwable t) {
-            //can ignore and Kafka itself might be able to handle it, if not, 
it will throw an exception
-            LOG.debug("Problem loading classes for Serializers", t);
-        }
+        endpoint.updateClassProperties(props);
         if (endpoint.getBrokers() != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
         }
         return props;
     }
 
-    private void loadParitionerClass(ClassResolver resolver, Properties props) 
{
-        replaceWithClass(props, "partitioner.class", resolver, 
Partitioner.class);
-    }
-
 
     public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
         return kafkaProducer;

Reply via email to