CAMEL-7411: EventDrivenPollingConsumer can lose exchanges when the internal queue is full
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a67628e8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a67628e8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a67628e8 Branch: refs/heads/camel-2.12.x Commit: a67628e8bfdd2bae1fd5601e2a435f3192b9fb9f Parents: 2da223d Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 6 09:55:38 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 8 07:10:13 2014 +0200 ---------------------------------------------------------------------- .../org/apache/camel/impl/DefaultEndpoint.java | 62 +++++++- .../camel/impl/EventDrivenPollingConsumer.java | 51 ++++++- ...EventDrivenPollingConsumerQueueSizeTest.java | 142 +++++++++++++++++++ 3 files changed, 251 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a67628e8/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java index badd48e..2a6c5b0 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java @@ -36,6 +36,8 @@ import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A default endpoint useful for implementation inheritance. @@ -52,6 +54,7 @@ import org.apache.camel.util.URISupport; */ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware { + private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class); private String endpointUri; private EndpointConfiguration endpointConfiguration; private CamelContext camelContext; @@ -64,6 +67,8 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint private boolean synchronous; private final String id = EndpointHelper.createEndpointId(); private Map<String, Object> consumerProperties; + private int pollingConsumerQueueSize = 1000; + private boolean pollingConsumerBlockWhenFull = true; /** * Constructs a fully-initialized DefaultEndpoint instance. This is the @@ -215,8 +220,11 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint } public PollingConsumer createPollingConsumer() throws Exception { - // should not configure consumer - return new EventDrivenPollingConsumer(this); + // should not call configurePollingConsumer when its EventDrivenPollingConsumer + LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull()); + EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize()); + consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull()); + return consumer; } public Exchange createExchange(Exchange exchange) { @@ -267,6 +275,56 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint this.synchronous = synchronous; } + /** + * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} + * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and + * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. + * <p/> + * The default value is <tt>1000</tt> + */ + public int getPollingConsumerQueueSize() { + return pollingConsumerQueueSize; + } + + /** + * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} + * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and + * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. + * <p/> + * The default value is <tt>1000</tt> + */ + public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) { + this.pollingConsumerQueueSize = pollingConsumerQueueSize; + } + + /** + * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} + * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and + * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. + * <p/> + * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown + * when trying to add to the queue, and its full. + * <p/> + * The default value is <tt>true</tt> which will block the producer queue until the queue has space. + */ + public boolean isPollingConsumerBlockWhenFull() { + return pollingConsumerBlockWhenFull; + } + + /** + * Set whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} + * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and + * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. + * <p/> + * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown + * when trying to add to the queue, and its full. + * <p/> + * The default value is <tt>true</tt> which will block the producer queue until the queue has space. + */ + public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) { + this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull; + } + public void configureProperties(Map<String, Object> options) { Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer."); if (consumerProperties != null && !consumerProperties.isEmpty()) { http://git-wip-us.apache.org/repos/asf/camel/blob/a67628e8/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java index 07bcf07..428610e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java @@ -18,6 +18,7 @@ package org.apache.camel.impl; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -43,17 +44,53 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement private final BlockingQueue<Exchange> queue; private ExceptionHandler interruptedExceptionHandler; private Consumer consumer; + private boolean blockWhenFull = true; + private final int queueCapacity; public EventDrivenPollingConsumer(Endpoint endpoint) { - this(endpoint, new ArrayBlockingQueue<Exchange>(1000)); + this(endpoint, 1000); + } + + public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) { + super(endpoint); + this.queueCapacity = queueSize; + if (queueSize <= 0) { + this.queue = new LinkedBlockingDeque<Exchange>(); + } else { + this.queue = new ArrayBlockingQueue<Exchange>(queueSize); + } + this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); } public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) { super(endpoint); this.queue = queue; + this.queueCapacity = queue.remainingCapacity(); this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); } + public boolean isBlockWhenFull() { + return blockWhenFull; + } + + public void setBlockWhenFull(boolean blockWhenFull) { + this.blockWhenFull = blockWhenFull; + } + + /** + * Gets the queue capacity. + */ + public int getQueueCapacity() { + return queueCapacity; + } + + /** + * Gets the current queue size (no of elements in the queue). + */ + public int getQueueSize() { + return queue.size(); + } + public Exchange receiveNoWait() { return receive(0); } @@ -98,7 +135,16 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement } public void process(Exchange exchange) throws Exception { - queue.offer(exchange); + if (isBlockWhenFull()) { + try { + queue.put(exchange); + } catch (InterruptedException e) { + // ignore + log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped()); + } + } else { + queue.add(exchange); + } } public ExceptionHandler getInterruptedExceptionHandler() { @@ -155,5 +201,6 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement protected void doShutdown() throws Exception { ServiceHelper.stopAndShutdownService(consumer); + queue.clear(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/a67628e8/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java b/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java new file mode 100644 index 0000000..06649a2 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.Map; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.PollingConsumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.util.ServiceHelper; + +public class EventDrivenPollingConsumerQueueSizeTest extends ContextTestSupport { + + private String uri = "my:foo?pollingConsumerQueueSize=10&pollingConsumerBlockWhenFull=false"; + + @Override + protected void setUp() throws Exception { + super.setUp(); + context.addComponent("my", new MyQueueComponent()); + } + + public void testQueueSize() throws Exception { + PollingConsumer consumer = context.getEndpoint(uri).createPollingConsumer(); + consumer.start(); + + assertNotNull(consumer); + EventDrivenPollingConsumer edpc = assertIsInstanceOf(EventDrivenPollingConsumer.class, consumer); + assertEquals(0, edpc.getQueueSize()); + assertEquals(10, edpc.getQueueCapacity()); + assertFalse(edpc.isBlockWhenFull()); + + for (int i = 0; i < 10; i++) { + template.sendBody(uri, "Message " + i); + } + + assertEquals(10, edpc.getQueueSize()); + + try { + template.sendBody(uri, "Message 10"); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + // queue should be full + assertIsInstanceOf(IllegalStateException.class, e.getCause()); + } + + Exchange out = consumer.receive(5000); + assertNotNull(out); + assertEquals("Message 0", out.getIn().getBody()); + + assertEquals(9, edpc.getQueueSize()); + assertEquals(10, edpc.getQueueCapacity()); + + // now there is room + template.sendBody(uri, "Message 10"); + + assertEquals(10, edpc.getQueueSize()); + assertEquals(10, edpc.getQueueCapacity()); + + ServiceHelper.stopService(consumer); + // not cleared if we stop + assertEquals(10, edpc.getQueueSize()); + assertEquals(10, edpc.getQueueCapacity()); + + ServiceHelper.stopAndShutdownService(consumer); + // now its cleared as we shutdown + assertEquals(0, edpc.getQueueSize()); + assertEquals(10, edpc.getQueueCapacity()); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + private final class MyQueueComponent extends DefaultComponent { + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new MyQueueEndpoint(uri, this); + } + } + + private final class MyQueueEndpoint extends DefaultEndpoint { + + private EventDrivenPollingConsumer consumer; + + private MyQueueEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + @Override + public Producer createProducer() throws Exception { + return new DefaultProducer(this) { + @Override + public void process(Exchange exchange) throws Exception { + consumer.process(exchange); + } + }; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return consumer; + } + + @Override + public PollingConsumer createPollingConsumer() throws Exception { + return consumer; + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + protected void doStart() throws Exception { + consumer = (EventDrivenPollingConsumer) super.createPollingConsumer(); + super.doStart(); + } + } +}