Author: davsclaus Date: Sun Nov 25 17:29:08 2012 New Revision: 1413377 URL: http://svn.apache.org/viewvc?rev=1413377&view=rev Log: CAMEL-5793: Validate seda/vm endpoints that use the size option so that there is no mismatch with the size of an existing queue. Otherwise users may end up using seda queues with a size they did not expect. Add INFO logging to show users which queue and size are in use.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1413377&r1=1413376&r2=1413377&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sun Nov 25 17:29:08 2012 @@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlocki import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a> @@ -32,6 +34,7 @@ import org.apache.camel.impl.DefaultComp * @version */ public class SedaComponent extends DefaultComponent { + protected final transient Logger log = LoggerFactory.getLogger(getClass()); protected final int maxConcurrentConsumers = 500; protected int queueSize; protected int 
defaultConcurrentConsumers = 1; @@ -53,14 +56,25 @@ public class SedaComponent extends Defau return defaultConcurrentConsumers; } - public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) { + public synchronized QueueReference getOrCreateQueue(String uri, Integer size) { String key = getQueueKey(uri); QueueReference ref = getQueues().get(key); if (ref != null) { + + // if the given size is not provided, we just use the existing queue as is + if (size != null && ref.getSize() != size) { + // there is already a queue, so make sure the size matches + throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size " + + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size); + } // add the reference before returning queue ref.addReference(); - return ref.getQueue(); + + if (log.isDebugEnabled()) { + log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()}); + } + return ref; } // create queue @@ -69,18 +83,20 @@ public class SedaComponent extends Defau queue = new LinkedBlockingQueue<Exchange>(size); } else { if (getQueueSize() > 0) { + size = getQueueSize(); queue = new LinkedBlockingQueue<Exchange>(getQueueSize()); } else { queue = new LinkedBlockingQueue<Exchange>(); } } + log.debug("Created queue {} with size {}", key, size); // create and add a new reference queue - ref = new QueueReference(queue); + ref = new QueueReference(queue, size); ref.addReference(); getQueues().put(key, ref); - return queue; + return ref; } public Map<String, QueueReference> getQueues() { @@ -95,8 +111,8 @@ public class SedaComponent extends Defau throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. 
ConcurrentConsumers cannot be set at a value greater than " + maxConcurrentConsumers + " was " + consumers); } - Integer size = getAndRemoveParameter(parameters, "size", Integer.class); - SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers); + // defer creating queue till endpoint is started, so we pass in null + SedaEndpoint answer = new SedaEndpoint(uri, this, null, consumers); answer.configureProperties(parameters); return answer; } @@ -143,9 +159,11 @@ public class SedaComponent extends Defau private final BlockingQueue<Exchange> queue; private volatile int count; + private Integer size; - private QueueReference(BlockingQueue<Exchange> queue) { + private QueueReference(BlockingQueue<Exchange> queue, Integer size) { this.queue = queue; + this.size = size; } void addReference() { @@ -164,6 +182,15 @@ public class SedaComponent extends Defau } /** + * Gets the queue size + * + * @return <tt>null</tt> if unbounded + */ + public Integer getSize() { + return size; + } + + /** * Gets the queue */ public BlockingQueue<Exchange> getQueue() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1413377&r1=1413376&r2=1413377&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sun Nov 25 17:29:08 2012 @@ -43,6 +43,8 @@ import org.apache.camel.util.EndpointHel import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.URISupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of the <a @@ -51,8 +53,9 @@ import org.apache.camel.util.URISupport; */ 
@ManagedResource(description = "Managed SedaEndpoint") public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class); private volatile BlockingQueue<Exchange> queue; - private int size; + private int size = Integer.MAX_VALUE; private int concurrentConsumers = 1; private volatile ExecutorService multicastExecutor; private boolean multipleConsumers; @@ -75,7 +78,9 @@ public class SedaEndpoint extends Defaul public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) { super(endpointUri, component); this.queue = queue; - this.size = queue.remainingCapacity(); + if (queue != null) { + this.size = queue.remainingCapacity(); + } this.concurrentConsumers = concurrentConsumers; } @@ -98,10 +103,20 @@ public class SedaEndpoint extends Defaul // then the existing queue from the component can be used, so new producers and consumers // can use the already existing queue referenced from the component if (getComponent() != null) { - queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize()); + // use null to indicate default size (= use what the existing queue has been configured with) + Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); + SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size); + queue = ref.getQueue(); + String key = getComponent().getQueueKey(getEndpointUri()); + LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? 
ref.getSize() : Integer.MAX_VALUE}); + // and set the size we are using + if (ref.getSize() != null) { + setSize(ref.getSize()); + } } else { // fallback and create queue (as this endpoint has no component) queue = createQueue(); + LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()}); } } return queue; @@ -358,6 +373,11 @@ public class SedaEndpoint extends Defaul protected void doStart() throws Exception { super.doStart(); + // force creating queue when starting + if (queue == null) { + queue = getQueue(); + } + // special for unit testing where we can set a system property to make seda poll faster // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout()); Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java?rev=1413377&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java Sun Nov 25 17:29:08 2012 @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class SameSedaQueueSizeAndNoSizeTest extends ContextTestSupport { + + public void testSameQueue() throws Exception { + for (int i = 0; i < 100; i++) { + template.sendBody("seda:foo", "" + i); + } + + try { + template.sendBody("seda:foo", "Should be full now"); + fail("Should fail"); + } catch (CamelExecutionException e) { + IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause()); + assertEquals("Queue full", ise.getMessage()); + } + } + + public void testSameQueueDifferentSize() throws Exception { + try { + template.sendBody("seda:foo?size=200", "Should fail"); + fail("Should fail"); + } catch (ResolveEndpointFailedException e) { + IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Cannot use existing queue seda://foo as the existing queue size 100 does not match given queue size 200", ise.getMessage()); + } + } + + public void testSameQueueDifferentSizeBar() throws Exception { + try { + template.sendBody("seda:bar?size=200", "Should fail"); + fail("Should fail"); + } catch (ResolveEndpointFailedException e) { + IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Cannot use existing queue seda://bar as the existing queue size " + 
Integer.MAX_VALUE + " does not match given queue size 200", ise.getMessage()); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?size=100").routeId("foo").noAutoStartup() + .to("mock:foo"); + + from("seda:bar").routeId("bar").noAutoStartup() + .to("mock:bar"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java Sun Nov 25 17:29:08 2012 @@ -40,7 +40,7 @@ public class SedaQueueTest extends Conte return new RouteBuilder() { @Override public void configure() throws Exception { - from("seda:foo?concurrentConsumers=2").to("mock:result"); + from("seda:foo?size=20&concurrentConsumers=2").to("mock:result"); from("seda:bar").to("mock:result"); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java?rev=1413377&view=auto 
============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java Sun Nov 25 17:29:08 2012 @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.vm; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class SameVmQueueSizeAndNoSizeTest extends ContextTestSupport { + + public void testSameQueue() throws Exception { + for (int i = 0; i < 100; i++) { + template.sendBody("vm:foo", "" + i); + } + + try { + template.sendBody("vm:foo", "Should be full now"); + fail("Should fail"); + } catch (CamelExecutionException e) { + IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause()); + assertEquals("Queue full", ise.getMessage()); + } + } + + public void testSameQueueDifferentSize() throws Exception { + try { + template.sendBody("vm:foo?size=200", "Should fail"); + fail("Should fail"); + } catch (ResolveEndpointFailedException e) { + IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Cannot use existing queue vm://foo as the existing queue size 100 does not match given queue size 200", ise.getMessage()); + } + } + + public void testSameQueueDifferentSizeBar() throws Exception { + try { + template.sendBody("vm:bar?size=200", "Should fail"); + fail("Should fail"); + } catch (ResolveEndpointFailedException e) { + IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Cannot use existing queue vm://bar as the existing queue size " + Integer.MAX_VALUE + " does not match given queue size 200", ise.getMessage()); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("vm:foo?size=100").routeId("foo").noAutoStartup() + .to("mock:foo"); + + from("vm:bar").routeId("bar").noAutoStartup() + .to("mock:bar"); + } + }; + } +} Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java Sun Nov 25 17:29:08 2012 @@ -49,7 +49,7 @@ public class VmQueueTest extends Abstrac return new RouteBuilder() { @Override public void configure() throws Exception { - from("vm:foo?concurrentConsumers=2").to("mock:result"); + from("vm:foo?size=20&concurrentConsumers=2").to("mock:result"); } }; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java Sun Nov 25 17:29:08 2012 @@ -37,7 +37,7 @@ public class VmUseSameQueueTest extends return new RouteBuilder() { @Override public void configure() throws Exception { - from("vm:foo").to("mock:result"); + 
from("vm:foo?size=500").to("mock:result"); } }; } @@ -47,7 +47,7 @@ public class VmUseSameQueueTest extends return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("vm:foo?size=500"); + from("direct:start").to("vm:foo"); } }; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java Sun Nov 25 17:29:08 2012 @@ -37,7 +37,7 @@ public class AdviceWithMockEndpointsHavi public void testAdvisedMockEndpoints() throws Exception { // advice the first route using the inlined AdviceWith route builder // which has extended capabilities than the regular route builder - context.getRouteDefinitions().get(0).adviceWith(context, new AdviceWithRouteBuilder() { + context.getRouteDefinitions().get(1).adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // mock all endpoints (will mock in all routes) @@ -74,16 +74,16 @@ public class AdviceWithMockEndpointsHavi return new RouteBuilder() { @Override public void configure() throws Exception { + from("seda:foo?size=20") + .transform(constant("Bye World")) + .log("We transformed ${body}") + .to("log:foo?showHeaders=false") + .to("mock:foo"); + from("direct:start") .to("seda:foo") .to("log:start?showAll=true") .to("mock:result"); - - from("seda:foo?size=20") - .transform(constant("Bye World")) - .log("We transformed ${body}") - 
.to("log:foo?showHeaders=false") - .to("mock:foo"); } }; }