CAMEL-10832: Kafka. Allow to configure brokers on component level. And made topic as part of context-path so using it is similar to JMS etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d3b38a05
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d3b38a05
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d3b38a05

Branch: refs/heads/master
Commit: d3b38a05b7b6552be1c083aa2aed529159471a90
Parents: d6e45cb
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Feb 15 11:14:38 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 15 11:16:40 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  4 ++--
 .../camel/component/kafka/KafkaComponent.java   |  7 ++++---
 .../component/kafka/KafkaConfiguration.java     |  2 +-
 .../camel/component/kafka/KafkaConsumer.java    | 20 +++++++++++++++++---
 .../camel/component/kafka/KafkaEndpoint.java    |  6 ++++++
 .../camel/component/kafka/KafkaProducer.java    | 10 +++++++---
 .../component/kafka/BaseEmbeddedKafkaTest.java  |  2 +-
 .../component/kafka/KafkaComponentTest.java     |  7 ++++---
 .../KafkaConsumerOffsetRepositoryEmptyTest.java | 10 +++++-----
 ...KafkaConsumerOffsetRepositoryResumeTest.java | 10 +++++-----
 .../component/kafka/KafkaConsumerTest.java      |  4 ++++
 .../springboot/KafkaComponentConfiguration.java |  2 +-
 12 files changed, 57 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f9d2749..211d592 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -42,7 +42,7 @@ The Kafka component supports 2 options which are listed below.
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| brokers | common | | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common | | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | workerPool | advanced | | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 |=======================================================================
 {% endraw %}
@@ -58,7 +58,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | topic | common | | String | *Required* Name of the topic to use.
-| brokers | common | | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
+| brokers | common | | String | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | clientId | common | | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common | | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 575dcfc..da0f59d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -49,10 +49,11 @@ public class KafkaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("Topic must be configured on endpoint using syntax kafka:topic");
         }
         endpoint.getConfiguration().setTopic(remaining);
-
-        endpoint.getConfiguration().setBrokers(getBrokers());
         endpoint.getConfiguration().setWorkerPool(getWorkerPool());
 
+        // brokers can be configured on either component or endpoint level
+        // and the consumer and produce is aware of this and act accordingly
+
         setProperties(endpoint.getConfiguration(), params);
         setProperties(endpoint, params);
         return endpoint;
@@ -66,7 +67,7 @@ public class KafkaComponent extends UriEndpointComponent {
      * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
         this.brokers = brokers;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index bec73da..57eea8f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -616,7 +616,7 @@ public class KafkaConfiguration {
      * URL of the Kafka brokers to use.
      * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
      * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
         this.brokers = brokers;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 549c1c2..0327a76 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -55,9 +56,15 @@ public class KafkaConsumer extends DefaultConsumer {
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
 
-        if (endpoint.getConfiguration().getBrokers() == null) {
-            throw new IllegalArgumentException("BootStrap servers must be specified");
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
         }
+        if (ObjectHelper.isEmpty(brokers)) {
+            throw new IllegalArgumentException("Brokers must be configured");
+        }
+
         if (endpoint.getConfiguration().getGroupId() == null) {
             throw new IllegalArgumentException("groupId must not be null");
         }
@@ -66,7 +73,14 @@ public class KafkaConsumer extends DefaultConsumer {
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getConfiguration().getBrokers());
+
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
+        }
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getConfiguration().getGroupId());
         return props;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 3c125e5..46bf844 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -20,6 +20,7 @@ import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -60,6 +61,11 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         super(endpointUri, component);
     }
 
+    @Override
+    public KafkaComponent getComponent() {
+        return (KafkaComponent) super.getComponent();
+    }
+
     public KafkaConfiguration getConfiguration() {
         return configuration;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2ffe96b..abfc588 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -51,13 +51,17 @@ public class KafkaProducer extends DefaultAsyncProducer {
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
-        if (endpoint.getConfiguration().getBrokers() != null) {
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getConfiguration().getBrokers());
+
+        // brokers can be configured on endpoint or component level
+        String brokers = endpoint.getConfiguration().getBrokers();
+        if (brokers == null) {
+            brokers = endpoint.getComponent().getBrokers();
         }
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+
         return props;
     }
 
-
     public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
         return kafkaProducer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index e36ed0a..6b72d33 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
-    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
+    static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
 
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 8d106e9..92982cc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.kafka;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -30,6 +28,8 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertEquals;
+
 public class KafkaComponentTest {
 
     private CamelContext context = Mockito.mock(CamelContext.class);
@@ -52,7 +52,8 @@ public class KafkaComponentTest {
         String uri = "kafka:mytopic?partitioner=com.class.Party";
 
         KafkaEndpoint endpoint = (KafkaEndpoint) kafka.createEndpoint(uri);
-        assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
+        assertEquals(null, endpoint.getConfiguration().getBrokers());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getComponent().getBrokers());
         assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
index e8cb50a..2b59970 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
@@ -95,11 +95,11 @@ public class KafkaConsumerOffsetRepositoryEmptyTest extends BaseEmbeddedKafkaTes
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:" + TOPIC +
-                         "?groupId=A" +
-                         "&autoOffsetReset=earliest" +  // Ask to start from the beginning if we have unknown offset
-                         "&consumersCount=2" +          // We have 2 partitions, we want 1 consumer per partition
-                         "&offsetRepository=#offset")   // Keep the offset in our repository
+                from("kafka:" + TOPIC
+                         + "?groupId=A"
+                         + "&autoOffsetReset=earliest"  // Ask to start from the beginning if we have unknown offset
+                         + "&consumersCount=2"          // We have 2 partitions, we want 1 consumer per partition
+                         + "&offsetRepository=#offset") // Keep the offset in our repository
                         .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 36555a2..fe8052a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -97,11 +97,11 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("kafka:" + TOPIC +
-                         "?groupId=A" +
-                         "&autoOffsetReset=earliest" +  // Ask to start from the beginning if we have unknown offset
-                         "&consumersCount=2" +          // We have 2 partitions, we want 1 consumer per partition
-                         "&offsetRepository=#offset")   // Keep the offset in our repository
+                from("kafka:" + TOPIC
+                         + "?groupId=A"
+                         + "&autoOffsetReset=earliest"  // Ask to start from the beginning if we have unknown offset
+                         + "&consumersCount=2"          // We have 2 partitions, we want 1 consumer per partition
+                         + "&offsetRepository=#offset") // Keep the offset in our repository
                         .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 97311d7..3e249b4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -25,11 +25,13 @@ import static org.mockito.Mockito.when;
 public class KafkaConsumerTest {
 
     private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+    private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresBootstrapServers() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         new KafkaConsumer(endpoint, processor);
@@ -37,6 +39,7 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:1234");
         new KafkaConsumer(endpoint, processor);
@@ -44,6 +47,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
+        when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");

http://git-wip-us.apache.org/repos/asf/camel/blob/d3b38a05/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index b0eaa20..3396544 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -31,7 +31,7 @@ public class KafkaComponentConfiguration {
     /**
      * URL of the Kafka brokers to use. The format is host1:port1host2:port2 and
      * the list can be a subset of brokers or a VIP pointing to a subset of
-     * brokers. This option is known as metadata.broker.list in the Kafka
+     * brokers. This option is known as bootstrap.servers in the Kafka
      * documentation.
      */
     private String brokers;
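
The broker lookup that the consumer hunk introduces (endpoint value first, component value as fallback, then an emptiness check) can be sketched without any Camel dependency. This is only an illustration under stated assumptions: the class BrokerResolutionSketch and its methods are hypothetical names that do not exist in camel-kafka, and the trim-based emptiness test merely approximates what ObjectHelper.isEmpty is assumed to do for strings. The "bootstrap.servers" key is the Kafka client property named in the updated documentation.

```java
import java.util.Properties;

// Hypothetical, self-contained sketch; not part of camel-kafka.
final class BrokerResolutionSketch {

    private BrokerResolutionSketch() {
    }

    /**
     * Resolves the broker list: a non-null endpoint value wins,
     * otherwise the component value is used.
     */
    static String resolveBrokers(String endpointBrokers, String componentBrokers) {
        String brokers = endpointBrokers;
        if (brokers == null) {
            brokers = componentBrokers;
        }
        // Assumption: approximates the ObjectHelper.isEmpty check with plain JDK calls.
        if (brokers == null || brokers.trim().isEmpty()) {
            throw new IllegalArgumentException("Brokers must be configured");
        }
        return brokers;
    }

    /** Builds client properties holding the resolved broker list. */
    static Properties clientProperties(String endpointBrokers, String componentBrokers) {
        Properties props = new Properties();
        // Properties.put rejects null values, so the value is validated first.
        props.put("bootstrap.servers", resolveBrokers(endpointBrokers, componentBrokers));
        return props;
    }

    public static void main(String[] args) {
        // Component-level brokers are used when the endpoint sets none.
        System.out.println(resolveBrokers(null, "broker1:12345,broker2:12566"));
        // An endpoint-level value overrides the component-level one.
        System.out.println(resolveBrokers("localhost:9092", "broker1:12345"));
    }
}
```

As in the consumer hunk, only a null endpoint value triggers the fallback; an empty endpoint string is not replaced by the component value and is rejected by the emptiness check.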