Author: davsclaus
Date: Thu Apr 29 09:15:08 2010
New Revision: 939238
URL: http://svn.apache.org/viewvc?rev=939238&view=rev
Log:
Using fair lock in aggregator to give timeout checker a chance.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=939238&r1=939237&r2=939238&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr 29 09:15:08 2010
@@ -75,7 +75,9 @@ public class AggregateProcessor extends
private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
- private final Lock lock = new ReentrantLock();
+ // use a fair lock so timeout checker will have a chance to acquire the lock if
+ // a lot of new messages keep arriving
+ private final Lock lock = new ReentrantLock(true);
private final CamelContext camelContext;
private final Processor processor;
private final AggregationStrategy aggregationStrategy;
@@ -175,8 +177,8 @@ public class AggregateProcessor extends
// when memory based then its fast using synchronized, but if the aggregation repository is IO
// bound such as JPA etc then concurrent aggregation per correlation key could
// improve performance as we can run aggregation repository get/add in parallel
+ lock.lock();
try {
- lock.lock();
doAggregation(key, exchange);
} finally {
lock.unlock();
@@ -551,24 +553,23 @@ public class AggregateProcessor extends
log.debug("Completion timeout triggered for correlation key: " + key);
}
- try {
- lock.lock();
-
- // double check that its not already in progress
- boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
- if (inProgress) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress.");
- }
- return;
+ // double check that its not already in progress
+ boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
+ if (inProgress) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress.");
}
+ return;
+ }
- // get the aggregated exchange
- Exchange answer = aggregationRepository.get(camelContext, key);
+ // get the aggregated exchange
+ Exchange answer = aggregationRepository.get(camelContext, key);
- // indicate it was completed by timeout
- answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+ // indicate it was completed by timeout
+ answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+ lock.lock();
+ try {
onCompletion(key, answer, true);
} finally {
lock.unlock();
@@ -593,23 +594,25 @@ public class AggregateProcessor extends
LOG.trace("Starting completion interval task");
// trigger completion for all in the repository
- try {
- lock.lock();
+ Set<String> keys = aggregationRepository.getKeys();
- Set<String> keys = aggregationRepository.getKeys();
- for (String key : keys) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Completion interval triggered for correlation key: " + key);
- }
- Exchange exchange = aggregationRepository.get(camelContext, key);
+ if (keys != null && !keys.isEmpty()) {
+ lock.lock();
+ try {
+ for (String key : keys) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completion interval triggered for correlation key: " + key);
+ }
+ Exchange exchange = aggregationRepository.get(camelContext, key);
- // indicate it was completed by interval
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+ // indicate it was completed by interval
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
- onCompletion(key, exchange, false);
+ onCompletion(key, exchange, false);
+ }
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
}
LOG.trace("Completion interval task complete");
@@ -703,8 +706,8 @@ public class AggregateProcessor extends
LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
}
// not exhaust so resubmit the recovered exchange
+ lock.lock();
try {
- lock.lock();
onSubmitCompletion(key, exchange);
} finally {
lock.unlock();