CAMEL-6588 Choose BlockingQueue implementation in Seda component with thanks to 
Gérald


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/276aa87e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/276aa87e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/276aa87e

Branch: refs/heads/master
Commit: 276aa87ec38bfd884a3dc52d8685d6be3922f553
Parents: f58ccf9
Author: Willem Jiang <ningji...@apache.org>
Authored: Fri Aug 2 10:41:13 2013 +0800
Committer: Willem Jiang <ningji...@apache.org>
Committed: Fri Aug 2 11:33:33 2013 +0800

----------------------------------------------------------------------
 .../seda/ArrayBlockingQueueFactory.java         | 73 ++++++++++++++++++++
 .../component/seda/BlockingQueueFactory.java    | 38 ++++++++++
 .../seda/LinkedBlockingQueueFactory.java        | 35 ++++++++++
 .../seda/PriorityBlockingQueueFactory.java      | 54 +++++++++++++++
 .../camel/component/seda/SedaComponent.java     | 40 ++++++++---
 .../camel/component/seda/SedaEndpoint.java      | 21 ++++--
 .../component/seda/SedaQueueFactoryTest.java    | 66 ++++++++++++++++++
 .../camel/component/seda/SedaQueueTest.java     | 24 +++++++
 8 files changed, 338 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
 
b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
new file mode 100644
index 0000000..3983ff8
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link 
java.util.concurrent.ArrayBlockingQueue}
+ */
+public class ArrayBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    /** Capacity used by {@link #create()} when none is provided. */
+    private int defaultCapacity = 50;
+    /** Lock fairness; {@code null} means the queue's default fairness. */
+    private Boolean fair;
+    /** @return Default array capacity */
+    public int getDefaultCapacity() {
+        return defaultCapacity;
+    }
+
+    /**
+     * @param defaultCapacity Default array capacity
+     */
+    public void setDefaultCapacity(int defaultCapacity) {
+        this.defaultCapacity = defaultCapacity;
+    }
+
+    /**
+     * @return Lock fairness; {@code false} while unset, never an unboxing failure
+     */
+    public boolean isFair() {
+        return fair != null && fair;
+    }
+
+    /**
+     * @param fair Lock fairness
+     */
+    public void setFair(boolean fair) {
+        this.fair = fair;
+    }
+
+    /** @return New queue sized with the default capacity */
+    @Override
+    public ArrayBlockingQueue<E> create() {
+        return create(defaultCapacity);
+    }
+
+    /**
+     * @param capacity Capacity of the queue to create
+     * @return New queue with exactly the requested capacity
+     */
+    @Override
+    public ArrayBlockingQueue<E> create(int capacity) {
+        // use the requested capacity, not defaultCapacity
+        return fair == null
+                ? new ArrayBlockingQueue<E>(capacity)
+                : new ArrayBlockingQueue<E>(capacity, fair);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
 
b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
new file mode 100644
index 0000000..0d69433
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.BlockingQueue;
+import org.apache.camel.Exchange;
+
+/**
+ * Factory of {@link java.util.concurrent.BlockingQueue}
+ * @param <E> Element type, usually {@link Exchange}
+ */
+public interface BlockingQueueFactory<E> {
+    /**
+     * Create a new {@link java.util.concurrent.BlockingQueue} with default capacity
+     * @return New {@link java.util.concurrent.BlockingQueue}
+     */
+    BlockingQueue<E> create();
+    /**
+     * @param capacity Capacity requested for the new queue
+     * @return New {@link java.util.concurrent.BlockingQueue} with given capacity
+     */
+    BlockingQueue<E> create(int capacity);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
 
b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
new file mode 100644
index 0000000..096cd5b
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link 
java.util.concurrent.LinkedBlockingQueue}
+ */
+public class LinkedBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    /** @return New queue built with the no-argument constructor */
+    @Override
+    public LinkedBlockingQueue<E> create() {
+        final LinkedBlockingQueue<E> defaultSized = new LinkedBlockingQueue<E>();
+        return defaultSized;
+    }
+
+    /**
+     * @param capacity Capacity handed to the queue constructor
+     * @return New queue built with the given capacity
+     */
+    @Override
+    public LinkedBlockingQueue<E> create(int capacity) {
+        final LinkedBlockingQueue<E> sized = new LinkedBlockingQueue<E>(capacity);
+        return sized;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
 
b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
new file mode 100644
index 0000000..da90d16
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.Comparator;
+
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link 
java.util.concurrent.PriorityBlockingQueue}
+ */
+public class PriorityBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    /** Comparator used to sort exchanges; {@code null} selects the comparator-less constructors. */
+    private Comparator<E> comparator;
+
+    /** @return Comparator handed to created queues, or {@code null} if none is set */
+    public Comparator<E> getComparator() {
+        return comparator;
+    }
+
+    /** @param comparator Comparator handed to queues created after this call */
+    public void setComparator(Comparator<E> comparator) {
+        this.comparator = comparator;
+    }
+
+    /** @return New queue, built with the comparator when one is set */
+    @Override
+    public PriorityBlockingQueue<E> create() {
+        if (comparator != null) {
+            // PriorityBlockingQueue has no comparator-only constructor, so pass 11,
+            // which is its documented default initial capacity
+            return new PriorityBlockingQueue<E>(11, comparator);
+        }
+        return new PriorityBlockingQueue<E>();
+    }
+
+    /**
+     * @param capacity Initial capacity handed to the queue constructor
+     * @return New queue, built with the comparator when one is set
+     */
+    @Override
+    public PriorityBlockingQueue<E> create(int capacity) {
+        if (comparator != null) {
+            return new PriorityBlockingQueue<E>(capacity, comparator);
+        }
+        return new PriorityBlockingQueue<E>(capacity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index e6d5171..b13dd64 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.seda;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -39,7 +38,7 @@ public class SedaComponent extends UriEndpointComponent {
     protected int queueSize;
     protected int defaultConcurrentConsumers = 1;
     private final Map<String, QueueReference> queues = new HashMap<String, 
QueueReference>();
-
+    private BlockingQueueFactory<Exchange> defaultQueueFactory =new 
LinkedBlockingQueueFactory<Exchange>();
     public SedaComponent() {
         super(SedaEndpoint.class);
     }
@@ -60,15 +59,30 @@ public class SedaComponent extends UriEndpointComponent {
         return defaultConcurrentConsumers;
     }
 
+       public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
+               return defaultQueueFactory;
+       }
+
+       public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> 
defaultQueueFactory) {
+               this.defaultQueueFactory = defaultQueueFactory;
+       }
+
     /**
-     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)}
+     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, 
BlockingQueueFactory)}
      */
     @Deprecated
     public synchronized QueueReference getOrCreateQueue(String uri, Integer 
size) {
         return getOrCreateQueue(uri, size, null);
     }
 
+    /**
+     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, 
BlockingQueueFactory)}
+     */
     public synchronized QueueReference getOrCreateQueue(String uri, Integer 
size, Boolean multipleConsumers) {
+        return getOrCreateQueue(uri, size, multipleConsumers, null);
+    }
+
+    public synchronized QueueReference getOrCreateQueue(String uri, Integer 
size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
@@ -91,14 +105,15 @@ public class SedaComponent extends UriEndpointComponent {
 
         // create queue
         BlockingQueue<Exchange> queue;
+        BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == 
null ? defaultQueueFactory : customQueueFactory;
         if (size != null && size > 0) {
-            queue = new LinkedBlockingQueue<Exchange>(size);
+            queue = queueFactory.create(size);
         } else {
             if (getQueueSize() > 0) {
                 size = getQueueSize();
-                queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
+                queue = queueFactory.create(getQueueSize());
             } else {
-                queue = new LinkedBlockingQueue<Exchange>();
+                queue = queueFactory.create();
             }
         }
         log.debug("Created queue {} with size {}", key, size);
@@ -127,8 +142,17 @@ public class SedaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("The limitConcurrentConsumers 
flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
-        // defer creating queue till endpoint is started, so we pass in null
-        SedaEndpoint answer = new SedaEndpoint(uri, this, null, consumers);
+               // Resolve queue reference
+               BlockingQueue<Exchange> 
queue=resolveAndRemoveReferenceParameter(parameters, "queue", 
BlockingQueue.class);
+               SedaEndpoint answer;
+               // Resolve queue factory when no queue specified
+               if (queue == null) {
+                       BlockingQueueFactory<Exchange> 
queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", 
BlockingQueueFactory.class);
+                       // defer creating queue till endpoint is started, so we 
pass the queue factory
+                       answer = new SedaEndpoint(uri, this, queueFactory, 
consumers);                  
+               } else {
+                       answer = new SedaEndpoint(uri, this, queue, consumers);
+               }
         answer.configureProperties(parameters);
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 656736c..d806d60 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -23,7 +23,6 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
@@ -79,8 +78,10 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint,
     private int pollTimeout = 1000;
     @UriParam
     private boolean purgeWhenStopping;
+    private BlockingQueueFactory<Exchange> queueFactory;
 
     public SedaEndpoint() {
+               queueFactory = new LinkedBlockingQueueFactory<Exchange>();
     }
 
     public SedaEndpoint(String endpointUri, Component component, 
BlockingQueue<Exchange> queue) {
@@ -88,11 +89,21 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
     }
 
     public SedaEndpoint(String endpointUri, Component component, 
BlockingQueue<Exchange> queue, int concurrentConsumers) {
-        super(endpointUri, component);
+               this(endpointUri, component, concurrentConsumers);
         this.queue = queue;
         if (queue != null) {
             this.size = queue.remainingCapacity();
         }
+               queueFactory = new LinkedBlockingQueueFactory<Exchange>();
+       }
+       
+    public SedaEndpoint(String endpointUri, Component component, 
BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
+               this(endpointUri, component, concurrentConsumers);
+               this.queueFactory = queueFactory;
+       }
+       
+    private SedaEndpoint(String endpointUri, Component component, int 
concurrentConsumers) {
+        super(endpointUri, component);
         this.concurrentConsumers = concurrentConsumers;
     }
 
@@ -130,7 +141,7 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
             if (getComponent() != null) {
                 // use null to indicate default size (= use what the existing 
queue has been configured with)
                 Integer size = getSize() == Integer.MAX_VALUE ? null : 
getSize();
-                SedaComponent.QueueReference ref = 
getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers());
+                SedaComponent.QueueReference ref = 
getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers(), 
queueFactory);
                 queue = ref.getQueue();
                 String key = getComponent().getQueueKey(getEndpointUri());
                 LOG.info("Endpoint {} is using shared queue: {} with size: 
{}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : 
Integer.MAX_VALUE});
@@ -149,9 +160,9 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
 
     protected BlockingQueue<Exchange> createQueue() {
         if (size > 0) {
-            return new LinkedBlockingQueue<Exchange>(size);
+            return queueFactory.create(size);
         } else {
-            return new LinkedBlockingQueue<Exchange>();
+            return queueFactory.create();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
new file mode 100644
index 0000000..033ce80
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import static junit.framework.TestCase.assertEquals;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import static org.apache.camel.TestSupport.assertIsInstanceOf;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/**
+ *
+ */
+public class SedaQueueFactoryTest extends ContextTestSupport {
+       private final ArrayBlockingQueueFactory<Exchange> arrayQueueFactory=new 
ArrayBlockingQueueFactory<Exchange>();
+
+       @Override
+       protected CamelContext createCamelContext() throws Exception {
+               SimpleRegistry simpleRegistry=new SimpleRegistry();
+               simpleRegistry.put("arrayQueueFactory", arrayQueueFactory);
+        return new DefaultCamelContext(simpleRegistry);
+       }
+       
+    @SuppressWarnings("unchecked")
+    public void testArrayBlockingQueueFactory() throws Exception {
+        SedaEndpoint endpoint = 
resolveMandatoryEndpoint("seda:arrayQueue?queueFactory=#arrayQueueFactory", 
SedaEndpoint.class);
+               
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        ArrayBlockingQueue<Exchange> blockingQueue = 
assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+    }  
+
+       @SuppressWarnings("unchecked")
+    public void testArrayBlockingQueueFactoryAndSize() throws Exception {
+        SedaEndpoint endpoint = 
resolveMandatoryEndpoint("seda:arrayQueue50?queueFactory=#arrayQueueFactory&size=50",
 SedaEndpoint.class);
+               
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        ArrayBlockingQueue<Exchange> blockingQueue = 
assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+               assertEquals("remainingCapacity", 50, 
blockingQueue.remainingCapacity());
+    }  
+
+       @SuppressWarnings("unchecked")
+    public void testDefaultBlockingQueueFactory() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:linkedQueue", 
SedaEndpoint.class);
+               
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        LinkedBlockingQueue<Exchange> blockingQueue = 
assertIsInstanceOf(LinkedBlockingQueue.class, queue);
+    }  
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
index bf30cdc..54ccdd5 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.camel.component.seda;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.hamcrest.CoreMatchers;
+import org.junit.matchers.JUnitMatchers;
 
 /**
  * @version 
@@ -34,6 +41,21 @@ public class SedaQueueTest extends ContextTestSupport {
         template.sendBody("seda:foo?concurrentConsumers=5", "Goodday World");
         template.sendBody("seda:bar", "Bar");
     }
+    public void testQueueRef() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:array?queue=#arrayQueue", "Hello World");
+               
+               SedaEndpoint 
sedaEndpoint=resolveMandatoryEndpoint("seda:array?queue=#arrayQueue", 
SedaEndpoint.class);
+               assertTrue(sedaEndpoint.getQueue() instanceof 
ArrayBlockingQueue);
+    }
+       @Override
+       protected CamelContext createCamelContext() throws Exception {
+               SimpleRegistry simpleRegistry=new SimpleRegistry();
+               simpleRegistry.put("arrayQueue", new 
ArrayBlockingQueue<Exchange>(10));
+        return new DefaultCamelContext(simpleRegistry);
+       }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -43,6 +65,8 @@ public class SedaQueueTest extends ContextTestSupport {
                 
from("seda:foo?size=20&concurrentConsumers=2").to("mock:result");
 
                 from("seda:bar").to("mock:result");
+
+                               
from("seda:array?queue=#arrayQueue").to("mock:result");
             }
         };
     }

Reply via email to