Repository: camel Updated Branches: refs/heads/master aabb88c42 -> dd01ea747
CAMEL-7425: Add EventDrivenPollingConsumer option to specify timeout Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd01ea74 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd01ea74 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd01ea74 Branch: refs/heads/master Commit: dd01ea7474a591df100dcb94fabefd2c142192a8 Parents: aabb88c Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 12 14:54:20 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 12 14:54:20 2015 +0200 ---------------------------------------------------------------------- .../org/apache/camel/impl/DefaultEndpoint.java | 27 +++++++++++++++++++- .../camel/impl/EventDrivenPollingConsumer.java | 19 +++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dd01ea74/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java index f724e43..e4caf13 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java @@ -70,6 +70,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint private Map<String, Object> consumerProperties; private int pollingConsumerQueueSize = 1000; private boolean pollingConsumerBlockWhenFull = true; + private long pollingConsumerBlockTimeout; /** * Constructs a fully-initialized DefaultEndpoint instance. This is the @@ -222,9 +223,13 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint public PollingConsumer createPollingConsumer() throws Exception { // should not call configurePollingConsumer when its EventDrivenPollingConsumer - LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull()); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}", + new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()}); + } EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize()); consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull()); + consumer.setBlockTimeout(getPollingConsumerBlockTimeout()); return consumer; } @@ -321,6 +326,26 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull; } + /** + * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} + * is being used. + * + * @see #setPollingConsumerBlockWhenFull(boolean) + */ + public long getPollingConsumerBlockTimeout() { + return pollingConsumerBlockTimeout; + } + + /** + * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} + * is being used. + * + * @see #setPollingConsumerBlockWhenFull(boolean) + */ + public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) { + this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout; + } + public void configureProperties(Map<String, Object> options) { Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer."); if (consumerProperties != null && !consumerProperties.isEmpty()) { http://git-wip-us.apache.org/repos/asf/camel/blob/dd01ea74/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java index 764882e..624e9b2 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.IsSingleton; import org.apache.camel.PollingConsumerPollingStrategy; import org.apache.camel.Processor; @@ -47,6 +48,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement private ExceptionHandler interruptedExceptionHandler; private Consumer consumer; private boolean blockWhenFull = true; + private long blockTimeout; private final int queueCapacity; public EventDrivenPollingConsumer(Endpoint endpoint) { @@ -79,6 +81,14 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement this.blockWhenFull = blockWhenFull; } + public long getBlockTimeout() { + return blockTimeout; + } + + public void setBlockTimeout(long blockTimeout) { + this.blockTimeout = blockTimeout; + } + /** * Gets the queue capacity. */ @@ -139,7 +149,14 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement public void process(Exchange exchange) throws Exception { if (isBlockWhenFull()) { try { - queue.put(exchange); + if (getBlockTimeout() <= 0) { + queue.put(exchange); + } else { + boolean added = queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS); + if (!added) { + throw new ExchangeTimedOutException(exchange, getBlockTimeout()); + } + } } catch (InterruptedException e) { // ignore log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());