Repository: camel Updated Branches: refs/heads/master ba43a8b96 -> fbd2438fe
CAMEL-10585: Changes to URL format to include topic name and default port Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ced96163 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ced96163 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ced96163 Branch: refs/heads/master Commit: ced96163a1fdf42a29180f9a84f9b470669c38da Parents: ba43a8b Author: admin <ad...@test.com> Authored: Tue Feb 14 16:30:32 2017 +0530 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 15 10:11:40 2017 +0100 ---------------------------------------------------------------------- .../src/main/docs/kafka-component.adoc | 11 ++++ components/camel-kafka/src/main/docs/kafka.adoc | 10 ++++ .../camel/component/kafka/KafkaComponent.java | 31 +++++++++-- .../component/kafka/KafkaComponentTest.java | 55 +++++++++++++++++--- 4 files changed, 98 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/main/docs/kafka-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index c2516f7..01d2bff 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -26,6 +26,17 @@ From Camel 2.17 onwards Scala is no longer used, as we use the kafka java client [source,java] --------------------------- kafka:server:port[?options] + +OR + +kafka:server:port/topicName[?options] + +OR + +kafka:server/topicName[?options] + +For the option above default port 9092 is used in the URI + --------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/main/docs/kafka.adoc 
---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc index f81ccfd..73504c2 100644 --- a/components/camel-kafka/src/main/docs/kafka.adoc +++ b/components/camel-kafka/src/main/docs/kafka.adoc @@ -50,6 +50,16 @@ URI format [source,java] --------------------------- kafka:server:port[?options] + +OR + +kafka:server:port/topicName[?options] + +OR + +kafka:server/topicName[?options] + +For the option above default port 9092 is used in the URI --------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index 36baf3c..f2d22f2 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -18,6 +18,8 @@ package org.apache.camel.component.kafka; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -25,6 +27,13 @@ import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.spi.Metadata; public class KafkaComponent extends UriEndpointComponent { + + // Topic name validation as per Kafka documentation [a-zA-Z0-9\\._\\-] as of 0.10 + // hostname and port are extracted as per pattern. IP and hostname syntax is not validated using regex. 
+ + static final Pattern SIMPLE_KAFKA_URI_PATTERN = Pattern.compile("([a-z0-9\\.]*)(:?)([0-9]*)/([a-zA-Z0-9\\._\\-]*)", Pattern.CASE_INSENSITIVE); + + static final String DEFAULT_PORT = "9092"; @Metadata(label = "advanced") private ExecutorService workerPool; @@ -39,10 +48,26 @@ public class KafkaComponent extends UriEndpointComponent { @Override protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception { + KafkaEndpoint endpoint = new KafkaEndpoint(uri, this); - String brokers = remaining.split("\\?")[0]; - if (brokers != null) { - endpoint.getConfiguration().setBrokers(brokers); + + Matcher matcher = SIMPLE_KAFKA_URI_PATTERN.matcher(remaining); + + if (matcher.matches()) { + String hostName = matcher.group(1); + String port = matcher.group(3); + String topic = matcher.group(4); + + if (port != null && port.length() == 0) { + port = DEFAULT_PORT; + } + endpoint.getConfiguration().setBrokers(hostName + ":" + port); + endpoint.getConfiguration().setTopic(topic); + } else { + String brokers = remaining.split("\\?")[0]; + if (brokers != null) { + endpoint.getConfiguration().setBrokers(brokers); + } } // configure component options before endpoint properties which can override from params http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index e6edd4b..e94ba24 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.kafka; +import static 
org.junit.Assert.assertEquals; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -28,7 +30,6 @@ import org.apache.kafka.common.config.SslConfigs; import org.junit.Test; import org.mockito.Mockito; -import static org.junit.Assert.assertEquals; public class KafkaComponentTest { @@ -36,7 +37,7 @@ public class KafkaComponentTest { @Test public void testPropertiesSet() throws Exception { - Map<String, Object> params = new HashMap<>(); + Map<String, Object> params = new HashMap<String, Object>(); params.put("topic", "mytopic"); params.put("partitioner", "com.class.Party"); @@ -51,7 +52,7 @@ public class KafkaComponentTest { @Test public void testAllProducerConfigProperty() throws Exception { - Map<String, Object> params = new HashMap<>(); + Map<String, Object> params = new HashMap<String, Object>(); setProducerProperty(params); String uri = "kafka:dev1:12345,dev2:12566"; @@ -102,12 +103,11 @@ public class KafkaComponentTest { assertEquals("test", endpoint.getConfiguration().getSslEndpointAlgorithm()); assertEquals("SunX509", endpoint.getConfiguration().getSslKeymanagerAlgorithm()); assertEquals("PKIX", endpoint.getConfiguration().getSslTrustmanagerAlgorithm()); - assertEquals("org.apache.camel.component.kafka.MockProducerInterceptor", endpoint.getConfiguration().getInterceptorClasses()); } @Test public void testAllProducerKeys() throws Exception { - Map<String, Object> params = new HashMap<>(); + Map<String, Object> params = new HashMap<String, Object>(); String uri = "kafka:dev1:12345,dev2:12566"; String remaining = "dev1:12345,dev2:12566"; @@ -201,7 +201,50 @@ public class KafkaComponentTest { params.put("sslEndpointAlgorithm", "test"); params.put("sslKeymanagerAlgorithm", "SunX509"); params.put("sslTrustmanagerAlgorithm", "PKIX"); - params.put("interceptorClasses", "org.apache.camel.component.kafka.MockProducerInterceptor"); } + + // the URL format should include the topic name like the ActiiveMQ & AMQP endpoints + // 
kafka:serverName:port/topicName + // kafka:serverName/topicName + + @Test + public void testSimpleKakfaUriEndpoint() throws Exception { + + Map<String, Object> params = new HashMap<String, Object>(); + + String uri = "kafka:broker1:9999/topic2One.33"; + String remaining = "broker1:9999/topic2One.33"; + + + KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + + assertEquals("topic2One.33", endpoint.getConfiguration().getTopic()); + assertEquals("broker1:9999", endpoint.getConfiguration().getBrokers()); + + // port not provided in the URI + + uri = "kafka:broker1/click-Topic"; + remaining = "broker1/click-Topic"; + + endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + + assertEquals("click-Topic", endpoint.getConfiguration().getTopic()); + assertEquals("broker1:9092", endpoint.getConfiguration().getBrokers()); + + // IP Address provided instead of hostname + + uri = "kafka:10.10.10.3/click-Topic"; + remaining = "10.10.10.3/click-Topic"; + + endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + + assertEquals("click-Topic", endpoint.getConfiguration().getTopic()); + assertEquals("10.10.10.3:9092", endpoint.getConfiguration().getBrokers()); + + + + + + } }