Author: davsclaus Date: Wed Apr 14 08:22:01 2010 New Revision: 933877 URL: http://svn.apache.org/viewvc?rev=933877&view=rev Log: CAMEL-2640: Fixed file consumer with recursive and noop enabled not picking up files with the same name in sibling folders.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java (with props) camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=933877&r1=933876&r2=933877&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Wed Apr 14 08:22:01 2010 @@ -62,8 +62,8 @@ public abstract class GenericFileConsume shutdownRunningTask = null; pendingExchanges = 0; - // before we poll is there anything we need to check ? Such as are we - // connected to the FTP Server Still ? + // before we poll is there anything we need to check? + // such as are we connected to the FTP Server still? 
if (!prePollCheck()) { if (log.isDebugEnabled()) { log.debug("Skipping pool as pre poll check returned false"); @@ -141,7 +141,7 @@ public abstract class GenericFileConsume while (exchanges.size() > 0) { Exchange exchange = (Exchange) exchanges.poll(); GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); - String key = file.getFileName(); + String key = file.getAbsoluteFilePath(); endpoint.getInProgressRepository().remove(key); } } @@ -215,6 +215,9 @@ public abstract class GenericFileConsume log.trace("Processing file: " + file); } + // must extract the absolute name before the begin strategy as the file could potentially be pre moved + // and then the file name would be changed + String absoluteFileName = file.getAbsoluteFilePath(); try { final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); @@ -224,7 +227,7 @@ public abstract class GenericFileConsume log.debug(endpoint + " cannot begin processing file: " + file); } // remove file from the in progress list as its no longer in progress - endpoint.getInProgressRepository().remove(file.getFileName()); + endpoint.getInProgressRepository().remove(absoluteFileName); return; } @@ -251,8 +254,7 @@ public abstract class GenericFileConsume // register on completion callback that does the completion strategies // (for instance to move the file after we have processed it) - String originalFileName = file.getFileName(); - exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, originalFileName)); + exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName)); // process the exchange getProcessor().process(exchange); @@ -263,10 +265,9 @@ public abstract class GenericFileConsume } /** - * Strategy for validating if the given remote file should be included or - * not + * Strategy for validating if the given remote file should be included or not * - * @param file the remote 
file + * @param file the file * @param isDirectory whether the file is a directory or a file * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it */ @@ -277,8 +278,7 @@ public abstract class GenericFileConsume log.trace("File did not match. Will skip this file: " + file); } return false; - } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) { - // only use the filename as the key as the file could be moved into a done folder + } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { if (log.isTraceEnabled()) { log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file); } @@ -359,7 +359,7 @@ public abstract class GenericFileConsume * @return <tt>true</tt> if the file is already in progress */ protected boolean isInProgress(GenericFile<T> file) { - String key = file.getFileName(); + String key = file.getAbsoluteFilePath(); return !endpoint.getInProgressRepository().add(key); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=933877&r1=933876&r2=933877&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Wed Apr 14 08:22:01 2010 @@ -38,14 +38,14 @@ public class GenericFileOnCompletion<T> private GenericFileOperations<T> operations; private ExceptionHandler exceptionHandler; private GenericFile<T> file; - private String originalFileName; + private String absoluteFileName; public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> 
operations, - GenericFile<T> file, String originalFileName) { + GenericFile<T> file, String absoluteFileName) { this.endpoint = endpoint; this.operations = operations; this.file = file; - this.originalFileName = originalFileName; + this.absoluteFileName = absoluteFileName; } public void onComplete(Exchange exchange) { @@ -96,7 +96,7 @@ public class GenericFileOnCompletion<T> // remove file from the in progress list as its no longer in progress // use the original file name that was used to add it to the repository // as the name can be different when using preMove option - endpoint.getInProgressRepository().remove(originalFileName); + endpoint.getInProgressRepository().remove(absoluteFileName); } } @@ -111,8 +111,7 @@ public class GenericFileOnCompletion<T> Exchange exchange, GenericFile<T> file) { if (endpoint.isIdempotent()) { // only add to idempotent repository if we could process the file - // only use the filename as the key as the file could be moved into a done folder - endpoint.getIdempotentRepository().add(file.getFileName()); + endpoint.getIdempotentRepository().add(absoluteFileName); } try { Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java?rev=933877&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java Wed Apr 14 08:22:01 2010 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class FileRecursiveNoopTest extends ContextTestSupport { + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/noop"); + super.setUp(); + } + + public void testRecursiveNoop() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceivedInAnyOrder("a", "b", "a2", "c", "b2"); + + template.sendBodyAndHeader("file:target/noop", "a", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader("file:target/noop", "b", Exchange.FILE_NAME, "b.txt"); + template.sendBodyAndHeader("file:target/noop/foo", "a2", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader("file:target/noop/bar", "c", Exchange.FILE_NAME, "c.txt"); + template.sendBodyAndHeader("file:target/noop/bar", "b2", Exchange.FILE_NAME, "b.txt"); + + assertMockEndpointsSatisfied(); + + // reset mock and send in a new file to be picked up only + mock.reset(); + mock.expectedBodiesReceived("c2"); + + template.sendBodyAndHeader("file:target/noop", "c2", Exchange.FILE_NAME, "c.txt"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception 
{ + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/noop?recursive=true&noop=true") + .convertBodyTo(String.class) + .to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java?rev=933877&r1=933876&r2=933877&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java Wed Apr 14 08:22:01 2010 @@ -43,6 +43,7 @@ public class FileAsyncStressTest extends public void testAsyncStress() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMinimumMessageCount(100); + mock.setResultWaitTime(20000); assertMockEndpointsSatisfied(); } @@ -61,7 +62,7 @@ public class FileAsyncStressTest extends public void process(Exchange exchange) throws Exception { // simulate some work with random time to complete Random ran = new Random(); - int delay = ran.nextInt(250) + 10; + int delay = ran.nextInt(50) + 10; Thread.sleep(delay); } }).to("mock:result"); Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java?rev=933877&view=auto ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java (added) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java Wed Apr 14 08:22:01 2010 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class FromFtpRecursiveNoopTest extends FtpServerTestSupport { + + private String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/noop?password=admin&binary=false" + + "&recursive=true&noop=true"; + } + + @Test + public void testRecursiveNoop() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceivedInAnyOrder("a", "b", "a2", "c", "b2"); + + template.sendBodyAndHeader(getFtpUrl(), "a", Exchange.FILE_NAME, "a.txt"); + template.sendBodyAndHeader(getFtpUrl(), "b", Exchange.FILE_NAME, "b.txt"); + template.sendBodyAndHeader(getFtpUrl(), "a2", Exchange.FILE_NAME, "foo/a.txt"); + template.sendBodyAndHeader(getFtpUrl(), "c", Exchange.FILE_NAME, "bar/c.txt"); + template.sendBodyAndHeader(getFtpUrl(), "b2", Exchange.FILE_NAME, "bar/b.txt"); + + assertMockEndpointsSatisfied(); + + // reset mock and send in a new file to be picked up only + mock.reset(); + mock.expectedBodiesReceived("c2"); + + template.sendBodyAndHeader(getFtpUrl(), "c2", Exchange.FILE_NAME, "c.txt"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(getFtpUrl()) + .convertBodyTo(String.class) + .to("mock:result"); + } + }; + } + + +} Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java?rev=933877&r1=933876&r2=933877&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java Wed Apr 14 08:22:01 2010 @@ -23,6 +23,8 @@ import org.apache.camel.ContextTestSuppo import org.apache.camel.Exchange; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.util.FileUtil; + import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; public class FileConsumerIdempotentTest extends ContextTestSupport { @@ -43,8 +45,6 @@ public class FileConsumerIdempotentTest public void testIdempotent() throws Exception { - assertFalse(repo.contains("report.txt")); - // send a file template.sendBodyAndHeader("file://target/fileidempotent/", "Hello World", Exchange.FILE_NAME, "report.txt"); @@ -69,7 +69,9 @@ public class FileConsumerIdempotentTest // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not assertMockEndpointsSatisfied(); - assertTrue(repo.contains("report.txt")); + + String name = FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath()); + assertTrue("Should contain file: " + name, repo.contains(name)); } }