CAMEL-10832: Kafka. Allow configuring brokers at the component level, and make the topic part of the context-path so usage is similar to JMS etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d6e45cbe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d6e45cbe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d6e45cbe

Branch: refs/heads/master
Commit: d6e45cbe845b1c8854b0d8b57a4805f082c42277
Parents: eccfc85
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Feb 15 11:03:13 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 15 11:03:13 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 26 ++++++--------------
 .../camel/component/kafka/KafkaComponent.java   |  3 +--
 .../component/kafka/KafkaConfiguration.java     |  3 +--
 .../springboot/KafkaComponentConfiguration.java | 11 +++------
 4 files changed, 14 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 135b13f..f9d2749 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -25,17 +25,7 @@ From Camel 2.17 onwards Scala is no longer used, as we use the kafka java client

 [source,java]
 ---------------------------
-kafka:server:port[?options]
-
-OR
-
-kafka:server:port/topicName[?options]
-
-OR
-
-kafka:server/topicName[?options]
-
-For the option above default port 9092 is used in the URI
+kafka:topic[?options]
 ---------------------------


@@ -52,7 +42,7 @@ The Kafka component supports 2 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common | | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common | | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | workerPool | advanced | | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -68,7 +58,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | topic | common | | String | *Required* Name of the topic to use.
-| brokers | common | | String | This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common | | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
 | clientId | common | | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common | | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
@@ -196,7 +186,7 @@ After the message is sent to Kafka, the following headers are available
 Here is the minimal route you need in order to read messages from Kafka.
 [source,java]
 -------------------------------------------------------------
-from("kafka:localhost:9092?topic=test&groupId=testing")
+from("kafka:test?brokers=localhost:9092&groupId=testing")
     .log("Message received from Kafka : ${body}")
     .log(" on the topic ${headers[kafka.TOPIC]}")
     .log(" on the partition ${headers[kafka.PARTITION]}")
@@ -222,7 +212,7 @@ DefaultCamelContext camelContext = new DefaultCamelContext(registry);
 camelContext.addRoutes(new RouteBuilder() {
     @Override
     public void configure() throws Exception {
-        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC + //
+        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                      "&groupId=A" + //
                      "&autoOffsetReset=earliest" + // Ask to start from the beginning if we have unknown offset
                      "&offsetRepository=#offsetRepo") // Keep the offsets in the previously configured repository
@@ -240,7 +230,7 @@ Here is the minimal route you need in order to write messages to Kafka.
 from("direct:start")
     .setBody(constant("Message from Camel")) // Message to send
     .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
-    .to("kafka:localhost:9092?topic=test");
+    .to("kafka:test?brokers=localhost:9092");
 ----------------------------------------------------------------------------


@@ -251,7 +241,7 @@ You have 2 different ways to configure the SSL communication on the Kafka` component.
 The first way is through the many SSL endpoint parameters
 [source,java]
 -------------------------------------------------------------
-from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +
+from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
              "&groupId=A" +
              "&sslKeystoreLocation=/path/to/keystore.jks" +
              "&sslKeystorePassword=changeit" +
@@ -281,7 +271,7 @@ DefaultCamelContext camelContext = new DefaultCamelContext(registry);
 camelContext.addRoutes(new RouteBuilder() {
     @Override
     public void configure() throws Exception {
-        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC + //
+        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                      "&groupId=A" + //
                      "&sslContextParameters=#ssl") // Reference the SSL configuration
             .to("mock:result");

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 5c7ce9b..575dcfc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -63,8 +63,7 @@ public class KafkaComponent extends UriEndpointComponent {
     }

     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
      * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b42e146..bec73da 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -613,8 +613,7 @@ public class KafkaConfiguration {
     }

     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
      * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.

http://git-wip-us.apache.org/repos/asf/camel/blob/d6e45cbe/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 9cb1576..b0eaa20 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -29,13 +29,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class KafkaComponentConfiguration {

     /**
-     * This is for bootstrapping and the producer will only use it for getting
-     * metadata (topics partitions and replicas). The socket connections for
-     * sending the actual data will be established based on the broker
-     * information returned in the metadata. The format is
-     * host1:port1host2:port2 and the list can be a subset of brokers or a VIP
-     * pointing to a subset of brokers. This option is known as
-     * metadata.broker.list in the Kafka documentation.
+     * URL of the Kafka brokers to use. The format is host1:port1host2:port2 and
+     * the list can be a subset of brokers or a VIP pointing to a subset of
+     * brokers. This option is known as metadata.broker.list in the Kafka
+     * documentation.
      */
     private String brokers;
     /**
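
Note on migrating existing endpoint URIs: the documentation changes above move the topic into the context-path and the broker list into the brokers option, so "kafka:localhost:9092?topic=test" becomes "kafka:test?brokers=localhost:9092". The sketch below illustrates that rewrite in plain Java. It is not part of this commit or of Camel: the class KafkaUriMigration and its migrate method are hypothetical names, and it assumes the old URI carries the topic as a "topic=" query option (the removed "server:port/topicName" path form is not handled).

```java
public class KafkaUriMigration {

    private static final String PREFIX = "kafka:";
    private static final String TOPIC_OPTION = "topic=";

    /**
     * Hypothetical helper: rewrites an old-style URI
     * (kafka:host:port?topic=name&other=options) into the new style
     * (kafka:name?brokers=host:port&other=options).
     */
    public static String migrate(String oldUri) {
        if (!oldUri.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a kafka URI: " + oldUri);
        }
        String rest = oldUri.substring(PREFIX.length());

        // Everything before '?' was the broker address in the old format.
        int q = rest.indexOf('?');
        String brokers = q < 0 ? rest : rest.substring(0, q);
        String query = q < 0 ? "" : rest.substring(q + 1);

        // Pull the topic out of the query and keep the other options in order.
        String topic = null;
        StringBuilder others = new StringBuilder();
        for (String param : query.split("&")) {
            if (param.isEmpty()) {
                continue;
            }
            if (param.startsWith(TOPIC_OPTION)) {
                topic = param.substring(TOPIC_OPTION.length());
            } else {
                others.append('&').append(param);
            }
        }
        if (topic == null) {
            throw new IllegalArgumentException("No topic option in: " + oldUri);
        }

        // The topic becomes the context-path; brokers becomes a regular option.
        return PREFIX + topic + "?brokers=" + brokers + others;
    }

    public static void main(String[] args) {
        // Old consumer URI taken from the removed documentation example.
        System.out.println(migrate("kafka:localhost:9092?topic=test&groupId=testing"));
    }
}
```

For the consumer example removed from the documentation, this yields the same URI the new documentation shows: kafka:test?brokers=localhost:9092&groupId=testing.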