Author: davsclaus
Date: Sun May 20 19:57:17 2012
New Revision: 1340819

URL: http://svn.apache.org/viewvc?rev=1340819&view=rev
Log:
CAMEL-5261: Fixed seda endpoints shutdown and restart due advice with not 
picking up refreshed seda queue to use between producers and consumers.

Added:
    
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java
      - copied unchanged from r1340818, 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1340818

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1340819&r1=1340818&r2=1340819&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
 Sun May 20 19:57:17 2012
@@ -53,7 +53,7 @@ public class SedaComponent extends Defau
         return defaultConcurrentConsumers;
     }
 
-    public synchronized BlockingQueue<Exchange> createQueue(String uri, 
Map<String, Object> parameters) {
+    public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, 
Integer size) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
@@ -65,7 +65,6 @@ public class SedaComponent extends Defau
 
         // create queue
         BlockingQueue<Exchange> queue;
-        Integer size = getAndRemoveParameter(parameters, "size", 
Integer.class);
         if (size != null && size > 0) {
             queue = new LinkedBlockingQueue<Exchange>(size);
         } else {
@@ -96,7 +95,8 @@ public class SedaComponent extends Defau
             throw new IllegalArgumentException("The limitConcurrentConsumers 
flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
-        SedaEndpoint answer = new SedaEndpoint(uri, this, createQueue(uri, 
parameters), consumers);
+        Integer size = getAndRemoveParameter(parameters, "size", 
Integer.class);
+        SedaEndpoint answer = new SedaEndpoint(uri, this, 
getOrCreateQueue(uri, size), consumers);
         answer.configureProperties(parameters);
         return answer;
     }

Modified: 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1340819&r1=1340818&r2=1340819&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 (original)
+++ 
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 Sun May 20 19:57:17 2012
@@ -93,15 +93,27 @@ public class SedaEndpoint extends Defaul
 
     public synchronized BlockingQueue<Exchange> getQueue() {
         if (queue == null) {
-            if (size > 0) {
-                queue = new LinkedBlockingQueue<Exchange>(size);
+            // prefer to lookup queue from component, so if this endpoint is 
re-created or re-started
+            // then the existing queue from the component can be used, so new 
producers and consumers
+            // can use the already existing queue referenced from the component
+            if (getComponent() != null) {
+                queue = getComponent().getOrCreateQueue(getEndpointUri(), 
getSize());
             } else {
-                queue = new LinkedBlockingQueue<Exchange>();
+                // fallback and create queue (as this endpoint has no 
component)
+                queue = createQueue();
             }
         }
         return queue;
     }
 
+    protected BlockingQueue<Exchange> createQueue() {
+        if (size > 0) {
+            return new LinkedBlockingQueue<Exchange>(size);
+        } else {
+            return new LinkedBlockingQueue<Exchange>();
+        }
+    }
+
     protected synchronized MulticastProcessor getConsumerMulticastProcessor() 
throws Exception {
         if (!multicastStarted && consumerMulticastProcessor != null) {
             // only start it on-demand to avoid starting it during stopping
@@ -357,6 +369,10 @@ public class SedaEndpoint extends Defaul
         if (getComponent() != null) {
             getComponent().onShutdownEndpoint(this);
         }
+
+        // clear queue, as we are shutdown, so if re-created then the queue 
must be updated
+        queue = null;
+
         super.doShutdown();
     }
 }


Reply via email to