Repository: camel Updated Branches: refs/heads/master 9013fe82e -> 4e43476ae
Allow component developers to set a timeout for default scheduled poll consumer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4e43476a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4e43476a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4e43476a Branch: refs/heads/master Commit: 4e43476ae014ea303405e6ae66120e60c8827331 Parents: 9013fe8 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Dec 10 15:43:19 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 10 15:46:01 2015 +0100 ---------------------------------------------------------------------- .../impl/DefaultScheduledPollConsumer.java | 29 +++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4e43476a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java index e827fdc..847574d 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java @@ -32,6 +32,7 @@ import org.apache.camel.util.ServiceHelper; */ public class DefaultScheduledPollConsumer extends ScheduledPollConsumer { private PollingConsumer pollingConsumer; + private int timeout; public DefaultScheduledPollConsumer(DefaultEndpoint defaultEndpoint, Processor processor) { super(defaultEndpoint, processor); @@ -45,7 +46,15 @@ public class DefaultScheduledPollConsumer extends ScheduledPollConsumer { int messagesPolled = 0; while (isPollAllowed()) { - Exchange exchange = pollingConsumer.receiveNoWait(); + Exchange exchange; + if (timeout == 0) { + exchange = pollingConsumer.receiveNoWait(); + } else if (timeout < 0) { + exchange = pollingConsumer.receive(); + } else { + exchange = pollingConsumer.receive(timeout); + } + if (exchange == null) { break; } @@ -67,6 +76,24 @@ public class DefaultScheduledPollConsumer extends ScheduledPollConsumer { return messagesPolled; } + public int getTimeout() { + return timeout; + } + + /** + * Sets a timeout to use with {@link PollingConsumer}. + * <br/> + * <br/>Use <tt>timeout < 0</tt> for {@link PollingConsumer#receive()}. + * <br/>Use <tt>timeout == 0</tt> for {@link PollingConsumer#receiveNoWait()}. + * <br/>Use <tt>timeout > 0</tt> for {@link PollingConsumer#receive(long)}}. + * <br/> The default timeout value is <tt>0</tt> + * + * @param timeout the timeout value + */ + public void setTimeout(int timeout) { + this.timeout = timeout; + } + @Override protected void doStart() throws Exception { pollingConsumer = getEndpoint().createPollingConsumer();