CAMEL-7864: Updates to allow setting Kafka's zookeeper.connect directly. - Added a new zookeeperConnect property to KafkaConfiguration and configured it to override the zookeeperHost and zookeeperPort properties. - Created a getZookeeperConnect method on KafkaConfiguration that returns the zookeeperConnect property if it is set, or zookeeperHost + ":" + zookeeperPort if zookeeperConnect is not set. - Added zookeeperConnect get and set methods on KafkaEndpoint to delegate to KafkaConfiguration. - Updated KafkaConsumer to use the getZookeeperConnect method on the KafkaEndpoint. - Added tests for the changes.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/daccc8e0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/daccc8e0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/daccc8e0 Branch: refs/remotes/origin/camel-2.13.x Commit: daccc8e063ec3b305ddb1ef20a9a57b982c45328 Parents: 3a5497a Author: john.shields <john.shie...@tubemogul.com> Authored: Thu Sep 25 20:57:23 2014 -0500 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Sep 27 13:23:14 2014 +0200 ---------------------------------------------------------------------- .../component/kafka/KafkaConfiguration.java | 25 +++++++++++++++++-- .../camel/component/kafka/KafkaConsumer.java | 9 +++---- .../camel/component/kafka/KafkaEndpoint.java | 9 +++++++ .../component/kafka/KafkaComponentTest.java | 26 ++++++++++++++++++++ .../component/kafka/KafkaConsumerTest.java | 14 ++++++++--- 5 files changed, 71 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 88d5017..881ef3c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -21,6 +21,7 @@ import java.util.Properties; import kafka.producer.DefaultPartitioner; public class KafkaConfiguration { + private String zookeeperConnect; private String zookeeperHost; private int zookeeperPort = 2181; private String topic; @@ -127,13 +128,31 @@ public class 
KafkaConfiguration { props.put(key, value.toString()); } } + + public String getZookeeperConnect() { + if (this.zookeeperConnect != null) { + return zookeeperConnect; + } else { + return getZookeeperHost() + ":" + getZookeeperPort(); + } + } + + public void setZookeeperConnect(String zookeeperConnect) { + this.zookeeperConnect = zookeeperConnect; + + // connect overrides host and port + this.zookeeperHost = null; + this.zookeeperPort = -1; + } public String getZookeeperHost() { return zookeeperHost; } public void setZookeeperHost(String zookeeperHost) { - this.zookeeperHost = zookeeperHost; + if (this.zookeeperConnect == null) { + this.zookeeperHost = zookeeperHost; + } } public int getZookeeperPort() { @@ -141,7 +160,9 @@ public class KafkaConfiguration { } public void setZookeeperPort(int zookeeperPort) { - this.zookeeperPort = zookeeperPort; + if (this.zookeeperConnect == null) { + this.zookeeperPort = zookeeperPort; + } } public String getGroupId() { http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 3087a14..f801328 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -46,11 +46,8 @@ public class KafkaConsumer extends DefaultConsumer { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; - if (endpoint.getZookeeperHost() == null) { - throw new IllegalArgumentException("zookeeper host must be specified"); - } - if (endpoint.getZookeeperPort() == 0) { - throw new IllegalArgumentException("zookeeper port must be specified"); + if 
(endpoint.getZookeeperConnect() == null) { + throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified"); } if (endpoint.getGroupId() == null) { throw new IllegalArgumentException("groupId must not be null"); @@ -59,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer { Properties getProps() { Properties props = endpoint.getConfiguration().createConsumerProperties(); - props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort()); + props.put("zookeeper.connect", endpoint.getZookeeperConnect()); props.put("group.id", endpoint.getGroupId()); return props; } http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 002d15e..deed68a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -102,6 +102,14 @@ public class KafkaEndpoint extends DefaultEndpoint { // Delegated properties from the configuration //------------------------------------------------------------------------- + public String getZookeeperConnect() { + return configuration.getZookeeperConnect(); + } + + public void setZookeeperConnect(String zookeeperConnect) { + configuration.setZookeeperConnect(zookeeperConnect); + } + public String getZookeeperHost() { return configuration.getZookeeperHost(); } @@ -417,4 +425,5 @@ public class KafkaEndpoint extends DefaultEndpoint { public int getRequestTimeoutMs() { return configuration.getRequestTimeoutMs(); } + } 
http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index c11edaf..15cef7a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class KafkaComponentTest { @@ -43,6 +44,7 @@ public class KafkaComponentTest { String remaining = "broker1:12345,broker2:12566"; KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("somehost:2987", endpoint.getZookeeperConnect()); assertEquals("somehost", endpoint.getZookeeperHost()); assertEquals(2987, endpoint.getZookeeperPort()); assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); @@ -50,4 +52,28 @@ public class KafkaComponentTest { assertEquals(3, endpoint.getConsumerStreams()); assertEquals("com.class.Party", endpoint.getPartitioner()); } + + @Test + public void testZookeeperConnectPropertyOverride() throws Exception { + Map<String, Object> params = new HashMap<String, Object>(); + params.put("zookeeperConnect", "thehost:2181/chroot"); + params.put("zookeeperHost", "somehost"); + params.put("zookeeperPort", 2987); + params.put("portNumber", 14123); + params.put("consumerStreams", "3"); + params.put("topic", "mytopic"); + params.put("partitioner", "com.class.Party"); + + String uri = "kafka:broker1:12345,broker2:12566"; + String remaining = "broker1:12345,broker2:12566"; + + 
KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect()); + assertNull(endpoint.getZookeeperHost()); + assertEquals(-1, endpoint.getZookeeperPort()); + assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); + assertEquals("mytopic", endpoint.getTopic()); + assertEquals(3, endpoint.getConsumerStreams()); + assertEquals("com.class.Party", endpoint.getPartitioner()); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java index 740f116..b51c09e 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -28,15 +28,21 @@ public class KafkaConsumerTest { private Processor processor = mock(Processor.class); @Test(expected = IllegalArgumentException.class) - public void consumerRequiresZookeeperHost() throws Exception { - Mockito.when(endpoint.getZookeeperPort()).thenReturn(2181); + public void consumerRequiresZookeeperConnect() throws Exception { + Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); new KafkaConsumer(endpoint, processor); } @Test(expected = IllegalArgumentException.class) - public void consumerRequiresZookeeperPort() throws Exception { - Mockito.when(endpoint.getZookeeperHost()).thenReturn("localhost"); + public void consumerRequiresGroupId() throws Exception { + Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot"); new KafkaConsumer(endpoint, processor); } + @Test + public void 
consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception { + Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); + Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot"); + new KafkaConsumer(endpoint, processor); + } }