Repository: camel Updated Branches: refs/heads/master 3711172e4 -> 6c857fd5e
CAMEL-9511 - Setting Kafka's endpoint configuration by reference does not merge params Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c857fd5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c857fd5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c857fd5 Branch: refs/heads/master Commit: 6c857fd5e283c2a2f5a5a1ecd691b7a7b3aa7d07 Parents: 3711172 Author: Akitoshi Yoshida <a...@apache.org> Authored: Tue Feb 2 14:14:14 2016 +0100 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Tue Feb 2 18:18:41 2016 +0100 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaComponent.java | 16 +++++ .../component/kafka/KafkaConfiguration.java | 15 ++++- .../component/kafka/KafkaComponentTest.java | 62 ++++++++++++++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6c857fd5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index b659e73..c9d4c2a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -20,6 +20,8 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.EndpointHelper; public class KafkaComponent extends UriEndpointComponent { @@ -38,6 +40,20 @@ public class KafkaComponent extends UriEndpointComponent { KafkaEndpoint endpoint = new KafkaEndpoint(uri, this); String brokers = remaining.split("\\?")[0]; + Object confparam = params.get("configuration"); + if (confparam != null) { + // need a special handling to resolve the reference before other parameters are set/merged into the config + KafkaConfiguration confobj = null; + if (confparam instanceof KafkaConfiguration) { + confobj = (KafkaConfiguration)confparam; + } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) { + confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1)); + } + if (confobj != null) { + endpoint.setConfiguration(confobj.copy()); + } + params.remove("configuration"); + } if (brokers != null) { endpoint.getConfiguration().setBrokers(brokers); } http://git-wip-us.apache.org/repos/asf/camel/blob/6c857fd5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index b03970c..894df0c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -19,13 +19,14 @@ package org.apache.camel.component.kafka; import java.util.Properties; import kafka.producer.DefaultPartitioner; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; @UriParams -public class KafkaConfiguration { +public class KafkaConfiguration implements Cloneable { @UriParam private String zookeeperConnect; @@ -774,4 +775,16 @@ public class KafkaConfiguration { public void setDualCommitEnabled(Boolean dualCommitEnabled) { this.dualCommitEnabled = dualCommitEnabled; } + + /** + * Returns a copy of this configuration + */ + public KafkaConfiguration copy() { + try { + return (KafkaConfiguration)clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/6c857fd5/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index 15cef7a..eb6dd09 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.camel.CamelContext; +import org.apache.camel.spi.Registry; import org.junit.Test; import org.mockito.Mockito; @@ -76,4 +77,65 @@ public class KafkaComponentTest { assertEquals(3, endpoint.getConsumerStreams()); assertEquals("com.class.Party", endpoint.getPartitioner()); } + + @Test + public void testPropertiesConfigrationMerge() throws Exception { + Map<String, Object> params = new HashMap<String, Object>(); + params.put("portNumber", 14123); + params.put("consumerStreams", "3"); + params.put("topic", "mytopic"); + params.put("partitioner", "com.class.Party"); + + KafkaConfiguration kc = new KafkaConfiguration(); + kc.setZookeeperHost("somehost"); + kc.setZookeeperPort(2987); + kc.setTopic("default"); + params.put("configuration", kc); + + String uri = "kafka:broker1:12345,broker2:12566"; + String remaining = "broker1:12345,broker2:12566"; + + KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("somehost:2987", endpoint.getZookeeperConnect()); + assertEquals("somehost", endpoint.getZookeeperHost()); + assertEquals(2987, endpoint.getZookeeperPort()); + assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); + assertEquals("mytopic", endpoint.getTopic()); + assertEquals(3, endpoint.getConsumerStreams()); + assertEquals("com.class.Party", endpoint.getPartitioner()); + assertNull("dirty", kc.getBrokers()); + assertEquals("default", kc.getTopic()); + } + + @Test + public void testPropertiesConfigrationRefMerge() throws Exception { + Map<String, Object> params = new HashMap<String, Object>(); + params.put("portNumber", 14123); + params.put("consumerStreams", "3"); + params.put("topic", "mytopic"); + params.put("partitioner", "com.class.Party"); + + KafkaConfiguration kc = new KafkaConfiguration(); + kc.setZookeeperHost("somehost"); + kc.setZookeeperPort(2987); + kc.setTopic("default"); + Registry registry = Mockito.mock(Registry.class); + Mockito.when(registry.lookupByName("baseconf")).thenReturn(kc); + Mockito.when(context.getRegistry()).thenReturn(registry); + params.put("configuration", "#baseconf"); + + String uri = "kafka:broker1:12345,broker2:12566"; + String remaining = "broker1:12345,broker2:12566"; + + KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("somehost:2987", endpoint.getZookeeperConnect()); + assertEquals("somehost", endpoint.getZookeeperHost()); + assertEquals(2987, endpoint.getZookeeperPort()); + assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); + assertEquals("mytopic", endpoint.getTopic()); + assertEquals(3, endpoint.getConsumerStreams()); + assertEquals("com.class.Party", endpoint.getPartitioner()); + assertNull("dirty", kc.getBrokers()); + assertEquals("default", kc.getTopic()); + } }