Author: davsclaus Date: Sat Jan 26 13:05:18 2013 New Revision: 1438888 URL: http://svn.apache.org/viewvc?rev=1438888&view=rev Log: CAMEL-6006: File/ftp consumer allow to customize idempotent repository key.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyTest.java - copied, changed from r1438865, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1438888&r1=1438887&r2=1438888&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Jan 26 13:05:18 2013 @@ -419,9 +419,17 @@ public abstract class GenericFileConsume if (!isMatched(file, isDirectory, files)) { log.trace("File did not match. Will skip this file: {}", file); return false; - } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { - log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file); - return false; + } else if (endpoint.isIdempotent()) { + // use absolute file path as default key, but evaluate if an expression key was configured + String key = file.getAbsoluteFilePath(); + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = endpoint.createExchange(file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + if (key != null && endpoint.getIdempotentRepository().contains(key)) { + log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file); + return false; + } } // file matched Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1438888&r1=1438887&r2=1438888&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Sat Jan 26 13:05:18 2013 @@ -85,6 +85,7 @@ public abstract class GenericFileEndpoin protected Expression preMove; protected Expression moveExisting; protected Boolean idempotent; + protected Expression idempotentKey; protected IdempotentRepository<String> idempotentRepository; protected GenericFileFilter<T> filter; protected AntPathMatcherGenericFileFilter<T> antFilter; @@ -423,6 +424,18 @@ public abstract class GenericFileEndpoin this.idempotent = idempotent; } + public Expression getIdempotentKey() { + return idempotentKey; + } + + public void setIdempotentKey(Expression idempotentKey) { + this.idempotentKey = idempotentKey; + } + + public void setIdempotentKey(String expression) { + this.idempotentKey = createFileLanguageExpression(expression); + } + public IdempotentRepository<String> getIdempotentRepository() { return idempotentRepository; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=1438888&r1=1438887&r2=1438888&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Sat Jan 26 13:05:18 2013 @@ -105,8 +105,18 @@ public class GenericFileOnCompletion<T> protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy, Exchange exchange, GenericFile<T> file) { if (endpoint.isIdempotent()) { + + // use absolute file path as default key, but evaluate if an expression key was configured + String key = absoluteFileName; + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = endpoint.createExchange(file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + // only add to idempotent repository if we could process the file - endpoint.getIdempotentRepository().add(absoluteFileName); + if (key != null) { + endpoint.getIdempotentRepository().add(key); + } } // must be last in batch to delete the done file name Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java?rev=1438888&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java Sat Jan 26 13:05:18 2013 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test for the idempotentKey option. + */ +public class FileConsumerIdempotentKeyNameAndSizeTest extends FileConsumerIdempotentTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("file://target/idempotent/?idempotent=true&idempotentKey=${file:onlyname}-${file:size}&move=done/${file:name}&delay=10") + .convertBodyTo(String.class).to("mock:result"); + } + }; + } + + public void testIdempotentDiffSize() throws Exception { + // consume the file the first time + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + assertMockEndpointsSatisfied(); + + oneExchangeDone.matchesMockWaitTime(); + + // reset mock and set new expectations + mock.reset(); + mock.expectedBodiesReceived("Bye World"); + + // create new file which has different length + template.sendBodyAndHeader("file://target/idempotent", "Bye World", Exchange.FILE_NAME, "report.txt"); + + assertMockEndpointsSatisfied(); + } + + +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyTest.java (from r1438865, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java&r1=1438865&r2=1438888&rev=1438888&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyTest.java Sat Jan 26 13:05:18 2013 @@ -16,56 +16,21 @@ */ package org.apache.camel.component.file; -import java.io.File; - -import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; /** - * Unit test for the idempotent=true option. + * Unit test for the idempotentKey option. */ -public class FileConsumerIdempotentTest extends ContextTestSupport { - - @Override - protected void setUp() throws Exception { - deleteDirectory("target/idempotent"); - super.setUp(); - template.sendBodyAndHeader("file://target/idempotent", "Hello World", Exchange.FILE_NAME, "report.txt"); - } +public class FileConsumerIdempotentKeyTest extends FileConsumerIdempotentTest { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - from("file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10") + from("file://target/idempotent/?idempotent=true&idempotentKey=${file:onlyname}&move=done/${file:name}&delay=10") .convertBodyTo(String.class).to("mock:result"); } }; } - public void testIdempotent() throws Exception { - // consume the file the first time - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("Hello World"); - - assertMockEndpointsSatisfied(); - - oneExchangeDone.matchesMockWaitTime(); - - // reset mock and set new expectations - mock.reset(); - mock.expectedMessageCount(0); - - // move file back - File file = new File("target/idempotent/done/report.txt"); - File renamed = new File("target/idempotent/report.txt"); - file.renameTo(renamed); - - // should NOT consume the file again, let a bit time pass to let the consumer try to consume it but it should not - Thread.sleep(100); - assertMockEndpointsSatisfied(); - } - } \ No newline at end of file