CAMEL-6545 seda producer - Add option to fail for non existing queue with 
thanks to Christian Posta


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/783a2f52
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/783a2f52
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/783a2f52

Branch: refs/heads/master
Commit: 783a2f52d18074389508c0525c49acf303ca18ae
Parents: fd3794d
Author: Willem Jiang <ningji...@apache.org>
Authored: Thu Aug 22 08:48:43 2013 +0800
Committer: Willem Jiang <ningji...@apache.org>
Committed: Thu Aug 22 08:48:43 2013 +0800

----------------------------------------------------------------------
 .../camel/component/seda/QueueReference.java    |  93 ++++++++++++
 .../camel/component/seda/SedaComponent.java     |  89 ++++--------
 .../seda/SedaConsumerNotAvailableException.java |  31 ++++
 .../camel/component/seda/SedaEndpoint.java      |  35 ++++-
 .../camel/component/seda/SedaProducer.java      |  30 +++-
 .../apache/camel/component/vm/VmComponent.java  |   1 +
 .../SedaComponentReferenceEndpointTest.java     |   2 +-
 .../component/seda/SedaNoConsumerTest.java      | 140 ++++++++++++++++++-
 .../vm/VmComponentReferenceEndpointTest.java    |   4 +-
 9 files changed, 346 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java
new file mode 100644
index 0000000..5b7fd50
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Holder for queue references.
+ * <p/>
+ * This is used to keep track of the usages of the queues, so we know when a 
queue is no longer
+ * in use, and can safely be discarded.
+ */
+public final class QueueReference {
+
+    private final BlockingQueue<Exchange> queue;
+    private Integer size;
+    private Boolean multipleConsumers;
+
+    private List<SedaEndpoint> endpoints = new LinkedList<SedaEndpoint>();
+
+    QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean 
multipleConsumers) {
+        this.queue = queue;
+        this.size = size;
+        this.multipleConsumers = multipleConsumers;
+    }
+
+    synchronized void addReference(SedaEndpoint endpoint) {
+        if (!endpoints.contains(endpoint)) {
+            endpoints.add(endpoint);
+        }
+    }
+
+    synchronized void removeReference(SedaEndpoint endpoint) {
+        if (endpoints.contains(endpoint)) {
+            endpoints.remove(endpoint);
+        }
+    }
+
+    /**
+     * Gets the reference counter
+     */
+    public synchronized int getCount() {
+        return endpoints.size();
+    }
+
+    /**
+     * Gets the queue size
+     *
+     * @return <tt>null</tt> if unbounded
+     */
+    public Integer getSize() {
+        return size;
+    }
+
+    public Boolean getMultipleConsumers() {
+        return multipleConsumers;
+    }
+
+    /**
+     * Gets the queue
+     */
+    public BlockingQueue<Exchange> getQueue() {
+        return queue;
+    }
+
+    public synchronized boolean hasConsumers() {
+        for (SedaEndpoint endpoint : endpoints) {
+            if (endpoint.getConsumers().size() > 0) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index 6b8d81a..64415b3 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -68,22 +68,22 @@ public class SedaComponent extends UriEndpointComponent {
     }
 
     /**
-     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, 
BlockingQueueFactory)}
+     * @deprecated use
      */
     @Deprecated
-    public synchronized QueueReference getOrCreateQueue(String uri, Integer 
size) {
-        return getOrCreateQueue(uri, size, null);
+    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, 
Integer size) {
+        return getOrCreateQueue(endpoint, size, null);
     }
 
     /**
-     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, 
BlockingQueueFactory)}
+     * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, 
Boolean, BlockingQueueFactory)}
      */
-    public synchronized QueueReference getOrCreateQueue(String uri, Integer 
size, Boolean multipleConsumers) {
-        return getOrCreateQueue(uri, size, multipleConsumers, null);
+    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, 
Integer size, Boolean multipleConsumers) {
+        return getOrCreateQueue(endpoint, size, multipleConsumers, null);
     }
 
-    public synchronized QueueReference getOrCreateQueue(String uri, Integer 
size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) {
-        String key = getQueueKey(uri);
+    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, 
Integer size, Boolean multipleConsumers, BlockingQueueFactory 
customQueueFactory) {
+        String key = getQueueKey(endpoint.getEndpointUri());
 
         QueueReference ref = getQueues().get(key);
         if (ref != null) {
@@ -95,7 +95,7 @@ public class SedaComponent extends UriEndpointComponent {
                         + (ref.getSize() != null ? ref.getSize() : 
Integer.MAX_VALUE) + " does not match given queue size " + size);
             }
             // add the reference before returning queue
-            ref.addReference();
+            ref.addReference(endpoint);
 
             if (log.isDebugEnabled()) {
                 log.debug("Reusing existing queue {} with size {} and 
reference count {}", new Object[]{key, size, ref.getCount()});
@@ -120,12 +120,25 @@ public class SedaComponent extends UriEndpointComponent {
 
         // create and add a new reference queue
         ref = new QueueReference(queue, size, multipleConsumers);
-        ref.addReference();
+        ref.addReference(endpoint);
         getQueues().put(key, ref);
 
         return ref;
     }
 
+    public synchronized QueueReference registerQueue(SedaEndpoint endpoint, 
BlockingQueue queue) {
+        String key = getQueueKey(endpoint.getEndpointUri());
+
+        QueueReference ref = getQueues().get(key);
+        if (ref == null) {
+            ref = new QueueReference(queue, endpoint.getSize(), 
endpoint.isMultipleConsumers());
+            ref.addReference(endpoint);
+            getQueues().put(key, ref);
+        }
+
+        return ref;
+    }
+
     public Map<String, QueueReference> getQueues() {
         return queues;
     }
@@ -186,7 +199,7 @@ public class SedaComponent extends UriEndpointComponent {
         String key = getQueueKey(endpoint.getEndpointUri());
         QueueReference ref = getQueues().get(key);
         if (ref != null) {
-            ref.removeReference();
+            ref.removeReference(endpoint);
             if (ref.getCount() <= 0) {
                 // reference no longer needed so remove from queues
                 getQueues().remove(key);
@@ -194,58 +207,4 @@ public class SedaComponent extends UriEndpointComponent {
         }
     }
 
-    /**
-     * Holder for queue references.
-     * <p/>
-     * This is used to keep track of the usages of the queues, so we know when 
a queue is no longer
-     * in use, and can safely be discarded.
-     */
-    public static final class QueueReference {
-        
-        private final BlockingQueue<Exchange> queue;
-        private volatile int count;
-        private Integer size;
-        private Boolean multipleConsumers;
-
-        private QueueReference(BlockingQueue<Exchange> queue, Integer size, 
Boolean multipleConsumers) {
-            this.queue = queue;
-            this.size = size;
-            this.multipleConsumers = multipleConsumers;
-        }
-        
-        void addReference() {
-            count++;
-        }
-        
-        void removeReference() {
-            count--;
-        }
-
-        /**
-         * Gets the reference counter
-         */
-        public int getCount() {
-            return count;
-        }
-
-        /**
-         * Gets the queue size
-         *
-         * @return <tt>null</tt> if unbounded
-         */
-        public Integer getSize() {
-            return size;
-        }
-
-        public Boolean getMultipleConsumers() {
-            return multipleConsumers;
-        }
-
-        /**
-         * Gets the queue
-         */
-        public BlockingQueue<Exchange> getQueue() {
-            return queue;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java
 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java
new file mode 100644
index 0000000..d49e0b6
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+/**
+ * @author <a href="http://www.christianposta.com/blog";>Christian Posta</a>
+ */
+public class SedaConsumerNotAvailableException extends CamelExchangeException {
+    private static final long serialVersionUID = 683242306650809007L;
+
+    public SedaConsumerNotAvailableException(String message, Exchange 
exchange) {
+        super(message, exchange);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 2b81768..5fe1b8c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -78,6 +78,10 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint,
     private int pollTimeout = 1000;
     @UriParam
     private boolean purgeWhenStopping;
+
+    @UriParam
+    private boolean failIfNoConsumers;
+
     private BlockingQueueFactory<Exchange> queueFactory;
 
     public SedaEndpoint() {
@@ -95,6 +99,7 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint,
             this.size = queue.remainingCapacity();
         }
         queueFactory = new LinkedBlockingQueueFactory<Exchange>();
+        getComponent().registerQueue(this, queue);
     }
 
     public SedaEndpoint(String endpointUri, Component component, 
BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
@@ -120,7 +125,7 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
         if (getComponent() != null) {
             // all consumers must match having the same multipleConsumers 
options
             String key = getComponent().getQueueKey(getEndpointUri());
-            SedaComponent.QueueReference ref = 
getComponent().getQueueReference(key);
+            QueueReference ref = getComponent().getQueueReference(key);
             if (ref != null && ref.getMultipleConsumers() != 
isMultipleConsumers()) {
                 // there is already a multiple consumers, so make sure they 
matches
                 throw new IllegalArgumentException("Cannot use existing queue 
" + key + " as the existing queue multiple consumers "
@@ -141,7 +146,7 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
             if (getComponent() != null) {
                 // use null to indicate default size (= use what the existing 
queue has been configured with)
                 Integer size = getSize() == Integer.MAX_VALUE ? null : 
getSize();
-                SedaComponent.QueueReference ref = 
getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers(), 
queueFactory);
+                QueueReference ref = getComponent().getOrCreateQueue(this, 
size, isMultipleConsumers(), queueFactory);
                 queue = ref.getQueue();
                 String key = getComponent().getQueueKey(getEndpointUri());
                 LOG.info("Endpoint {} is using shared queue: {} with size: 
{}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : 
Integer.MAX_VALUE});
@@ -166,6 +171,16 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
         }
     }
 
+    public synchronized QueueReference getQueueReference() {
+        String key = getComponent().getQueueKey(getEndpointUri());
+        QueueReference ref =  getComponent().getQueueReference(key);
+        if (ref == null) {
+            LOG.warn("There was no queue reference for this queue!");
+        }
+
+        return ref;
+    }
+
     protected synchronized MulticastProcessor getConsumerMulticastProcessor() 
throws Exception {
         if (!multicastStarted && consumerMulticastProcessor != null) {
             // only start it on-demand to avoid starting it during stopping
@@ -259,6 +274,15 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
     }
 
     @ManagedAttribute
+    public boolean isFailIfNoConsumers() {
+        return failIfNoConsumers;
+    }
+
+    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
+        this.failIfNoConsumers = failIfNoConsumers;
+    }
+
+    @ManagedAttribute
     public boolean isMultipleConsumers() {
         return multipleConsumers;
     }
@@ -468,4 +492,11 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
         // clear queue, as we are shutdown, so if re-created then the queue 
must be updated
         queue = null;
     }
+
+    public boolean hasConsumers() {
+        if (this.consumers == null) {
+            return false;
+        }
+        return this.consumers.size() > 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index 3614460..ada3cde 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -27,11 +27,15 @@ import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @version 
  */
 public class SedaProducer extends DefaultAsyncProducer {
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(SedaProducer.class);
+
     /**
      * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API 
which delivers the accurate reference to the queue currently being used.
      */
@@ -120,7 +124,13 @@ public class SedaProducer extends DefaultAsyncProducer {
             });
 
             log.trace("Adding Exchange to queue: {}", copy);
-            addToQueue(copy);
+            try {
+                addToQueue(copy);
+            } catch (SedaConsumerNotAvailableException e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
 
             if (timeout > 0) {
                 if (log.isTraceEnabled()) {
@@ -156,7 +166,13 @@ public class SedaProducer extends DefaultAsyncProducer {
             // handover the completion so its the copy which performs that, as 
we do not wait
             Exchange copy = prepareCopy(exchange, true);
             log.trace("Adding Exchange to queue: {}", copy);
-            addToQueue(copy);
+            try {
+                addToQueue(copy);
+            } catch (SedaConsumerNotAvailableException e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
         }
 
         // we use OnCompletion on the Exchange to callback and wait for the 
Exchange to be done
@@ -193,8 +209,14 @@ public class SedaProducer extends DefaultAsyncProducer {
      * 
      * @param exchange the exchange to add to the queue
      */
-    protected void addToQueue(Exchange exchange) {
-        BlockingQueue<Exchange> queue = endpoint.getQueue();
+    protected void addToQueue(Exchange exchange) throws 
SedaConsumerNotAvailableException {
+        QueueReference queueReference = endpoint.getQueueReference();
+        BlockingQueue<Exchange> queue = queueReference.getQueue();
+
+        if (endpoint.isFailIfNoConsumers() && !queueReference.hasConsumers()) {
+            LOG.warn("No consumers available on endpoint: " + endpoint + " to 
process: " + exchange);
+            throw new SedaConsumerNotAvailableException("No consumers 
available on endpoint: " + endpoint, exchange);
+        }
         if (blockWhenFull) {
             try {
                 queue.put(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java 
b/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
index 32e72d8..392d3c1 100644
--- a/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.component.seda.QueueReference;
 import org.apache.camel.component.seda.SedaComponent;
 
 /**

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
index 3a0b19b..9ea2fd0 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java
@@ -78,7 +78,7 @@ public class SedaComponentReferenceEndpointTest extends 
ContextTestSupport {
     
     private int numberOfReferences(SedaComponent seda) {
         int num = 0;
-        Iterator<SedaComponent.QueueReference> it = 
seda.getQueues().values().iterator();
+        Iterator<QueueReference> it = seda.getQueues().values().iterator();
         while (it.hasNext()) {
             num += it.next().getCount();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
index 6e36612..8a89baa 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
@@ -16,22 +16,53 @@
  */
 package org.apache.camel.component.seda;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
 
 /**
  * @version 
  */
 public class SedaNoConsumerTest extends ContextTestSupport {
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+
     public void testInOnly() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("seda:foo?timeout=1000");
+            }
+        });
+
+        context.start();
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
         // no problem for in only as we do not expect a reply
         template.sendBody("direct:start", "Hello World");
+        notify.matches(2, TimeUnit.SECONDS);
+
     }
 
     public void testInOut() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("seda:foo?timeout=1000");
+            }
+        });
+
+        context.start();
+
         try {
             template.requestBody("direct:start", "Hello World");
             fail("Should throw an exception");
@@ -40,13 +71,112 @@ public class SedaNoConsumerTest extends ContextTestSupport 
{
         }
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
+
+    @Test
+    public void testFailIfNoConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("seda:foo?timeout=1000");
+                from("direct:start").to("seda:foo?failIfNoConsumers=true");
             }
-        };
+        });
+
+        context.start();
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(SedaConsumerNotAvailableException.class, 
e.getCause());
+        }
+
+
     }
+
+    @Test
+    public void testFailIfNoConsuemerAndMultipleConsumerSetting() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("seda:foo?failIfNoConsumers=true&multipleConsumers=true").to("mock:foo");
+                
from("seda:foo?failIfNoConsumers=true&multipleConsumers=true").to("mock:bar");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+
+    }
+
+    @Test
+    public void testFailIfNoConsumesrAfterConsumersLeave() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("seda:foo?failIfNoConsumers=true").routeId("stopThisRoute").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo?failIfNoConsumers=true", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        context.stopRoute("stopThisRoute");
+        TimeUnit.MILLISECONDS.sleep(100);
+        try {
+            template.sendBody("seda:foo?failIfNoConsumers=true", "Hello 
World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(SedaConsumerNotAvailableException.class, 
e.getCause());
+        }
+    }
+
+    @Test
+    public void testFailIfNoConsumersWithValidConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:in").to("seda:foo?failIfNoConsumers=true");
+
+                from("seda:foo").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+    }
+
+    @Test
+    public void testConfigOnAConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // these are the same!
+                from("seda:foo?failIfNoConsumers=true").to("log:test");
+                from("seda:foo").to("log:test2");
+
+            }
+        });
+
+        context.start();
+        Thread.sleep(2 * 1000);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
index 4f6077c..b74869e 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java
@@ -20,7 +20,7 @@ import java.util.Iterator;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.seda.SedaComponent;
+import org.apache.camel.component.seda.QueueReference;
 
 /**
  *
@@ -79,7 +79,7 @@ public class VmComponentReferenceEndpointTest extends 
ContextTestSupport {
     
     private int numberOfReferences(VmComponent vm) {
         int num = 0;
-        Iterator<SedaComponent.QueueReference> it = 
vm.getQueues().values().iterator();
+        Iterator<QueueReference> it = vm.getQueues().values().iterator();
         while (it.hasNext()) {
             num += it.next().getCount();
         }

Reply via email to