CAMEL-9700: seda - discardIfNoConsumers=true do not call on completions
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d1cf9fa Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d1cf9fa Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d1cf9fa Branch: refs/heads/camel-2.16.x Commit: 5d1cf9fa0ce7094b4b55191a7b75e68283099c3d Parents: bd4008e Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Mar 11 14:11:56 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Mar 11 14:12:45 2016 +0100 ---------------------------------------------------------------------- .../camel/component/seda/SedaProducer.java | 23 +++++++---- .../seda/SedaDiscardIfNoConsumerTest.java | 40 ++++++++++++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5d1cf9fa/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java index a87ddf3..1e28eaa 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -122,7 +122,8 @@ public class SedaProducer extends DefaultAsyncProducer { log.trace("Adding Exchange to queue: {}", copy); try { - addToQueue(copy); + // do not copy as we already did the copy + addToQueue(copy, false); } catch (SedaConsumerNotAvailableException e) { exchange.setException(e); callback.done(true); @@ -160,11 +161,8 @@ public class SedaProducer extends DefaultAsyncProducer { } } else { // no wait, eg its a InOnly then just add to queue and return - // handover the completion so its the copy which performs that, as we do not wait - Exchange copy = prepareCopy(exchange, true); - log.trace("Adding Exchange to queue: {}", copy); try { - addToQueue(copy); + addToQueue(exchange, true); } catch (SedaConsumerNotAvailableException e) { exchange.setException(e); callback.done(true); @@ -205,8 +203,9 @@ public class SedaProducer extends DefaultAsyncProducer { * simply add which will throw exception if the queue is full * * @param exchange the exchange to add to the queue + * @param copy whether to create a copy of the exchange to use for adding to the queue */ - protected void addToQueue(Exchange exchange) throws SedaConsumerNotAvailableException { + protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException { BlockingQueue<Exchange> queue = null; QueueReference queueReference = endpoint.getQueueReference(); if (queueReference != null) { @@ -226,15 +225,23 @@ public class SedaProducer extends DefaultAsyncProducer { } } + Exchange target = exchange; + + // handover the completion so its the copy which performs that, as we do not wait + if (copy) { + target = prepareCopy(exchange, true); + } + + log.trace("Adding Exchange to queue: {}", target); if (blockWhenFull) { try { - queue.put(exchange); + queue.put(target); } catch (InterruptedException e) { // ignore log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped()); } } else { - queue.add(exchange); + queue.add(target); } } http://git-wip-us.apache.org/repos/asf/camel/blob/5d1cf9fa/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java index 630abd4..2953b6f 100644 --- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java @@ -17,7 +17,10 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.SynchronizationAdapter; /** * @version @@ -37,6 +40,29 @@ public class SedaDiscardIfNoConsumerTest extends ContextTestSupport { assertEquals(0, bar.getCurrentQueueSize()); } + public void testDiscardUoW() throws Exception { + SedaEndpoint bar = getMandatoryEndpoint("seda:bar", SedaEndpoint.class); + assertEquals(0, bar.getCurrentQueueSize()); + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + final MyCompletion myCompletion = new MyCompletion(); + + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + exchange.addOnCompletion(myCompletion); + } + }); + + assertMockEndpointsSatisfied(); + + assertEquals(0, bar.getCurrentQueueSize()); + + assertEquals(true, myCompletion.isCalled()); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -46,4 +72,18 @@ public class SedaDiscardIfNoConsumerTest extends ContextTestSupport { } }; } + + private static final class MyCompletion extends SynchronizationAdapter { + + private boolean called; + + @Override + public void onDone(Exchange exchange) { + called = true; + } + + public boolean isCalled() { + return called; + } + } }