Author: davsclaus Date: Mon Apr 5 08:12:11 2010 New Revision: 930826 URL: http://svn.apache.org/viewvc?rev=930826&view=rev Log: CAMEL-2568: Polished code a bit. Added recover over SEDA test. Fixed CS.
Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithSedaTest.java - copied, changed from r930692, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Mon Apr 5 08:12:11 2010 @@ -59,7 +59,7 @@ public class SedaComponent extends Defau protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, 1); boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true); - if ((limitConcurrentConsumers) && (consumers > maxConcurrentConsumers)) { + if (limitConcurrentConsumers && consumers > maxConcurrentConsumers) { throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " + maxConcurrentConsumers + " was " + consumers); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Apr 5 08:12:11 2010 @@ -523,7 +523,7 @@ public abstract class ProcessorDefinitio * * @param uri the endpoint to send to * @return the builder - * @see org.apache.camel.SendAsyncProcessor + * @see org.apache.camel.processor.SendAsyncProcessor */ public ToDefinition toAsync(String uri) { ToDefinition answer = new ToDefinition(uri); @@ -541,7 +541,7 @@ public abstract class ProcessorDefinitio * @param uri the endpoint to send to * @param poolSize the core pool size * @return the builder - * @see org.apache.camel.SendAsyncProcessor + * @see org.apache.camel.processor.SendAsyncProcessor */ public ToDefinition toAsync(String uri, int poolSize) { ToDefinition answer = new ToDefinition(uri); @@ -559,7 +559,7 @@ public abstract class ProcessorDefinitio * * @param endpoint the endpoint to send to * @return the builder - * @see org.apache.camel.SendAsyncProcessor + * @see org.apache.camel.processor.SendAsyncProcessor */ public ToDefinition toAsync(Endpoint endpoint) { ToDefinition answer = new ToDefinition(endpoint); @@ -577,7 +577,7 @@ public abstract class ProcessorDefinitio * @param endpoint the endpoint to send to * @param poolSize the core pool size * @return the builder - * @see org.apache.camel.SendAsyncProcessor + * @see org.apache.camel.processor.SendAsyncProcessor */ public ToDefinition toAsync(Endpoint endpoint, int poolSize) { ToDefinition answer = new ToDefinition(endpoint); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Apr 5 08:12:11 2010 @@ -80,10 +80,9 @@ public class AggregateProcessor extends private final ExecutorService executorService; private ScheduledExecutorService recoverService; private TimeoutMap<Object, Exchange> timeoutMap; - private ExceptionHandler exceptionHandler; + private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass()); private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository(); private Map<Object, Object> closedCorrelationKeys; - private final AggregateOnCompletion aggregateOnCompletion = new AggregateOnCompletion(); private final Set<String> inProgressCompleteExchanges = new HashSet<String>(); private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); @@ -335,9 +334,12 @@ public class AggregateProcessor extends // send this exchange executorService.submit(new Runnable() { public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing aggregated exchange: " + exchange); + } // add on completion task so we remember to update the inProgressCompleteExchanges - exchange.addOnCompletion(aggregateOnCompletion); + exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); try { processor.process(exchange); @@ -348,11 +350,8 @@ public class AggregateProcessor extends exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t)); } - // was it good or bad? - if (exchange.getException() == null) { - // only confirm if we processed without a problem - aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); - } else { + // log exception if there was a problem + if (exchange.getException() != null) { // if there was an exception then let the exception handler handle it getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); } @@ -433,9 +432,6 @@ public class AggregateProcessor extends } public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } @@ -462,19 +458,30 @@ public class AggregateProcessor extends /** * On completion task which keeps the booking of the in progress up to date */ - private class AggregateOnCompletion implements Synchronization { + private final class AggregateOnCompletion implements Synchronization { + private final String exchangeId; + + private AggregateOnCompletion(String exchangeId) { + // must use the original exchange id as it could potentially change if send over SEDA etc. + this.exchangeId = exchangeId; + } public void onFailure(Exchange exchange) { // must remember to remove in progress when we failed - inProgressCompleteExchanges.remove(exchange.getExchangeId()); + inProgressCompleteExchanges.remove(exchangeId); // do not remove redelivery state as we need it when we redeliver again later } public void onComplete(Exchange exchange) { - // must remember to remove in progress when we are complete - inProgressCompleteExchanges.remove(exchange.getExchangeId()); - // and remove redelivery state as well - redeliveryState.remove(exchange.getExchangeId()); + // only confirm if we processed without a problem + try { + aggregationRepository.confirm(exchange.getContext(), exchangeId); + // and remove redelivery state as well + redeliveryState.remove(exchangeId); + } finally { + // must remember to remove in progress when we are complete + inProgressCompleteExchanges.remove(exchangeId); + } } @Override @@ -599,14 +606,15 @@ public class AggregateProcessor extends if (recoverable.isUseRecovery()) { long interval = recoverable.getRecoveryIntervalInMillis(); if (interval <= 0) { - throw new IllegalArgumentException("AggregationRepository has recovery enabled and the CheckInterval option must be a positive number, was: " + interval); + throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval); } // create a background recover thread to check every interval recoverService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); Runnable recoverTask = new RecoverTask(recoverable); LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis."); - recoverService.scheduleAtFixedRate(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); + // use fixed delay so there is X interval between each run + recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultUnboundedQueueSizeTest.java Mon Apr 5 08:12:11 2010 @@ -49,8 +49,7 @@ public class SedaDefaultUnboundedQueueSi template.sendBody("seda:foo", "Message overflow"); fail("Should thrown an exception"); } catch (Exception e) { - IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause()); - assertEquals("Queue full", ise.getMessage()); + assertIsInstanceOf(IllegalStateException.class, e.getCause()); } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownRouteGracefulWithTimerTest.java Mon Apr 5 08:12:11 2010 @@ -29,6 +29,9 @@ public class ShutdownRouteGracefulWithTi private static String foo = ""; public void testShutdownRouteGraceful() throws Exception { + // use a bit longer timer + context.getShutdownStrategy().setTimeout(20); + getMockEndpoint("mock:foo").expectedMessageCount(1); // should be stopped before it fires the first one getMockEndpoint("mock:timer").expectedMessageCount(0); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java Mon Apr 5 08:12:11 2010 @@ -68,6 +68,9 @@ public class ManagedThreadPoolTest exten template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); + // wait a bit to ensure JMX have updated values + Thread.sleep(2000); + poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize"); assertEquals(1, poolSize.intValue()); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java Mon Apr 5 08:12:11 2010 @@ -83,7 +83,7 @@ public class ManagedThrottlerTest extend Long total = (Long) mbeanServer.getAttribute(routeName, "TotalProcessingTime"); - assertTrue("Should take at most 1.5 sec: was " + total, total < 1500); + assertTrue("Should take at most 2.0 sec: was " + total, total < 2000); // change the throttler using JMX mbeanServer.setAttribute(throttlerName, new Attribute("MaximumRequestsPerPeriod", (long) 2)); Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java Mon Apr 5 08:12:11 2010 @@ -32,7 +32,7 @@ import org.junit.Test; public class HawtDBAggregateLoadAndRecoverTest extends CamelTestSupport { private static final int SIZE = 1000; - private static final AtomicInteger counter = new AtomicInteger(); + private static AtomicInteger counter = new AtomicInteger(); @Before @Override Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Mon Apr 5 08:12:11 2010 @@ -28,8 +28,8 @@ import org.junit.Test; public class HawtDBAggregateRecoverTest extends CamelTestSupport { - private HawtDBAggregationRepository<String> repo; private static AtomicInteger counter = new AtomicInteger(0); + private HawtDBAggregationRepository<String> repo; @Override public void setUp() throws Exception { @@ -38,7 +38,7 @@ public class HawtDBAggregateRecoverTest // enable recovery repo.setUseRecovery(true); // check faster - repo.setRecoveryInterval(1, TimeUnit.SECONDS); + repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS); super.setUp(); } @@ -71,7 +71,7 @@ public class HawtDBAggregateRecoverTest .completionSize(5).aggregationRepository(repo) .log("aggregated exchange id ${exchangeId} with ${body}") .to("mock:aggregated") - .delay(2000) + .delay(1000) // simulate errors the first two times .process(new Processor() { public void process(Exchange exchange) throws Exception { Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java Mon Apr 5 08:12:11 2010 @@ -28,8 +28,8 @@ import org.junit.Test; public class HawtDBAggregateRecoverWithRedeliveryPolicyTest extends CamelTestSupport { - private HawtDBAggregationRepository<String> repo; private static AtomicInteger counter = new AtomicInteger(0); + private HawtDBAggregationRepository<String> repo; @Override public void setUp() throws Exception { @@ -38,7 +38,7 @@ public class HawtDBAggregateRecoverWithR // enable recovery repo.setUseRecovery(true); // check faster - repo.setRecoveryInterval(1, TimeUnit.SECONDS); + repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS); super.setUp(); } Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithSedaTest.java (from r930692, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithSedaTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithSedaTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&r1=930692&r2=930826&rev=930826&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithSedaTest.java Mon Apr 5 08:12:11 2010 @@ -26,10 +26,10 @@ import org.apache.camel.processor.aggreg import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HawtDBAggregateRecoverTest extends CamelTestSupport { +public class HawtDBAggregateRecoverWithSedaTest extends CamelTestSupport { - private HawtDBAggregationRepository<String> repo; private static AtomicInteger counter = new AtomicInteger(0); + private HawtDBAggregationRepository<String> repo; @Override public void setUp() throws Exception { @@ -38,12 +38,12 @@ public class HawtDBAggregateRecoverTest // enable recovery repo.setUseRecovery(true); // check faster - repo.setRecoveryInterval(1, TimeUnit.SECONDS); + repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS); super.setUp(); } @Test - public void testHawtDBAggregateRecover() throws Exception { + public void testHawtDBAggregateRecoverWithSeda() throws Exception { // should fail the first 2 times and then recover getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); @@ -71,18 +71,23 @@ public class HawtDBAggregateRecoverTest .completionSize(5).aggregationRepository(repo) .log("aggregated exchange id ${exchangeId} with ${body}") .to("mock:aggregated") - .delay(2000) - // simulate errors the first two times - .process(new Processor() { - public void process(Exchange exchange) throws Exception { - int count = counter.incrementAndGet(); - if (count <= 2) { - throw new IllegalArgumentException("Damn"); - } - } - }) - .to("mock:result") + .to("seda:foo") .end(); + + // should be able to recover when we send over SEDA as its a OnCompletion + // which confirms the exchange when its complete. + from("seda:foo") + .delay(1000) + // simulate errors the first two times + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + int count = counter.incrementAndGet(); + if (count <= 2) { + throw new IllegalArgumentException("Damn"); + } + } + }) + .to("mock:result"); } }; } Modified: camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml?rev=930826&r1=930825&r2=930826&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml (original) +++ camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml Mon Apr 5 08:12:11 2010 @@ -30,8 +30,8 @@ <property name="persistentFileName" value="target/data/hawtdb.dat"/> <!-- and use repo2 as the repository name --> <property name="repositoryName" value="repo2"/> - <!-- scan every second --> - <property name="recoveryInterval" value="1000"/> + <!-- scan 2 times per second --> + <property name="recoveryInterval" value="500"/> <!-- enable recovery --> <property name="useRecovery" value="true"/> </bean>