Author: davsclaus Date: Thu Apr 8 07:22:39 2010 New Revision: 931804 URL: http://svn.apache.org/viewvc?rev=931804&view=rev Log: CAMEL-2568: Reworked lock on Aggregator a bit.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java (contents, props changed) - copied, changed from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregateShouldSkipFilteredExchanges.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java (contents, props changed) - copied, changed from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Removed: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregateShouldSkipFilteredExchanges.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/SpringAntPathMatcherFileFilter.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr 8 07:22:39 2010 @@ -27,6 +27,8 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; @@ -37,7 +39,6 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; -import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.processor.Traceable; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExceptionHandler; @@ -73,6 +74,7 @@ public class AggregateProcessor extends private static final Log LOG = LogFactory.getLog(AggregateProcessor.class); + private final Lock lock = new ReentrantLock(); private final CamelContext camelContext; private final Processor processor; private final AggregationStrategy aggregationStrategy; @@ -164,7 +166,15 @@ public class AggregateProcessor extends throw new ClosedCorrelationKeyException(key, exchange); } - doAggregation(key, exchange); + // when memory based then its fast using synchronized, but if the aggregation repository is IO + // bound such as JPA etc then concurrent aggregation per correlation key could + // improve performance as we can run aggregation repository get/add in parallel + try { + lock.lock(); + doAggregation(key, exchange); + } finally { + lock.unlock(); + } } /** @@ -177,11 +187,7 @@ public class AggregateProcessor extends * @param exchange the exchange * @return the aggregated exchange */ - private synchronized Exchange doAggregation(Object key, Exchange exchange) { - // when memory based then its fast using synchronized, but if the aggregation repository is IO - // bound such as JPA etc then concurrent aggregation per correlation key could - // improve performance as we can run aggregation repository get/add in parallel - + private Exchange doAggregation(Object key, Exchange exchange) { if (LOG.isTraceEnabled()) { LOG.trace("onAggregation +++ start +++ with correlation key: " + key); } @@ -209,12 +215,11 @@ public class AggregateProcessor extends // prepare the exchanges for aggregation and aggregate it ExchangeHelper.prepareAggregation(oldExchange, newExchange); answer = onAggregation(oldExchange, exchange); + // update the aggregated size answer.setProperty(Exchange.AGGREGATED_SIZE, size); // maybe we should check completion after the aggregation if (!isEagerCheckCompletion()) { - // put the current aggregated size on the exchange so its avail during completion check - answer.setProperty(Exchange.AGGREGATED_SIZE, size); complete = isCompleted(key, answer); } @@ -506,7 +511,13 @@ public class AggregateProcessor extends } exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); - onCompletion(key, exchange, true); + + try { + lock.lock(); + onCompletion(key, exchange, true); + } finally { + lock.unlock(); + } } } @@ -569,7 +580,12 @@ public class AggregateProcessor extends exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); // resubmit the recovered exchange - onSubmitCompletion(key, exchange); + try { + lock.lock(); + onSubmitCompletion(key, exchange); + } finally { + lock.unlock(); + } } } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java Thu Apr 8 07:22:39 2010 @@ -75,7 +75,6 @@ public class AggregateCompletionPredicat template.sendBodyAndHeader("direct:start", "C", "id", "foo"); template.sendBodyAndHeader("direct:start", "3", "id", "bar"); template.sendBodyAndHeader("direct:start", "END", "id", "foo"); - template.sendBodyAndHeader("direct:start", "4", "id", "bar"); template.sendBodyAndHeader("direct:start", "END", "id", "bar"); Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java (from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregateShouldSkipFilteredExchanges.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregateShouldSkipFilteredExchanges.java&r1=931784&r2=931804&rev=931804&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregateShouldSkipFilteredExchanges.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java Thu Apr 8 07:22:39 2010 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.processor; +package org.apache.camel.processor.aggregator; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; @@ -28,7 +28,7 @@ import org.apache.camel.processor.aggreg * * @version $Revision$ */ -public class AggregateShouldSkipFilteredExchanges extends ContextTestSupport { +public class AggregateShouldSkipFilteredExchangesTest extends ContextTestSupport { public void testAggregateWithFilter() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); @@ -54,10 +54,12 @@ public class AggregateShouldSkipFiltered from("direct:start") .filter(goodWord) - .to("mock:filtered") - .aggregate(header("id"), new MyAggregationStrategy()) - .to("mock:result") + .to("mock:filtered") + .aggregate(header("id"), new MyAggregationStrategy()).completionTimeout(1000) + .to("mock:result") + .end() .end(); + } }; } @@ -65,12 +67,11 @@ public class AggregateShouldSkipFiltered private class MyAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { - String newBody = newExchange.getIn().getBody(String.class); - if (oldExchange == null) { return newExchange; } + String newBody = newExchange.getIn().getBody(String.class); String body = oldExchange.getIn().getBody(String.class); body = body + "," + newBody; oldExchange.getIn().setBody(body); Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShouldSkipFilteredExchangesTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java (from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java&r1=931784&r2=931804&rev=931804&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java Thu Apr 8 07:22:39 2010 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.processor; +package org.apache.camel.processor.aggregator; import java.util.HashMap; import java.util.Map; @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.MyAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; /** @@ -108,8 +109,6 @@ public class AggregatorTest extends Cont for (int i = 1; i <= messageCount; i++) { String body = "message:" + i; template.sendBodyAndHeader(endpointUri, body, "cheese", 123); - // need a little sleep when sending large batches - Thread.sleep(3); } resultEndpoint.assertIsSatisfied(); Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java Thu Apr 8 07:22:39 2010 @@ -77,9 +77,8 @@ public class HawtDBAggregateConcurrentDi final int index = i; executor.submit(new Callable<Object>() { public Object call() throws Exception { - int idx = index % 10; String id = index % 2 == 0 ? "A" : "B"; - template.sendBodyAndHeader("direct:start", idx, "id", id); + template.sendBodyAndHeader("direct:start", index, "id", id); return null; } }); @@ -98,7 +97,7 @@ public class HawtDBAggregateConcurrentDi from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) - .completionTimeout(3000).aggregationRepository(repo) + .completionTimeout(1000).aggregationRepository(repo) .to("mock:aggregated"); } }; Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java Thu Apr 8 07:22:39 2010 @@ -71,16 +71,13 @@ public class HawtDBAggregateConcurrentSa MockEndpoint mock = getMockEndpoint("mock:aggregated"); mock.setResultWaitTime(30 * 1000L); mock.expectedMessageCount(1); - // match number of expected numbers - mock.message(0).body(String.class).regex("[0-9]{" + files + "}"); ExecutorService executor = Executors.newFixedThreadPool(poolSize); for (int i = 0; i < files; i++) { final int index = i; executor.submit(new Callable<Object>() { public Object call() throws Exception { - int idx = index % 10; - template.sendBodyAndHeader("direct:start", idx, "id", 123); + template.sendBodyAndHeader("direct:start", index, "id", 123); return null; } }); @@ -99,7 +96,7 @@ public class HawtDBAggregateConcurrentSa from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) - .completionTimeout(3000).aggregationRepository(repo) + .completionTimeout(1000).aggregationRepository(repo) .to("mock:aggregated"); } }; Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java Thu Apr 8 07:22:39 2010 @@ -26,11 +26,14 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.junit.Before; import org.junit.Test; public class HawtDBAggregateLoadAndRecoverTest extends CamelTestSupport { + private static final Log LOG = LogFactory.getLog(HawtDBAggregateLoadAndRecoverTest.class); private static final int SIZE = 1000; private static AtomicInteger counter = new AtomicInteger(); @@ -47,7 +50,7 @@ public class HawtDBAggregateLoadAndRecov mock.expectedMessageCount(SIZE / 10); mock.setResultWaitTime(30 * 1000); - System.out.println("Staring to send " + SIZE + " messages."); + LOG.info("Staring to send " + SIZE + " messages."); for (int i = 0; i < SIZE; i++) { final int value = 1; @@ -55,10 +58,13 @@ public class HawtDBAggregateLoadAndRecov Map headers = new HashMap(); headers.put("id", id); headers.put("seq", i); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending " + value + " with id " + id); + } template.sendBodyAndHeaders("seda:start?size=" + SIZE, value, headers); } - System.out.println("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); + LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); assertMockEndpointsSatisfied(); Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java Thu Apr 8 07:22:39 2010 @@ -25,11 +25,14 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.junit.Before; import org.junit.Test; public class HawtDBAggregateLoadConcurrentTest extends CamelTestSupport { + private static final Log LOG = LogFactory.getLog(HawtDBAggregateLoadConcurrentTest.class); private static final char[] KEYS = new char[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'}; private static final int SIZE = 5000; @@ -48,7 +51,7 @@ public class HawtDBAggregateLoadConcurre ExecutorService executor = Executors.newFixedThreadPool(10); - System.out.println("Staring to send " + SIZE + " messages."); + LOG.info("Staring to send " + SIZE + " messages."); for (int i = 0; i < SIZE; i++) { final int value = 1; @@ -56,13 +59,16 @@ public class HawtDBAggregateLoadConcurre executor.submit(new Callable<Object>() { public Object call() throws Exception { char id = KEYS[key]; + if (LOG.isDebugEnabled()) { + LOG.debug("Sending " + value + " with id " + id); + } template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", "" + id); return null; } }); } - System.out.println("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); + LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); assertMockEndpointsSatisfied(); } Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java Thu Apr 8 07:22:39 2010 @@ -21,11 +21,14 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.junit.Before; import org.junit.Test; public class HawtDBAggregateLoadTest extends CamelTestSupport { + private static final Log LOG = LogFactory.getLog(HawtDBAggregateLoadTest.class); private static final int SIZE = 5000; @Before @@ -41,15 +44,18 @@ public class HawtDBAggregateLoadTest ext mock.expectedMinimumMessageCount(1); mock.setResultWaitTime(30 * 1000); - System.out.println("Staring to send " + SIZE + " messages."); + LOG.info("Staring to send " + SIZE + " messages."); for (int i = 0; i < SIZE; i++) { final int value = 1; char id = 'A'; + if (LOG.isDebugEnabled()) { + LOG.debug("Sending " + value + " with id " + id); + } template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", "" + id); } - System.out.println("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); + LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); assertMockEndpointsSatisfied(); } Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/SpringAntPathMatcherFileFilter.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/SpringAntPathMatcherFileFilter.java?rev=931804&r1=931803&r2=931804&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/SpringAntPathMatcherFileFilter.java (original) +++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/util/SpringAntPathMatcherFileFilter.java Thu Apr 8 07:22:39 2010 @@ -47,7 +47,7 @@ public class SpringAntPathMatcherFileFil * @return <tt>true</tt> if accepted, <tt>false</tt> if not */ public boolean acceptPathName(String path) { - // must use single / as path seperators + // must use single / as path separators path = StringUtils.replace(path, File.separator, "/"); if (LOG.isTraceEnabled()) {