Author: davsclaus Date: Thu Feb 18 14:01:19 2010 New Revision: 911406 URL: http://svn.apache.org/viewvc?rev=911406&view=rev Log: CAMEL-2471: seda endpoint is now not limited to 1000 by default but unbounded.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=911406&r1=911405&r2=911406&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Thu Feb 18 14:01:19 2010 @@ -43,8 +43,14 @@ } // create queue - int size = getAndRemoveParameter(parameters, "size", Integer.class, 1000); - BlockingQueue<Exchange> queue = new LinkedBlockingQueue<Exchange>(size); + BlockingQueue<Exchange> queue; + Integer size = getAndRemoveParameter(parameters, "size", Integer.class); + if (size != null && size > 0) { + queue = new LinkedBlockingQueue<Exchange>(size); + } else { + queue = new LinkedBlockingQueue<Exchange>(); + } + queues.put(key, queue); return queue; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=911406&r1=911405&r2=911406&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Feb 18 14:01:19 2010 @@ -43,7 +43,7 @@ */ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport { private volatile BlockingQueue<Exchange> queue; - private int size = 1000; + private int size; private int concurrentConsumers = 1; private boolean multipleConsumers; private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; @@ -84,7 +84,11 @@ public synchronized BlockingQueue<Exchange> getQueue() { if (queue == null) { - queue = new LinkedBlockingQueue<Exchange>(size); + if (size > 0) { + queue = new LinkedBlockingQueue<Exchange>(size); + } else { + queue = new LinkedBlockingQueue<Exchange>(); + } } return queue; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=911406&r1=911405&r2=911406&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Thu Feb 18 14:01:19 2010 @@ -34,9 +34,9 @@ import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.processor.aggregate.AggregateProcessor; -import org.apache.camel.spi.AggregationRepository; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; +import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.RouteContext; /** Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java?rev=911406&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java Thu Feb 18 14:01:19 2010 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; + +/** + * @version $Revision$ + */ +public class SedaDefaultUnboundedQueueSizeTest extends ContextTestSupport { + + public void testSedaDefaultUnboundedQueueSize() throws Exception { + SedaEndpoint seda = context.getEndpoint("seda:foo", SedaEndpoint.class); + assertEquals(0, seda.getQueue().size()); + + for (int i = 0; i < 1200; i++) { + template.sendBody("seda:foo", "Message " + i); + } + + assertEquals(1200, seda.getQueue().size()); + } + + public void testSedaDefaultBoundedQueueSize() throws Exception { + SedaEndpoint seda = context.getEndpoint("seda:foo?size=500", SedaEndpoint.class); + assertEquals(0, seda.getQueue().size()); + + for (int i = 0; i < 500; i++) { + template.sendBody("seda:foo", "Message " + i); + } + + assertEquals(500, seda.getQueue().size()); + + // sending one more hit the limit + try { + template.sendBody("seda:foo", "Message overflow"); + fail("Should thrown an exception"); + } catch (Exception e) { + IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause()); + assertEquals("Queue full", ise.getMessage()); + } + } + +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date