This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-2.23.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.23.x by this push: new 1e74c76 ENTESB-11304 - failIfNoConsumers option does not work with enabled block option 1e74c76 is described below commit 1e74c76af32c4d2132f85cc8d0c9230d73f9fd28 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Sep 2 07:06:46 2019 +0200 ENTESB-11304 - failIfNoConsumers option does not work with enabled block option --- .../directvm/DirectVmBlockingProducer.java | 24 ++++++++++++++-------- .../directvm/DirectVmProducerBlockingTest.java | 19 ++++++++++++++--- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java index 22d3f63..3f07cf9 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java @@ -26,14 +26,16 @@ import org.slf4j.LoggerFactory; /** * The direct producer. * <p/> - * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance - * of this class instead of {@code DirectProducer}. - * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout}, - * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException - * will be thrown. + * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the + * DirectEndpoint will create an instance of this class instead of + * {@code DirectProducer}. This producers {@code process} method will block for + * the configured duration ({@code DirectEndpoint#getTimeout}, default to 30 + * seconds). After which if a consumer is still unavailable a + * DirectConsumerNotAvailableException will be thrown. * <p/> - * Implementation note: Concurrent Producers will block for the duration it takes to determine if a - * consumer is available, but actual consumer execution will happen concurrently. + * Implementation note: Concurrent Producers will block for the duration it + * takes to determine if a consumer is available, but actual consumer execution + * will happen concurrently. */ public class DirectVmBlockingProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(DirectVmBlockingProducer.class); @@ -62,9 +64,13 @@ public class DirectVmBlockingProducer extends DefaultAsyncProducer { DirectVmConsumer answer = endpoint.getConsumer(); if (answer == null) { // okay then await until we have a consumer or we timed out - answer = awaitConsumer(); - if (answer == null) { + if (endpoint.isFailIfNoConsumers()) { throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } else { + answer = awaitConsumer(); + if (answer == null) { + throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } } } diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java index de37563..c18de85 100644 --- a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java @@ -35,7 +35,7 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport { StopWatch watch = new StopWatch(); try { - template.sendBody("direct-vm:suspended?block=true&timeout=500", "hello world"); + template.sendBody("direct-vm:suspended?block=true&timeout=500&failIfNoConsumers=false", "hello world"); fail("Expected CamelExecutionException"); } catch (CamelExecutionException e) { DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause()); @@ -51,7 +51,7 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport { StopWatch watch = new StopWatch(); try { - template.sendBody("direct-vm:start?block=true&timeout=500", "hello world"); + template.sendBody("direct-vm:start?block=true&timeout=500&failIfNoConsumers=false", "hello world"); fail("Expected CamelExecutionException"); } catch (CamelExecutionException e) { DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause()); @@ -60,6 +60,19 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport { assertTrue(watch.taken() > 490); } } + + public void testProducerBlocksFailIfNoConsumerFalse() throws Exception { + DirectVmEndpoint endpoint = getMandatoryEndpoint("direct-vm:suspended", DirectVmEndpoint.class); + endpoint.getConsumer().suspend(); + + try { + template.sendBody("direct-vm:start?block=true&timeout=500&failIfNoConsumers=true", "hello world"); + fail("Expected CamelExecutionException"); + } catch (CamelExecutionException e) { + DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause()); + assertIsInstanceOf(CamelExchangeException.class, cause); + } + } @Test public void testProducerBlocksResumeTest() throws Exception { @@ -81,7 +94,7 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport { getMockEndpoint("mock:result").expectedMessageCount(1); - template.sendBody("direct-vm:suspended?block=true&timeout=1000", "hello world"); + template.sendBody("direct-vm:suspended?block=true&timeout=1000&failIfNoConsumers=false", "hello world"); assertMockEndpointsSatisfied();