Author: davsclaus Date: Sun May 20 19:57:17 2012 New Revision: 1340819 URL: http://svn.apache.org/viewvc?rev=1340819&view=rev Log: CAMEL-5261: Fixed seda endpoints shutdown and restart due to advice-with not picking up the refreshed seda queue to use between producers and consumers.
Added: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java - copied unchanged from r1340818, camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1340818 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1340819&r1=1340818&r2=1340819&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sun May 20 19:57:17 2012 @@ -53,7 +53,7 @@ public class SedaComponent extends Defau return defaultConcurrentConsumers; } - public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) { + public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) { String key = getQueueKey(uri); QueueReference ref = getQueues().get(key); @@ -65,7 +65,6 @@ public class SedaComponent extends Defau // create queue BlockingQueue<Exchange> queue; - Integer size = getAndRemoveParameter(parameters, "size", Integer.class); if (size != 
null && size > 0) { queue = new LinkedBlockingQueue<Exchange>(size); } else { @@ -96,7 +95,8 @@ public class SedaComponent extends Defau throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " + maxConcurrentConsumers + " was " + consumers); } - SedaEndpoint answer = new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers); + Integer size = getAndRemoveParameter(parameters, "size", Integer.class); + SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers); answer.configureProperties(parameters); return answer; } Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1340819&r1=1340818&r2=1340819&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sun May 20 19:57:17 2012 @@ -93,15 +93,27 @@ public class SedaEndpoint extends Defaul public synchronized BlockingQueue<Exchange> getQueue() { if (queue == null) { - if (size > 0) { - queue = new LinkedBlockingQueue<Exchange>(size); + // prefer to lookup queue from component, so if this endpoint is re-created or re-started + // then the existing queue from the component can be used, so new producers and consumers + // can use the already existing queue referenced from the component + if (getComponent() != null) { + queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize()); } else { - queue = new LinkedBlockingQueue<Exchange>(); + // fallback and create queue (as this endpoint has no component) + queue = createQueue(); } } return 
queue; } + protected BlockingQueue<Exchange> createQueue() { + if (size > 0) { + return new LinkedBlockingQueue<Exchange>(size); + } else { + return new LinkedBlockingQueue<Exchange>(); + } + } + protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception { if (!multicastStarted && consumerMulticastProcessor != null) { // only start it on-demand to avoid starting it during stopping @@ -357,6 +369,10 @@ public class SedaEndpoint extends Defaul if (getComponent() != null) { getComponent().onShutdownEndpoint(this); } + + // clear queue, as we are shutdown, so if re-created then the queue must be updated + queue = null; + super.doShutdown(); } }