CAMEL-10832: Kafka. Allow brokers to be configured at the component level, and
make the topic part of the context-path so that usage is similar to JMS etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d3b38a05
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d3b38a05
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d3b38a05

Branch: refs/heads/master
Commit: d3b38a05b7b6552be1c083aa2aed529159471a90
Parents: d6e45cb
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Feb 15 11:14:38 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 15 11:16:40 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  4 ++--
 .../camel/component/kafka/KafkaComponent.java   |  7 ++++---
 .../component/kafka/KafkaConfiguration.java     |  2 +-
 .../camel/component/kafka/KafkaConsumer.java    | 20 +++++++++++++++++---
 .../camel/component/kafka/KafkaEndpoint.java    |  6 ++++++
 .../camel/component/kafka/KafkaProducer.java    | 10 +++++++---
 .../component/kafka/BaseEmbeddedKafkaTest.java  |  2 +-
 .../component/kafka/KafkaComponentTest.java     |  7 ++++---
 .../KafkaConsumerOffsetRepositoryEmptyTest.java | 10 +++++-----
 ...KafkaConsumerOffsetRepositoryResumeTest.java | 10 +++++-----
 .../component/kafka/KafkaConsumerTest.java      |  4 ++++
 .../springboot/KafkaComponentConfiguration.java |  2 +-
 12 files changed, 57 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f9d2749..211d592 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -42,7 +42,7 @@ The Kafka component supports 2 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | workerPool | advanced |  | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -58,7 +58,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | topic | common |  | String | *Required* Name of the topic to use.
-| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 575dcfc..da0f59d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -49,10 +49,11 @@ public class KafkaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("Topic must be configured on endpoint using syntax kafka:topic");
         }
         endpoint.getConfiguration().setTopic(remaining);
-
-        endpoint.getConfiguration().setBrokers(getBrokers());
         endpoint.getConfiguration().setWorkerPool(getWorkerPool());
 
+        // brokers can be configured on either component or endpoint level
+        // and the consumer and produce is aware of this and act accordingly
+
         setProperties(endpoint.getConfiguration(), params);
         setProperties(endpoint, params);
         return endpoint;
@@ -66,7 +67,7 @@ public class KafkaComponent extends UriEndpointComponent {
      * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
         this.brokers = brokers;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index bec73da..57eea8f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -616,7 +616,7 @@ public class KafkaConfiguration {
      * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
         this.brokers = brokers;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 549c1c2..0327a76 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -55,9 +56,15 @@ public class KafkaConsumer extends DefaultConsumer {
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
 
-        if (endpoint.getConfiguration().getBrokers() == null) {
-            throw new IllegalArgumentException("BootStrap servers must be specified");
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
         }
+        if (ObjectHelper.isEmpty(brokers)) {
+            throw new IllegalArgumentException("Brokers must be configured");
+        }
+
         if (endpoint.getConfiguration().getGroupId() == null) {
             throw new IllegalArgumentException("groupId must not be null");
         }
@@ -66,7 +73,14 @@ public class KafkaConsumer extends DefaultConsumer {
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getConfiguration().getBrokers());
+
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
+        }
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getConfiguration().getGroupId());
         return props;
     }
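
The lookup order introduced in the KafkaConsumer hunks above can be sketched
outside Camel as follows. This is a minimal illustration, not the component's
code: the class BrokerResolutionSketch and the method resolveBrokers are
hypothetical names, and the null-or-blank check is only assumed to approximate
what ObjectHelper.isEmpty does for strings.

```java
// Illustrative sketch only: BrokerResolutionSketch and resolveBrokers are
// hypothetical names and are not part of camel-kafka.
final class BrokerResolutionSketch {

    private BrokerResolutionSketch() {
    }

    /**
     * Prefers the endpoint-level brokers and falls back to the component-level
     * value only when the endpoint value is null, mirroring the hunk above.
     */
    static String resolveBrokers(String endpointBrokers, String componentBrokers) {
        String brokers = endpointBrokers;
        if (brokers == null) {
            brokers = componentBrokers;
        }
        // Assumed stand-in for ObjectHelper.isEmpty(brokers): reject null or blank.
        if (brokers == null || brokers.trim().isEmpty()) {
            throw new IllegalArgumentException("Brokers must be configured");
        }
        return brokers;
    }

    public static void main(String[] args) {
        // The endpoint value wins when both levels are set.
        System.out.println(resolveBrokers("endpoint:9092", "component:9092"));
        // The component value is used when the endpoint has none.
        System.out.println(resolveBrokers(null, "component:9092"));
    }
}
```

As in the diff, the fallback is triggered only by a null endpoint value; an
empty but non-null endpoint value does not fall back to the component and
fails the check instead.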

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 3c125e5..46bf844 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -20,6 +20,7 @@ import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -60,6 +61,11 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         super(endpointUri, component);
     }
 
+    @Override
+    public KafkaComponent getComponent() {
+        return (KafkaComponent) super.getComponent();
+    }
+
     public KafkaConfiguration getConfiguration() {
         return configuration;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2ffe96b..abfc588 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -51,13 +51,17 @@ public class KafkaProducer extends DefaultAsyncProducer {
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
-        if (endpoint.getConfiguration().getBrokers() != null) {
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getConfiguration().getBrokers());
+
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
         }
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+
         return props;
     }
 
-
     public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
         return kafkaProducer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index e36ed0a..6b72d33 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
-    private static final Logger  LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
+    static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
 
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 8d106e9..92982cc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.kafka;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -30,6 +28,8 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertEquals;
+
 public class KafkaComponentTest {
 
     private CamelContext context = Mockito.mock(CamelContext.class);
@@ -52,7 +52,8 @@ public class KafkaComponentTest {
         String uri = "kafka:mytopic?partitioner=com.class.Party";
 
         KafkaEndpoint endpoint = (KafkaEndpoint) kafka.createEndpoint(uri);
-        assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
+        assertEquals(null, endpoint.getConfiguration().getBrokers());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getComponent().getBrokers());
         assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
index e8cb50a..2b59970 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
@@ -95,11 +95,11 @@ public class KafkaConsumerOffsetRepositoryEmptyTest extends BaseEmbeddedKafkaTes
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:" + TOPIC +
-                             "?groupId=A" +
-                             "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
-                             "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
-                             "&offsetRepository=#offset")              // Keep the offset in our repository
+                from("kafka:" + TOPIC
+                             + "?groupId=A"
+                             + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
+                             + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
+                             + "&offsetRepository=#offset")            // Keep the offset in our repository
                         .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 36555a2..fe8052a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -97,11 +97,11 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:" + TOPIC +
-                             "?groupId=A" +
-                             "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
-                             "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
-                             "&offsetRepository=#offset")              // Keep the offset in our repository
+                from("kafka:" + TOPIC
+                             + "?groupId=A"
+                             + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
+                             + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
+                             + "&offsetRepository=#offset")            // Keep the offset in our repository
                         .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 97311d7..3e249b4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -25,11 +25,13 @@ import static org.mockito.Mockito.when;
 public class KafkaConsumerTest {
 
     private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+    private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresBootstrapServers() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         new KafkaConsumer(endpoint, processor);
@@ -37,6 +39,7 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:1234");
         new KafkaConsumer(endpoint, processor);
@@ -44,6 +47,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index b0eaa20..3396544 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -31,7 +31,7 @@ public class KafkaComponentConfiguration {
     /**
      * URL of the Kafka brokers to use. The format is host1:port1host2:port2 and
      * the list can be a subset of brokers or a VIP pointing to a subset of
-     * brokers. This option is known as metadata.broker.list in the Kafka
+     * brokers. This option is known as bootstrap.servers in the Kafka
      * documentation.
      */
     private String brokers;
