Author: davsclaus Date: Tue Dec 14 18:42:19 2010 New Revision: 1049214 URL: http://svn.apache.org/viewvc?rev=1049214&view=rev Log: CAMEL-3427: ConsumerTemplate expose doneUoW you may need in special cases. Added javadoc why and when.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java?rev=1049214&r1=1049213&r2=1049214&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java Tue Dec 14 18:42:19 2010 @@ -26,7 +26,7 @@ package org.apache.camel; * This is <b>not</b> the <a href="http://camel.apache.org/event-driven-consumer.html">Event Driven Consumer EIP</a>. * <p/> * <b>All</b> methods throws {...@link RuntimeCamelException} if consuming of - * the {...@link Exchange} failed and an Exception occured. The <tt>getCause</tt> + * the {...@link Exchange} failed and an Exception occurred. The <tt>getCause</tt> * method on {...@link RuntimeCamelException} returns the wrapper original caused * exception. * <p/> @@ -75,6 +75,8 @@ public interface ConsumerTemplate extend /** * Receives from the endpoint, waiting until there is a response + * <p/> + * <b>Important:</b> See {...@link #doneUoW(Exchange)} * * @param endpointUri the endpoint to receive from * @return the returned exchange @@ -82,35 +84,46 @@ public interface ConsumerTemplate extend Exchange receive(String endpointUri); /** - * Receives from the endpoint, waiting until there is a response + * Receives from the endpoint, waiting until there is a response. + * <p/> + * <b>Important:</b> See {...@link #doneUoW(Exchange)} * * @param endpoint the endpoint to receive from * @return the returned exchange + * @see #doneUoW(Exchange) */ Exchange receive(Endpoint endpoint); /** * Receives from the endpoint, waiting until there is a response * or the timeout occurs + * <p/> + * <b>Important:</b> See {...@link #doneUoW(Exchange)} * * @param endpointUri the endpoint to receive from * @param timeout timeout in millis to wait for a response * @return the returned exchange, or <tt>null</tt> if no response + * @see #doneUoW(Exchange) */ Exchange receive(String endpointUri, long timeout); /** * Receives from the endpoint, waiting until there is a response * or the timeout occurs + * <p/> + * <b>Important:</b> See {...@link #doneUoW(Exchange)} * * @param endpoint the endpoint to receive from * @param timeout timeout in millis to wait for a response * @return the returned exchange, or <tt>null</tt> if no response + * @see #doneUoW(Exchange) */ Exchange receive(Endpoint endpoint, long timeout); /** * Receives from the endpoint, not waiting for a response if non exists. + * <p/> + * <b>Important:</b> See {...@link #doneUoW(Exchange)} * * @param endpointUri the endpoint to receive from * @return the returned exchange, or <tt>null</tt> if no response @@ -119,6 +132,8 @@ public interface ConsumerTemplate extend /** * Receives from the endpoint, not waiting for a response if non exists. + * <p/> + * <b>Important:</b> See {...@link #doneUoW(Exchange)} * * @param endpoint the endpoint to receive from * @return the returned exchange, or <tt>null</tt> if no response @@ -235,4 +250,19 @@ public interface ConsumerTemplate extend */ <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type); + /** + * If you have used any of the <tt>receive</tt> methods which returns a {...@link Exchange} type + * then you need to invoke this method when you are done using the returned {...@link Exchange}. + * <p/> + * This is needed to ensure any {...@link org.apache.camel.spi.Synchronization} works is being executed. + * For example if you consumed from a file endpoint, then the consumed file is only moved/delete when + * you done the {...@link Exchange}. + * <p/> + * Note for all the other <tt>receive</tt> methods which does <b>not</b> return a {...@link Exchange} type, + * the done has been executed automatic by Camel itself. + * + * @param exchange the exchange + */ + void doneUoW(Exchange exchange); + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=1049214&r1=1049213&r2=1049214&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Tue Dec 14 18:42:19 2010 @@ -191,6 +191,26 @@ public class DefaultConsumerTemplate ext return receiveBodyNoWait(endpoint.getEndpointUri(), type); } + public void doneUoW(Exchange exchange) { + try { + // The receiveBody method will get a null exchange + if (exchange == null) { + return; + } + if (exchange.getUnitOfWork() == null) { + // handover completions and done them manually to ensure they are being executed + List<Synchronization> synchronizations = exchange.handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG); + } else { + // done the unit of work + exchange.getUnitOfWork().done(exchange); + } + } catch (Throwable e) { + LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange + + ". This exception will be ignored.", e); + } + } + protected Endpoint resolveMandatoryEndpoint(String endpointUri) { return CamelContextHelper.getMandatoryEndpoint(context, endpointUri); } @@ -224,26 +244,6 @@ public class DefaultConsumerTemplate ext return answer; } - private static void doneUoW(Exchange exchange) { - try { - // The receiveBody method will get a null exchange - if (exchange == null) { - return; - } - if (exchange.getUnitOfWork() == null) { - // handover completions and done them manually to ensure they are being executed - List<Synchronization> synchronizations = exchange.handoverCompletions(); - UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG); - } else { - // done the unit of work - exchange.getUnitOfWork().done(exchange); - } - } catch (Throwable e) { - LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange - + ". This exception will be ignored.", e); - } - } - private ConsumerCache getConsumerCache() { if (!isStarted()) { throw new IllegalStateException("ConsumerTemplate has not been started"); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java?rev=1049214&r1=1049213&r2=1049214&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java Tue Dec 14 18:42:19 2010 @@ -16,6 +16,8 @@ */ package org.apache.camel.impl; +import java.io.File; + import org.apache.camel.ConsumerTemplate; import org.apache.camel.ContextTestSupport; import org.apache.camel.Endpoint; @@ -333,4 +335,22 @@ public class DefaultConsumerTemplateTest assertEquals("Size should be 0", 0, template.getCurrentCacheSize()); } + public void testDoneUoW() throws Exception { + deleteDirectory("target/foo"); + template.sendBodyAndHeader("file:target/foo", "Hello World", Exchange.FILE_NAME, "hello.txt"); + + Exchange exchange = consumer.receive("file:target/foo?delete=true"); + assertNotNull(exchange); + assertEquals("Hello World", exchange.getIn().getBody(String.class)); + + // file should still exists + File file = new File("target/foo/hello.txt").getAbsoluteFile(); + assertTrue("File should exist " + file, file.exists()); + + // done the exchange + consumer.doneUoW(exchange); + + assertFalse("File should have been deleted " + file, file.exists()); + } + }