Repository: camel Updated Branches: refs/heads/master 0f636566e -> 19e70a6a3
CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/19e70a6a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/19e70a6a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/19e70a6a Branch: refs/heads/master Commit: 19e70a6a36b75a1e2c3291d9d21df24e263023fa Parents: 0f63656 Author: Willem Jiang <[email protected]> Authored: Wed Jul 1 23:03:12 2015 +0800 Committer: Willem Jiang <[email protected]> Committed: Wed Jul 1 23:03:31 2015 +0800 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConfiguration.java | 1 - .../apache/camel/component/kafka/KafkaEndpoint.java | 14 +++++++++++--- .../apache/camel/component/kafka/KafkaProducer.java | 5 ++++- .../camel/component/kafka/KafkaProducerTest.java | 13 +++++++++++++ 4 files changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 5bd8ee6..4858af9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -39,7 +39,6 @@ public class KafkaConfiguration { private String groupId; @UriParam(defaultValue = "DefaultPartitioner") private String partitioner = DefaultPartitioner.class.getCanonicalName(); - @UriParam(label = "consumer", defaultValue = "10") private int consumerStreams = 10; @UriParam(label = "consumer", defaultValue = "1") http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index df213f3..bbf7062 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.kafka; -import java.net.URISyntaxException; import java.util.concurrent.ExecutorService; import kafka.message.MessageAndMetadata; @@ -29,16 +28,17 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.DefaultMessage; -import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriPath; @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging") public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { @UriParam private KafkaConfiguration configuration = new KafkaConfiguration(); + + @UriParam(defaultValue = "false") + private boolean bridgeEndpoint; public KafkaEndpoint() { } @@ -458,4 +458,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS public boolean isMultipleConsumersSupported() { return true; } + + public boolean isBridgeEndpoint() { + return bridgeEndpoint; + } + + public void setBridgeEndpoint(boolean bridgeEndpoint) { + this.bridgeEndpoint = bridgeEndpoint; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index c598cf9..c637652 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -61,7 +61,10 @@ public class KafkaProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws CamelException { - String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class); + String topic = endpoint.getTopic(); + if (!endpoint.isBridgeEndpoint()) { + topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class); + } if (topic == null) { throw new CamelExchangeException("No topic key set", exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 8e5b0f7..f2bcd6b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -128,6 +128,19 @@ public class KafkaProducerTest { verifySendMessage("someKey", "someTopic", "someKey"); } + + @Test + public void processSendMessageWithBridgeEndpoint() throws Exception { + endpoint.setTopic("someTopic"); + endpoint.setBridgeEndpoint(true); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); + in.setHeader(KafkaConstants.KEY, "someKey"); + + producer.process(exchange); + + verifySendMessage("someKey", "someTopic", "someKey"); + } @SuppressWarnings({"unchecked", "rawtypes"}) protected void verifySendMessage(String partitionKey, String topic, String messageKey) {
