Author: davsclaus
Date: Fri Jun 10 10:01:12 2011
New Revision: 1134260

URL: http://svn.apache.org/viewvc?rev=1134260&view=rev
Log:
CAMEL-4076: Fixed idempotent consumer not being started/stopped when used on file/ftp endpoints.

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java - copied, changed from r1134236, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java - copied, changed from r1134236, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1134260&r1=1134259&r2=1134260&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Fri Jun 10 10:01:12 2011 @@ -39,6 +39,7 @@ import org.apache.camel.spi.IdempotentRe import org.apache.camel.spi.Language; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.StringHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -720,4 +721,15 @@ public abstract class GenericFileEndpoin } } + @Override + protected void doStart() throws Exception { + ServiceHelper.startServices(inProgressRepository, idempotentRepository); + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + ServiceHelper.stopServices(inProgressRepository, idempotentRepository); + } } Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java (from r1134236, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java&r1=1134236&r2=1134260&rev=1134260&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java Fri Jun 10 10:01:12 2011 @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.io.File; +import java.io.FileOutputStream; import org.apache.camel.ContextTestSupport; import org.apache.camel.Endpoint; @@ -31,13 +32,13 @@ import org.apache.camel.spi.IdempotentRe /** * @version */ -public class FileIdempotentConsumerTest extends ContextTestSupport { +public class FileIdempotentConsumerLoadStoreTest extends ContextTestSupport { protected Endpoint startEndpoint; protected MockEndpoint resultEndpoint; private File store = new File("target/idempotentfilestore.dat"); private IdempotentRepository<String> repo; - public void testDuplicateMessagesAreFilteredOut() throws Exception { + public void testLoadStore() throws Exception { assertFalse(repo.contains("1")); assertFalse(repo.contains("2")); assertFalse(repo.contains("3")); @@ -79,12 +80,15 @@ public class FileIdempotentConsumerTest store.delete(); } - repo = FileIdempotentRepository.fileIdempotentRepository(store); + // insert existing to store + FileOutputStream fos = new FileOutputStream(store); + 
fos.write("4\n".getBytes()); + fos.close(); - // let's add 4 to start with - repo.add("4"); + repo = FileIdempotentRepository.fileIdempotentRepository(store); super.setUp(); + startEndpoint = resolveMandatoryEndpoint("direct:start"); resultEndpoint = getMockEndpoint("mock:result"); } Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java (from r1134236, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java&r1=1134236&r2=1134260&rev=1134260&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java Fri Jun 10 10:01:12 2011 @@ -17,6 +17,7 @@ package org.apache.camel.spring.processor.idempotent; import java.io.File; +import java.io.FileOutputStream; import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; @@ -27,7 +28,7 @@ import org.apache.camel.util.FileUtil; import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; -public class FileConsumerIdempotentTest extends ContextTestSupport { +public class FileConsumerIdempotentLoadStoreTest extends ContextTestSupport { private IdempotentRepository<String> repo; @@ 
-39,40 +40,42 @@ public class FileConsumerIdempotentTest @Override protected void setUp() throws Exception { deleteDirectory("target/fileidempotent"); + createDirectory("target/fileidempotent"); + + File file = new File("target/fileidempotent/.filestore.dat"); + FileOutputStream fos = new FileOutputStream(file); + + // insert existing name to the file repo, so we should skip this file + String name = FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath()); + fos.write(name.getBytes()); + fos.write("\n".getBytes()); + + fos.close(); super.setUp(); + + // add a file to the repo repo = context.getRegistry().lookup("fileStore", IdempotentRepository.class); } - - public void testIdempotent() throws Exception { - // send a file + public void testIdempotentLoad() throws Exception { + // send two files (report.txt exists already in idempotent repo) template.sendBodyAndHeader("file://target/fileidempotent/", "Hello World", Exchange.FILE_NAME, "report.txt"); + template.sendBodyAndHeader("file://target/fileidempotent/", "Bye World", Exchange.FILE_NAME, "report2.txt"); // consume the file the first time MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(1); + mock.expectedBodiesReceived("Bye World"); assertMockEndpointsSatisfied(); - - // reset mock and set new expectations - mock.reset(); - mock.expectedMessageCount(0); - - // move file back - File file = new File("target/fileidempotent/done/report.txt"); - File renamed = new File("target/fileidempotent/report.txt"); - file = file.getAbsoluteFile(); - file.renameTo(renamed.getAbsoluteFile()); - - // sleep to let the consumer try to poll the file - Thread.sleep(2000); - - // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not - assertMockEndpointsSatisfied(); + // wait for the exchange to be done, as it only append to idempotent repo after success + oneExchangeDone.matchesMockWaitTime(); String name = 
FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath()); assertTrue("Should contain file: " + name, repo.contains(name)); + + String name2 = FileUtil.normalizePath(new File("target/fileidempotent/report2.txt").getAbsolutePath()); + assertTrue("Should contain file: " + name2, repo.contains(name2)); } }