Author: davsclaus
Date: Thu Feb 18 14:01:19 2010
New Revision: 911406

URL: http://svn.apache.org/viewvc?rev=911406&view=rev
Log:
CAMEL-2471: seda endpoint is now not limited to 1000 by default but unbounded.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=911406&r1=911405&r2=911406&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 Thu Feb 18 14:01:19 2010
@@ -43,8 +43,14 @@
         }
 
         // create queue
-        int size = getAndRemoveParameter(parameters, "size", Integer.class, 
1000);
-        BlockingQueue<Exchange> queue = new 
LinkedBlockingQueue<Exchange>(size);
+        BlockingQueue<Exchange> queue;
+        Integer size = getAndRemoveParameter(parameters, "size", 
Integer.class);
+        if (size != null && size > 0) {
+            queue = new LinkedBlockingQueue<Exchange>(size);
+        } else {
+            queue = new LinkedBlockingQueue<Exchange>();
+        }
+
         queues.put(key, queue);
         return queue;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=911406&r1=911405&r2=911406&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 Thu Feb 18 14:01:19 2010
@@ -43,7 +43,7 @@
  */
 public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint, MultipleConsumersSupport {
     private volatile BlockingQueue<Exchange> queue;
-    private int size = 1000;
+    private int size;
     private int concurrentConsumers = 1;
     private boolean multipleConsumers;
     private WaitForTaskToComplete waitForTaskToComplete = 
WaitForTaskToComplete.IfReplyExpected;
@@ -84,7 +84,11 @@
 
     public synchronized BlockingQueue<Exchange> getQueue() {
         if (queue == null) {
-            queue = new LinkedBlockingQueue<Exchange>(size);
+            if (size > 0) {
+                queue = new LinkedBlockingQueue<Exchange>(size);
+            } else {
+                queue = new LinkedBlockingQueue<Exchange>();
+            }
         }
         return queue;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=911406&r1=911405&r2=911406&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
 Thu Feb 18 14:01:19 2010
@@ -34,9 +34,9 @@
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
-import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.RouteContext;
 
 /**

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java?rev=911406&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java
 Thu Feb 18 14:01:19 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+
+/**
+ * @version $Revision$
+ */
+public class SedaDefaultUnboundedQueueSizeTest extends ContextTestSupport {
+
+    public void testSedaDefaultUnboundedQueueSize() throws Exception {
+        SedaEndpoint seda = context.getEndpoint("seda:foo", 
SedaEndpoint.class);
+        assertEquals(0, seda.getQueue().size());
+
+        for (int i = 0; i < 1200; i++) {
+            template.sendBody("seda:foo", "Message " + i);
+        }
+
+        assertEquals(1200, seda.getQueue().size());
+    }
+
+    public void testSedaDefaultBoundedQueueSize() throws Exception {
+        SedaEndpoint seda = context.getEndpoint("seda:foo?size=500", 
SedaEndpoint.class);
+        assertEquals(0, seda.getQueue().size());
+
+        for (int i = 0; i < 500; i++) {
+            template.sendBody("seda:foo", "Message " + i);
+        }
+
+        assertEquals(500, seda.getQueue().size());
+
+        // sending one more hit the limit
+        try {
+            template.sendBody("seda:foo", "Message overflow");
+            fail("Should thrown an exception");
+        } catch (Exception e) {
+            IllegalStateException ise = 
assertIsInstanceOf(IllegalStateException.class, e.getCause());
+            assertEquals("Queue full", ise.getMessage());
+        }
+    }
+
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to