CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make 
the topic part of the context-path so that usage is similar to JMS etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d6e45cbe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d6e45cbe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d6e45cbe

Branch: refs/heads/master
Commit: d6e45cbe845b1c8854b0d8b57a4805f082c42277
Parents: eccfc85
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Feb 15 11:03:13 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 15 11:03:13 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 26 ++++++--------------
 .../camel/component/kafka/KafkaComponent.java   |  3 +--
 .../component/kafka/KafkaConfiguration.java     |  3 +--
 .../springboot/KafkaComponentConfiguration.java | 11 +++------
 4 files changed, 14 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 135b13f..f9d2749 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -25,17 +25,7 @@ From Camel 2.17 onwards Scala is no longer used, as we use the kafka java client
 
 [source,java]
 ---------------------------
-kafka:server:port[?options]
-
-OR
-
-kafka:server:port/topicName[?options]
-
-OR
-
-kafka:server/topicName[?options] 
-
-For the option above default port 9092 is used in the URI
+kafka:topic[?options]
 
 ---------------------------
 
@@ -52,7 +42,7 @@ The Kafka component supports 2 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common |  | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | workerPool | advanced |  | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -68,7 +58,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | topic | common |  | String | *Required* Name of the topic to use.
-| brokers | common |  | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
@@ -196,7 +186,7 @@ After the message is sent to Kafka, the following headers are available
 Here is the minimal route you need in order to read messages from Kafka.
 [source,java]
 -------------------------------------------------------------
-from("kafka:localhost:9092?topic=test&groupId=testing")
+from("kafka:test?brokers=localhost:9092&groupId=testing")
     .log("Message received from Kafka : ${body}")
     .log("    on the topic ${headers[kafka.TOPIC]}")
     .log("    on the partition ${headers[kafka.PARTITION]}")
@@ -222,7 +212,7 @@ DefaultCamelContext camelContext = new DefaultCamelContext(registry);
 camelContext.addRoutes(new RouteBuilder() {
     @Override
     public void configure() throws Exception {
-        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                      "&groupId=A" +                            //
                      "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                      "&offsetRepository=#offsetRepo")          // Keep the offsets in the previously configured repository
@@ -240,7 +230,7 @@ Here is the minimal route you need in order to write messages to Kafka.
 from("direct:start")
     .setBody(constant("Message from Camel"))          // Message to send
     .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
-    .to("kafka:localhost:9092?topic=test");
+    .to("kafka:test?brokers=localhost:9092");
 ----------------------------------------------------------------------------
 
 
@@ -251,7 +241,7 @@ You have 2 different ways to configure the SSL communication on the Kafka` compo
 The first way is through the many SSL endpoint parameters
 [source,java]
 -------------------------------------------------------------
-from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +
+from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
              "&groupId=A" +
              "&sslKeystoreLocation=/path/to/keystore.jks" +
              "&sslKeystorePassword=changeit" +
@@ -281,7 +271,7 @@ DefaultCamelContext camelContext = new DefaultCamelContext(registry);
 camelContext.addRoutes(new RouteBuilder() {
     @Override
     public void configure() throws Exception {
-        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                      "&groupId=A" +                            //
                      "&sslContextParameters=#ssl")             // Reference the SSL configuration
                 .to("mock:result");

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 5c7ce9b..575dcfc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -63,8 +63,7 @@ public class KafkaComponent extends UriEndpointComponent {
     }
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
      * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b42e146..bec73da 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -613,8 +613,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
      * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 9cb1576..b0eaa20 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -29,13 +29,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class KafkaComponentConfiguration {
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting
-     * metadata (topics partitions and replicas). The socket connections for
-     * sending the actual data will be established based on the broker
-     * information returned in the metadata. The format is
-     * host1:port1host2:port2 and the list can be a subset of brokers or a VIP
-     * pointing to a subset of brokers. This option is known as
-     * metadata.broker.list in the Kafka documentation.
+     * URL of the Kafka brokers to use. The format is host1:port1host2:port2 and
+     * the list can be a subset of brokers or a VIP pointing to a subset of
+     * brokers. This option is known as metadata.broker.list in the Kafka
+     * documentation.
      */
     private String brokers;
     /**

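Routes written against the URI forms removed from kafka-component.adoc above (`kafka:server:port[?options]`, `kafka:server:port/topicName[?options]`, `kafka:server/topicName[?options]`) need to be rewritten to `kafka:topic[?options]` with a `brokers` option. The following is only a sketch of that rewrite as a plain string transformation. `KafkaUriMigration` and `migrate` are hypothetical names and are not part of Camel. The sketch assumes a single broker, the default port 9092 mentioned in the removed documentation, and option values that contain no `&` or `?` characters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Camel: rewrites an old-style Kafka endpoint
// URI (broker in the context-path) into the new style (topic in the context-path).
final class KafkaUriMigration {

    // Default port named in the removed documentation for kafka:server/topicName.
    static final int DEFAULT_PORT = 9092;

    static String migrate(String oldUri) {
        String prefix = "kafka:";
        if (!oldUri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a kafka URI: " + oldUri);
        }
        String rest = oldUri.substring(prefix.length());

        // Split the context-path from the query string.
        int q = rest.indexOf('?');
        String path = q >= 0 ? rest.substring(0, q) : rest;
        String query = q >= 0 ? rest.substring(q + 1) : "";

        // Old context-path is server[:port][/topicName].
        String broker = path;
        String topic = null;
        int slash = path.indexOf('/');
        if (slash >= 0) {
            broker = path.substring(0, slash);
            topic = path.substring(slash + 1);
        }
        if (!broker.contains(":")) {
            broker = broker + ":" + DEFAULT_PORT;
        }

        // Pull topic=... out of the options; keep the other options in order.
        List<String> options = new ArrayList<>();
        for (String option : query.split("&")) {
            if (option.isEmpty()) {
                continue;
            }
            if (option.startsWith("topic=")) {
                topic = option.substring("topic=".length());
            } else {
                options.add(option);
            }
        }
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("No topic found in: " + oldUri);
        }

        // New form: kafka:topic?brokers=host:port[&other options]
        StringBuilder sb = new StringBuilder(prefix)
                .append(topic)
                .append("?brokers=")
                .append(broker);
        for (String option : options) {
            sb.append('&').append(option);
        }
        return sb.toString();
    }
}
```

Applied to the consumer example changed in this commit, `KafkaUriMigration.migrate("kafka:localhost:9092?topic=test&groupId=testing")` yields `kafka:test?brokers=localhost:9092&groupId=testing`, which matches the updated documentation line.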