Author: davsclaus
Date: Fri Jun 10 10:01:12 2011
New Revision: 1134260

URL: http://svn.apache.org/viewvc?rev=1134260&view=rev
Log:
CAMEL-4076: Fixed idempotent consumer not being started/stopped when used on 
file/ftp endpoints.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java
      - copied, changed from r1134236, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java
      - copied, changed from r1134236, 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1134260&r1=1134259&r2=1134260&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 Fri Jun 10 10:01:12 2011
@@ -39,6 +39,7 @@ import org.apache.camel.spi.IdempotentRe
 import org.apache.camel.spi.Language;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StringHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -720,4 +721,15 @@ public abstract class GenericFileEndpoin
         }
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(inProgressRepository, 
idempotentRepository);
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        ServiceHelper.stopServices(inProgressRepository, idempotentRepository);
+    }
 }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java
 (from r1134236, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java&r1=1134236&r2=1134260&rev=1134260&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerLoadStoreTest.java
 Fri Jun 10 10:01:12 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.io.File;
+import java.io.FileOutputStream;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
@@ -31,13 +32,13 @@ import org.apache.camel.spi.IdempotentRe
 /**
  * @version 
  */
-public class FileIdempotentConsumerTest extends ContextTestSupport {
+public class FileIdempotentConsumerLoadStoreTest extends ContextTestSupport {
     protected Endpoint startEndpoint;
     protected MockEndpoint resultEndpoint;
     private File store = new File("target/idempotentfilestore.dat");
     private IdempotentRepository<String> repo;
 
-    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+    public void testLoadStore() throws Exception {
         assertFalse(repo.contains("1"));
         assertFalse(repo.contains("2"));
         assertFalse(repo.contains("3"));
@@ -79,12 +80,15 @@ public class FileIdempotentConsumerTest 
             store.delete();
         }
 
-        repo = FileIdempotentRepository.fileIdempotentRepository(store);
+        // insert existing to store
+        FileOutputStream fos = new FileOutputStream(store);
+        fos.write("4\n".getBytes());
+        fos.close();
 
-        // let's add 4 to start with
-        repo.add("4");
+        repo = FileIdempotentRepository.fileIdempotentRepository(store);
 
         super.setUp();
+
         startEndpoint = resolveMandatoryEndpoint("direct:start");
         resultEndpoint = getMockEndpoint("mock:result");
     }

Copied: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java
 (from r1134236, 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java&r1=1134236&r2=1134260&rev=1134260&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentLoadStoreTest.java
 Fri Jun 10 10:01:12 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.spring.processor.idempotent;
 
 import java.io.File;
+import java.io.FileOutputStream;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
@@ -27,7 +28,7 @@ import org.apache.camel.util.FileUtil;
 
 import static 
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
-public class FileConsumerIdempotentTest extends ContextTestSupport {
+public class FileConsumerIdempotentLoadStoreTest extends ContextTestSupport {
 
     private IdempotentRepository<String> repo;
 
@@ -39,40 +40,42 @@ public class FileConsumerIdempotentTest 
     @Override
     protected void setUp() throws Exception {
         deleteDirectory("target/fileidempotent");
+        createDirectory("target/fileidempotent");
+
+        File file = new File("target/fileidempotent/.filestore.dat");
+        FileOutputStream fos = new FileOutputStream(file);
+
+        // insert existing name to the file repo, so we should skip this file
+        String name = FileUtil.normalizePath(new 
File("target/fileidempotent/report.txt").getAbsolutePath());
+        fos.write(name.getBytes());
+        fos.write("\n".getBytes());
+
+        fos.close();
 
         super.setUp();
+
+        // add a file to the repo
         repo = context.getRegistry().lookup("fileStore", 
IdempotentRepository.class);
     }
 
-
-    public void testIdempotent() throws Exception {
-        // send a file
+    public void testIdempotentLoad() throws Exception {
+        // send two files (report.txt exists already in idempotent repo)
         template.sendBodyAndHeader("file://target/fileidempotent/", "Hello 
World", Exchange.FILE_NAME, "report.txt");
+        template.sendBodyAndHeader("file://target/fileidempotent/", "Bye 
World", Exchange.FILE_NAME, "report2.txt");
 
         // consume the file the first time
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("Bye World");
 
         assertMockEndpointsSatisfied();
-
-        // reset mock and set new expectations
-        mock.reset();
-        mock.expectedMessageCount(0);
-
-        // move file back
-        File file = new File("target/fileidempotent/done/report.txt");
-        File renamed = new File("target/fileidempotent/report.txt");
-        file = file.getAbsoluteFile();
-        file.renameTo(renamed.getAbsoluteFile());
-
-        // sleep to let the consumer try to poll the file
-        Thread.sleep(2000);
-
-        // should NOT consume the file again, let 2 secs pass to let the 
consumer try to consume it but it should not
-        assertMockEndpointsSatisfied();
+        // wait for the exchange to be done, as it only append to idempotent 
repo after success
+        oneExchangeDone.matchesMockWaitTime();
 
         String name = FileUtil.normalizePath(new 
File("target/fileidempotent/report.txt").getAbsolutePath());
         assertTrue("Should contain file: " + name, repo.contains(name));
+
+        String name2 = FileUtil.normalizePath(new 
File("target/fileidempotent/report2.txt").getAbsolutePath());
+        assertTrue("Should contain file: " + name2, repo.contains(name2));
     }
 
 }


Reply via email to