Author: davsclaus
Date: Sun May 20 19:57:17 2012
New Revision: 1340819
URL: http://svn.apache.org/viewvc?rev=1340819&view=rev
Log:
CAMEL-5261: Fixed seda endpoints that are shut down and restarted due to advice-with not
picking up the refreshed seda queue shared between producers and consumers.
Added:
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java
- copied unchanged from r1340818,
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1340818
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1340819&r1=1340818&r2=1340819&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
Sun May 20 19:57:17 2012
@@ -53,7 +53,7 @@ public class SedaComponent extends Defau
return defaultConcurrentConsumers;
}
- public synchronized BlockingQueue<Exchange> createQueue(String uri,
Map<String, Object> parameters) {
+ public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri,
Integer size) {
String key = getQueueKey(uri);
QueueReference ref = getQueues().get(key);
@@ -65,7 +65,6 @@ public class SedaComponent extends Defau
// create queue
BlockingQueue<Exchange> queue;
- Integer size = getAndRemoveParameter(parameters, "size",
Integer.class);
if (size != null && size > 0) {
queue = new LinkedBlockingQueue<Exchange>(size);
} else {
@@ -96,7 +95,8 @@ public class SedaComponent extends Defau
throw new IllegalArgumentException("The limitConcurrentConsumers
flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
+ maxConcurrentConsumers + " was " + consumers);
}
- SedaEndpoint answer = new SedaEndpoint(uri, this, createQueue(uri,
parameters), consumers);
+ Integer size = getAndRemoveParameter(parameters, "size",
Integer.class);
+ SedaEndpoint answer = new SedaEndpoint(uri, this,
getOrCreateQueue(uri, size), consumers);
answer.configureProperties(parameters);
return answer;
}
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1340819&r1=1340818&r2=1340819&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Sun May 20 19:57:17 2012
@@ -93,15 +93,27 @@ public class SedaEndpoint extends Defaul
public synchronized BlockingQueue<Exchange> getQueue() {
if (queue == null) {
- if (size > 0) {
- queue = new LinkedBlockingQueue<Exchange>(size);
+ // prefer to lookup queue from component, so if this endpoint is
re-created or re-started
+ // then the existing queue from the component can be used, so new
producers and consumers
+ // can use the already existing queue referenced from the component
+ if (getComponent() != null) {
+ queue = getComponent().getOrCreateQueue(getEndpointUri(),
getSize());
} else {
- queue = new LinkedBlockingQueue<Exchange>();
+ // fallback and create queue (as this endpoint has no
component)
+ queue = createQueue();
}
}
return queue;
}
+ protected BlockingQueue<Exchange> createQueue() {
+ if (size > 0) {
+ return new LinkedBlockingQueue<Exchange>(size);
+ } else {
+ return new LinkedBlockingQueue<Exchange>();
+ }
+ }
+
protected synchronized MulticastProcessor getConsumerMulticastProcessor()
throws Exception {
if (!multicastStarted && consumerMulticastProcessor != null) {
// only start it on-demand to avoid starting it during stopping
@@ -357,6 +369,10 @@ public class SedaEndpoint extends Defaul
if (getComponent() != null) {
getComponent().onShutdownEndpoint(this);
}
+
+ // clear queue, as we are shutdown, so if re-created then the queue
must be updated
+ queue = null;
+
super.doShutdown();
}
}