Repository: camel Updated Branches: refs/heads/master 431ee2bdb -> 7ca0082d5
Fixed CS Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7ca0082d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7ca0082d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7ca0082d Branch: refs/heads/master Commit: 7ca0082d5b092e5a9e49a7b5bb36361973c5f726 Parents: af737a1 Author: Andrea Cosentino <anco...@gmail.com> Authored: Sun Apr 3 12:19:34 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sun Apr 3 12:19:54 2016 +0200 ---------------------------------------------------------------------- .../camel/component/nats/NatsConsumer.java | 40 ++++++++++---------- .../camel/component/nats/NatsProducer.java | 6 +-- .../component/nats/NatsConsumerLoadTest.java | 9 ++--- .../nats/NatsConsumerMaxMessagesTest.java | 3 +- 4 files changed, 28 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 8be0aea..31441e6 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -21,6 +21,12 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import io.nats.client.Connection; +import io.nats.client.ConnectionFactory; +import io.nats.client.Message; +import io.nats.client.MessageHandler; +import io.nats.client.Subscription; + import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; @@ -28,12 +34,6 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; -import io.nats.client.Message; -import io.nats.client.MessageHandler; -import io.nats.client.Subscription; - public class NatsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class); @@ -114,11 +114,10 @@ public class NatsConsumer extends DefaultConsumer { @Override public void run() { try { - if (ObjectHelper.isNotEmpty(configuration.getQueueName())) { + if (ObjectHelper.isNotEmpty(configuration.getQueueName())) { sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() { - - @Override - public void onMessage(Message msg) { + @Override + public void onMessage(Message msg) { LOG.debug("Received Message: {}", msg); Exchange exchange = getEndpoint().createExchange(); exchange.getIn().setBody(msg); @@ -130,15 +129,14 @@ public class NatsConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error during processing", exchange, e); } } - }); + }); if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { - sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); + sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); } - } else { + } else { sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() { - - @Override - public void onMessage(Message msg) { + @Override + public void onMessage(Message msg) { LOG.debug("Received Message: {}", msg); Exchange exchange = getEndpoint().createExchange(); exchange.getIn().setBody(msg); @@ -150,14 +148,14 @@ public class NatsConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error during processing", exchange, e); } } - }); + }); if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { - sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); + sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); } - } - } catch (Throwable e) { - getExceptionHandler().handleException("Error during processing", e); } + } catch (Throwable e) { + getExceptionHandler().handleException("Error during processing", e); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java index 2e92f44..318b4e0 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java @@ -20,14 +20,14 @@ import java.io.IOException; import java.util.Properties; import java.util.concurrent.TimeoutException; +import io.nats.client.Connection; +import io.nats.client.ConnectionFactory; + import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; - public class NatsProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class); http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java index 87d0c3e..d1a0350 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java @@ -17,9 +17,11 @@ package org.apache.camel.component.nats; import java.io.IOException; -import java.util.Properties; import java.util.concurrent.TimeoutException; +import io.nats.client.Connection; +import io.nats.client.ConnectionFactory; + import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -27,9 +29,6 @@ import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Ignore; import org.junit.Test; -import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; - @Ignore("Require a running Nats server") public class NatsConsumerLoadTest extends CamelTestSupport { @@ -54,7 +53,7 @@ public class NatsConsumerLoadTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:send").to("nats://localhost:4222?topic=test"); + from("direct:send").to("nats://localhost:4222?topic=test"); from("nats://localhost:4222?topic=test").to(mockResultEndpoint); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java index 5ee94d9..7f4d434 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java @@ -33,7 +33,8 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport { @Test public void testMaxConsumer() throws InterruptedException, IOException { - mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}"); + mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", + "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}"); mockResultEndpoint.setExpectedMessageCount(5); template.sendBody("direct:send", "test"); template.sendBody("direct:send", "test1");