Repository: camel Updated Branches: refs/heads/master e904bf498 -> 5582508a7
CAMEL-9962: Add a field in the consumer to define if it is subscribed to the topic or not Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5582508a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5582508a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5582508a Branch: refs/heads/master Commit: 5582508a74fb67f806c5a5dcf58705e99295a62a Parents: e904bf4 Author: Andrea Cosentino <anco...@gmail.com> Authored: Thu May 12 13:54:36 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Thu May 12 13:55:34 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/nats/NatsConsumer.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5582508a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index d0abb36..8fc2eff 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -42,6 +42,7 @@ public class NatsConsumer extends DefaultConsumer { private ExecutorService executor; private Connection connection; private Subscription sid; + private boolean subscribed; public NatsConsumer(NatsEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -101,6 +102,14 @@ public class NatsConsumer extends DefaultConsumer { return connection; } + public boolean isSubscribed() { + return subscribed; + } + + public void setSubscribed(boolean subscribed) { + this.subscribed = subscribed; + } + class NatsConsumingTask implements Runnable { private final Connection connection; @@ -133,6 +142,9 @@ public class NatsConsumer extends DefaultConsumer { if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); } + if (sid.isValid()) { + setSubscribed(true); + } } else { sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() { @Override @@ -151,7 +163,10 @@ public class NatsConsumer extends DefaultConsumer { }); if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); - } + } + if (sid.isValid()) { + setSubscribed(true); + } } } catch (Throwable e) { getExceptionHandler().handleException("Error during processing", e);