Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x bd4008ea2 -> 5d1cf9fa0
  refs/heads/master 8f89b49dc -> 4d03e9de6


CAMEL-9700: seda - discardIfNoConsumers=true do not call on completions


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d03e9de
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d03e9de
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d03e9de

Branch: refs/heads/master
Commit: 4d03e9de6bffe81a2902128064a8fad23a7f75f5
Parents: 8f89b49
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Mar 11 14:11:56 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Mar 11 14:11:56 2016 +0100

----------------------------------------------------------------------
 .../camel/component/seda/SedaProducer.java      | 23 +++++++----
 .../seda/SedaDiscardIfNoConsumerTest.java       | 40 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4d03e9de/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index a87ddf3..1e28eaa 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -122,7 +122,8 @@ public class SedaProducer extends DefaultAsyncProducer {
 
             log.trace("Adding Exchange to queue: {}", copy);
             try {
-                addToQueue(copy);
+                // do not copy as we already did the copy
+                addToQueue(copy, false);
             } catch (SedaConsumerNotAvailableException e) {
                 exchange.setException(e);
                 callback.done(true);
@@ -160,11 +161,8 @@ public class SedaProducer extends DefaultAsyncProducer {
             }
         } else {
             // no wait, eg its a InOnly then just add to queue and return
-            // handover the completion so its the copy which performs that, as we do not wait
-            Exchange copy = prepareCopy(exchange, true);
-            log.trace("Adding Exchange to queue: {}", copy);
             try {
-                addToQueue(copy);
+                addToQueue(exchange, true);
             } catch (SedaConsumerNotAvailableException e) {
                 exchange.setException(e);
                 callback.done(true);
@@ -205,8 +203,9 @@ public class SedaProducer extends DefaultAsyncProducer {
      * simply add which will throw exception if the queue is full
      * 
      * @param exchange the exchange to add to the queue
+     * @param copy     whether to create a copy of the exchange to use for adding to the queue
      */
-    protected void addToQueue(Exchange exchange) throws SedaConsumerNotAvailableException {
+    protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException {
         BlockingQueue<Exchange> queue = null;
         QueueReference queueReference = endpoint.getQueueReference();
         if (queueReference != null) {
@@ -226,15 +225,23 @@ public class SedaProducer extends DefaultAsyncProducer {
             }
         }
 
+        Exchange target = exchange;
+
+        // handover the completion so its the copy which performs that, as we do not wait
+        if (copy) {
+            target = prepareCopy(exchange, true);
+        }
+
+        log.trace("Adding Exchange to queue: {}", target);
         if (blockWhenFull) {
             try {
-                queue.put(exchange);
+                queue.put(target);
             } catch (InterruptedException e) {
                 // ignore
                 log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
             }
         } else {
-            queue.add(exchange);
+            queue.add(target);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4d03e9de/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
index 630abd4..2953b6f 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
@@ -17,7 +17,10 @@
 package org.apache.camel.component.seda;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
 
 /**
  * @version 
@@ -37,6 +40,29 @@ public class SedaDiscardIfNoConsumerTest extends ContextTestSupport {
         assertEquals(0, bar.getCurrentQueueSize());
     }
 
+    public void testDiscardUoW() throws Exception {
+        SedaEndpoint bar = getMandatoryEndpoint("seda:bar", SedaEndpoint.class);
+        assertEquals(0, bar.getCurrentQueueSize());
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        final MyCompletion myCompletion = new MyCompletion();
+
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.addOnCompletion(myCompletion);
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(0, bar.getCurrentQueueSize());
+
+        assertEquals(true, myCompletion.isCalled());
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -46,4 +72,18 @@ public class SedaDiscardIfNoConsumerTest extends ContextTestSupport {
             }
         };
     }
+
+    private static final class MyCompletion extends SynchronizationAdapter {
+
+        private boolean called;
+
+        @Override
+        public void onDone(Exchange exchange) {
+            called = true;
+        }
+
+        public boolean isCalled() {
+            return called;
+        }
+    }
 }

Reply via email to