Repository: camel
Updated Branches:
  refs/heads/master ba43a8b96 -> fbd2438fe


CAMEL-10585: Changes to URL format to include topic name and default port


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ced96163
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ced96163
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ced96163

Branch: refs/heads/master
Commit: ced96163a1fdf42a29180f9a84f9b470669c38da
Parents: ba43a8b
Author: admin <ad...@test.com>
Authored: Tue Feb 14 16:30:32 2017 +0530
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 15 10:11:40 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 11 ++++
 components/camel-kafka/src/main/docs/kafka.adoc | 10 ++++
 .../camel/component/kafka/KafkaComponent.java   | 31 +++++++++--
 .../component/kafka/KafkaComponentTest.java     | 55 +++++++++++++++++---
 4 files changed, 98 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c2516f7..01d2bff 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -26,6 +26,17 @@ From Camel 2.17 onwards Scala is no longer used, as we use 
the kafka java client
 [source,java]
 ---------------------------
 kafka:server:port[?options]
+
+OR
+
+kafka:server:port/topicName[?options]
+
+OR
+
+kafka:server/topicName[?options] 
+
+When the port is omitted, as in the last form above, the default port 9092 is used.
+
 ---------------------------
 
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc 
b/components/camel-kafka/src/main/docs/kafka.adoc
index f81ccfd..73504c2 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -50,6 +50,16 @@ URI format
 [source,java]
 ---------------------------
 kafka:server:port[?options]
+
+OR
+
+kafka:server:port/topicName[?options]
+
+OR
+
+kafka:server/topicName[?options] 
+
+When the port is omitted, as in the last form above, the default port 9092 is used.
 ---------------------------
 
  

http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 36baf3c..f2d22f2 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.kafka;
 
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -25,6 +27,13 @@ import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.Metadata;
 
 public class KafkaComponent extends UriEndpointComponent {
+    
+    // Topic name validation as per Kafka documentation [a-zA-Z0-9\\._\\-] as 
of 0.10
+    // hostname and port are extracted as per pattern. IP and hostname syntax 
is not validated using regex.
+    
+    static final Pattern SIMPLE_KAFKA_URI_PATTERN = 
Pattern.compile("([a-z0-9\\.]*)(:?)([0-9]*)/([a-zA-Z0-9\\._\\-]*)", 
Pattern.CASE_INSENSITIVE);
+    
+    static final String DEFAULT_PORT = "9092";
 
     @Metadata(label = "advanced")
     private ExecutorService workerPool;
@@ -39,10 +48,26 @@ public class KafkaComponent extends UriEndpointComponent {
 
     @Override
     protected KafkaEndpoint createEndpoint(String uri, String remaining, 
Map<String, Object> params) throws Exception {
+        
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
-        String brokers = remaining.split("\\?")[0];
-        if (brokers != null) {
-            endpoint.getConfiguration().setBrokers(brokers);
+        
+        Matcher matcher = SIMPLE_KAFKA_URI_PATTERN.matcher(remaining);
+               
+        if (matcher.matches()) {
+            String hostName = matcher.group(1);          
+            String port = matcher.group(3);
+            String topic = matcher.group(4);
+            
+            if (port != null && port.length() == 0) {
+                port = DEFAULT_PORT;
+            }            
+            endpoint.getConfiguration().setBrokers(hostName + ":" + port);
+            endpoint.getConfiguration().setTopic(topic);
+        } else {
+            String brokers = remaining.split("\\?")[0];
+            if (brokers != null) {
+                endpoint.getConfiguration().setBrokers(brokers);
+            }            
         }
 
         // configure component options before endpoint properties which can 
override from params

http://git-wip-us.apache.org/repos/asf/camel/blob/ced96163/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index e6edd4b..e94ba24 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.kafka;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -28,7 +30,6 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.assertEquals;
 
 public class KafkaComponentTest {
 
@@ -36,7 +37,7 @@ public class KafkaComponentTest {
 
     @Test
     public void testPropertiesSet() throws Exception {
-        Map<String, Object> params = new HashMap<>();
+        Map<String, Object> params = new HashMap<String, Object>();
         params.put("topic", "mytopic");
         params.put("partitioner", "com.class.Party");
 
@@ -51,7 +52,7 @@ public class KafkaComponentTest {
 
     @Test
     public void testAllProducerConfigProperty() throws Exception {
-        Map<String, Object> params = new HashMap<>();
+        Map<String, Object> params = new HashMap<String, Object>();
         setProducerProperty(params);
 
         String uri = "kafka:dev1:12345,dev2:12566";
@@ -102,12 +103,11 @@ public class KafkaComponentTest {
         assertEquals("test", 
endpoint.getConfiguration().getSslEndpointAlgorithm());
         assertEquals("SunX509", 
endpoint.getConfiguration().getSslKeymanagerAlgorithm());
         assertEquals("PKIX", 
endpoint.getConfiguration().getSslTrustmanagerAlgorithm());
-        
assertEquals("org.apache.camel.component.kafka.MockProducerInterceptor", 
endpoint.getConfiguration().getInterceptorClasses());
     }
 
     @Test
     public void testAllProducerKeys() throws Exception {
-        Map<String, Object> params = new HashMap<>();
+        Map<String, Object> params = new HashMap<String, Object>();
 
         String uri = "kafka:dev1:12345,dev2:12566";
         String remaining = "dev1:12345,dev2:12566";
@@ -201,7 +201,50 @@ public class KafkaComponentTest {
         params.put("sslEndpointAlgorithm", "test");
         params.put("sslKeymanagerAlgorithm", "SunX509");
         params.put("sslTrustmanagerAlgorithm", "PKIX");
-        params.put("interceptorClasses", 
"org.apache.camel.component.kafka.MockProducerInterceptor");
     }
+    
+    // the URL format should include the topic name like the ActiveMQ & AMQP 
endpoints
+    // kafka:serverName:port/topicName
+    // kafka:serverName/topicName
+    
+    @Test
+    public void testSimpleKakfaUriEndpoint() throws Exception {
+        
+        Map<String, Object> params = new HashMap<String, Object>();
+ 
+        String uri = "kafka:broker1:9999/topic2One.33";
+        String remaining = "broker1:9999/topic2One.33";
+        
+
+        KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
+        
+        assertEquals("topic2One.33", endpoint.getConfiguration().getTopic());
+        assertEquals("broker1:9999", endpoint.getConfiguration().getBrokers());
+        
+        // port not provided in the URI
+        
+        uri = "kafka:broker1/click-Topic";
+        remaining = "broker1/click-Topic";
+        
+        endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, 
params);
+        
+        assertEquals("click-Topic", endpoint.getConfiguration().getTopic());
+        assertEquals("broker1:9092", endpoint.getConfiguration().getBrokers());
+        
+        // IP Address provided instead of hostname
+        
+        uri = "kafka:10.10.10.3/click-Topic";
+        remaining = "10.10.10.3/click-Topic";
+        
+        endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, 
params);
+        
+        assertEquals("click-Topic", endpoint.getConfiguration().getTopic());
+        assertEquals("10.10.10.3:9092", 
endpoint.getConfiguration().getBrokers());
+        
+       
+        
+        
+        
+    }   
 
 }

Reply via email to