Repository: camel
Updated Branches:
  refs/heads/master d2e81819d -> 5d8a4b045


Added readLockMinAge parameter to file, camel-ftp components


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa04c078
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa04c078
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa04c078

Branch: refs/heads/master
Commit: fa04c0780234ed9ba857dd36d0a5b379170b7ae2
Parents: d2e8181
Author: Jyrki Ruuskanen <yur...@kotikone.fi>
Authored: Fri Feb 27 22:30:43 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Mar 4 07:18:27 2015 +0100

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     | 23 +++++-
 .../FileChangedExclusiveReadLockStrategy.java   | 15 +++-
 .../strategy/FileProcessStrategyFactory.java    |  4 +
 ...leChangedReadLockMinAgeShortCircuitTest.java | 74 +++++++++++++++++
 .../strategy/FileChangedReadLockMinAgeTest.java | 87 ++++++++++++++++++++
 .../FtpChangedExclusiveReadLockStrategy.java    | 16 +++-
 .../strategy/FtpProcessStrategyFactory.java     |  4 +
 .../SftpChangedExclusiveReadLockStrategy.java   | 15 +++-
 .../strategy/SftpProcessStrategyFactory.java    |  4 +
 ...tpChangedReadLockMinAgeShortCircuitTest.java | 80 ++++++++++++++++++
 .../remote/FtpChangedReadLockMinAgeTest.java    | 87 ++++++++++++++++++++
 11 files changed, 403 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 3eb1b53..5108fee 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -165,6 +165,8 @@ public abstract class GenericFileEndpoint<T> extends 
ScheduledPollEndpoint imple
     protected LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
     @UriParam(label = "consumer", defaultValue = "1")
     protected long readLockMinLength = 1;
+    @UriParam(label = "consumer", defaultValue = "0")
+    protected long readLockMinAge = 0;
     @UriParam(label = "consumer")
     protected GenericFileExclusiveReadLockStrategy<T> 
exclusiveReadLockStrategy;
 
@@ -1080,7 +1082,15 @@ public abstract class GenericFileEndpoint<T> extends 
ScheduledPollEndpoint imple
     public void setAllowNullBody(boolean allowNullBody) {
         this.allowNullBody = allowNullBody;
     }
-    
+
+    public long getReadLockMinAge() {
+        return readLockMinAge;
+    }
+
+    public void setReadLockMinAge(long readLockMinAge) {
+        this.readLockMinAge = readLockMinAge;
+    }
+
     /**
      * Configures the given message with the file which sets the body to the
      * file object.
@@ -1184,6 +1194,7 @@ public abstract class GenericFileEndpoint<T> extends 
ScheduledPollEndpoint imple
         params.put("readLockMarkerFile", readLockMarkerFile);
         params.put("readLockMinLength", readLockMinLength);
         params.put("readLockLoggingLevel", readLockLoggingLevel);
+        params.put("readLockMinAge", readLockMinAge);
 
         return params;
     }
@@ -1282,11 +1293,17 @@ public abstract class GenericFileEndpoint<T> extends 
ScheduledPollEndpoint imple
     protected void doStart() throws Exception {
         // validate that the read lock options is valid for the process 
strategy
         if (!"none".equals(readLock) && !"off".equals(readLock)) {
+            if (readLockTimeout > 0 && readLockMinAge > 0 && readLockTimeout 
<= readLockCheckInterval + readLockMinAge) {
+                throw new IllegalArgumentException("The option readLockTimeout 
must be higher than readLockCheckInterval + readLockMinAge"
+                    + ", was readLockTimeout=" + readLockTimeout + ", 
readLockCheckInterval+readLockMinAge=" + (readLockCheckInterval + 
readLockMinAge)
+                    + ". A good practice is to let the readLockTimeout be at 
least readLockMinAge + 2 times the readLockCheckInterval"
+                    + " to ensure that the read lock procedure has enough time 
to acquire the lock.");
+            }
             if (readLockTimeout > 0 && readLockTimeout <= 
readLockCheckInterval) {
                 throw new IllegalArgumentException("The option readLockTimeout 
must be higher than readLockCheckInterval"
                         + ", was readLockTimeout=" + readLockTimeout + ", 
readLockCheckInterval=" + readLockCheckInterval
-                        + ". A good practice is to let the readLockTimeout be 
at least 3 or more times higher than the readLockCheckInterval"
-                        + ", to ensure the read lock procedure has amble times 
to run several times checks during acquiring the lock.");
+                        + ". A good practice is to let the readLockTimeout be 
at least 3 times higher than the readLockCheckInterval"
+                        + " to ensure that the read lock procedure has enough 
time to acquire the lock.");
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
index f471512..bfaf3da 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.file.strategy;
 
 import java.io.File;
+import java.util.Date;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -38,6 +39,7 @@ public class FileChangedExclusiveReadLockStrategy extends 
MarkerFileExclusiveRea
     private long timeout;
     private long checkInterval = 1000;
     private long minLength = 1;
+    private long minAge = 0;
     private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
 
     @Override
@@ -55,6 +57,7 @@ public class FileChangedExclusiveReadLockStrategy extends 
MarkerFileExclusiveRea
         long lastModified = Long.MIN_VALUE;
         long length = Long.MIN_VALUE;
         StopWatch watch = new StopWatch();
+        long startTime = (new Date()).getTime();
 
         while (!exclusive) {
             // timeout check
@@ -70,11 +73,13 @@ public class FileChangedExclusiveReadLockStrategy extends 
MarkerFileExclusiveRea
 
             long newLastModified = target.lastModified();
             long newLength = target.length();
+            long newOlderThan = startTime + watch.taken() - minAge;
 
             LOG.trace("Previous last modified: {}, new last modified: {}", 
lastModified, newLastModified);
             LOG.trace("Previous length: {}, new length: {}", length, 
newLength);
+            LOG.trace("New older than threshold: {}", newOlderThan);
 
-            if (length >= minLength && (newLastModified == lastModified && 
newLength == length)) {
+            if (newLength >= minLength && ((minAge == 0 && newLastModified == 
lastModified && newLength == length) || (minAge != 0 && newLastModified < 
newOlderThan))) {
                 LOG.trace("Read lock acquired.");
                 exclusive = true;
             } else {
@@ -134,4 +139,12 @@ public class FileChangedExclusiveReadLockStrategy extends 
MarkerFileExclusiveRea
     public void setMinLength(long minLength) {
         this.minLength = minLength;
     }
+
+    public long getMinAge() {
+        return minAge;
+    }
+
+    public void setMinAge(long minAge) {
+        this.minAge = minAge;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 7fd3636..ed5bd4e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -122,6 +122,10 @@ public final class FileProcessStrategyFactory {
                 if (minLength != null) {
                     readLockStrategy.setMinLength(minLength);
                 }
+                Long minAge = (Long) params.get("readLockMinAge");
+                if (null != minAge) {
+                    readLockStrategy.setMinAge(minAge);
+                }
                 strategy = readLockStrategy;
             }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java
new file mode 100644
index 0000000..1f4b388
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.FileOutputStream;
+import java.util.Date;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version
+ */
+public class FileChangedReadLockMinAgeShortCircuitTest extends 
ContextTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileChangedReadLockMinAgeShortCircuitTest.class);
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/changed/");
+        createDirectory("target/changed/in");
+        writeFile();
+        Thread.sleep(1000);
+        super.setUp();
+    }
+
+    public void testChangedReadLockMinAge() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedFileExists("target/changed/out/file.dat");
+        // We should get the file on the first poll
+        
mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isLessThan(new
 Date().getTime()+15000));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private void writeFile() throws Exception {
+        LOG.debug("Writing file...");
+
+        FileOutputStream fos = new 
FileOutputStream("target/changed/in/file.dat");
+        fos.write("Line".getBytes());
+        fos.flush();
+        fos.close();
+        LOG.debug("Writing file DONE...");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("file:target/changed/in?readLock=changed&readLockMinAge=500&readLockCheckInterval=30000&readLockTimeout=90000").to("file:target/changed/out",
 "mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java
new file mode 100644
index 0000000..2dffcc9
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Date;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version
+ */
+public class FileChangedReadLockMinAgeTest extends ContextTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileChangedReadLockMinAgeTest.class);
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/changed/");
+        createDirectory("target/changed/in");
+        super.setUp();
+    }
+
+    public void testChangedReadLockMinAge() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedFileExists("target/changed/out/slowfile.dat");
+        // writing takes ~2 seconds, and then it has to age for at least 
minAge milliseconds (3 seconds)
+        
mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isGreaterThan(new
 Date().getTime()+5000));
+
+        writeSlowFile();
+
+        assertMockEndpointsSatisfied();
+
+        String content = context.getTypeConverter().convertTo(String.class, 
new File("target/changed/out/slowfile.dat"));
+        String[] lines = content.split(LS);
+        assertEquals("There should be 20 lines in the file", 20, lines.length);
+        for (int i = 0; i < 20; i++) {
+            assertEquals("Line " + i, lines[i]);
+        }
+    }
+
+    private void writeSlowFile() throws Exception {
+        LOG.debug("Writing slow file...");
+
+        FileOutputStream fos = new 
FileOutputStream("target/changed/in/slowfile.dat");
+        for (int i = 0; i < 20; i++) {
+            fos.write(("Line " + i + LS).getBytes());
+            LOG.debug("Writing line " + i);
+            Thread.sleep(100);
+        }
+
+        fos.flush();
+        fos.close();
+        LOG.debug("Writing slow file DONE...");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("file:target/changed/in?readLock=changed&readLockMinAge=3000").to("file:target/changed/out",
 "mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
index fdbf457..fb1b850 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.file.remote.strategy;
 
+import java.util.Date;
 import java.util.List;
 
 import org.apache.camel.Exchange;
@@ -36,6 +37,7 @@ public class FtpChangedExclusiveReadLockStrategy implements 
GenericFileExclusive
     private long checkInterval = 5000;
     private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
     private long minLength = 1;
+    private long minAge = 0;
     private boolean fastExistsCheck;
 
     @Override
@@ -51,6 +53,7 @@ public class FtpChangedExclusiveReadLockStrategy implements 
GenericFileExclusive
         long lastModified = Long.MIN_VALUE;
         long length = Long.MIN_VALUE;
         StopWatch watch = new StopWatch();
+        long startTime = (new Date()).getTime();
 
         while (!exclusive) {
             // timeout check
@@ -66,6 +69,7 @@ public class FtpChangedExclusiveReadLockStrategy implements 
GenericFileExclusive
 
             long newLastModified = 0;
             long newLength = 0;
+
             List<FTPFile> files;
             if (fastExistsCheck) {
                 // use the absolute file path to only pickup the file we want 
to check, this avoids expensive
@@ -89,8 +93,10 @@ public class FtpChangedExclusiveReadLockStrategy implements 
GenericFileExclusive
 
             LOG.trace("Previous last modified: " + lastModified + ", new last 
modified: " + newLastModified);
             LOG.trace("Previous length: " + length + ", new length: " + 
newLength);
+            long newOlderThan = startTime + watch.taken() - minAge;
+            LOG.trace("New older than threshold: {}", newOlderThan);
 
-            if (length >= minLength && (newLastModified == lastModified && 
newLength == length)) {
+            if (newLength >= minLength && ((minAge == 0 && newLastModified == 
lastModified && newLength == length) || (minAge != 0 && newLastModified < 
newOlderThan))) {
                 LOG.trace("Read lock acquired.");
                 exclusive = true;
             } else {
@@ -161,6 +167,14 @@ public class FtpChangedExclusiveReadLockStrategy 
implements GenericFileExclusive
         this.minLength = minLength;
     }
 
+    public long getMinAge() {
+        return minAge;
+    }
+
+    public void setMinAge(long minAge) {
+        this.minAge = minAge;
+    }
+
     public boolean isFastExistsCheck() {
         return fastExistsCheck;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
index ac0977e..55ba556 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
@@ -130,6 +130,10 @@ public final class FtpProcessStrategyFactory {
                 if (minLength != null) {
                     readLockStrategy.setMinLength(minLength);
                 }
+                Long minAge = (Long) params.get("readLockMinAge");
+                if (null != minAge) {
+                    readLockStrategy.setMinAge(minAge);
+                }
                 Boolean fastExistsCheck = (Boolean) 
params.get("fastExistsCheck");
                 if (fastExistsCheck != null) {
                     readLockStrategy.setFastExistsCheck(fastExistsCheck);

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
index 715b3e6..8100811 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.file.remote.strategy;
 
+import java.util.Date;
 import java.util.List;
 
 import com.jcraft.jsch.ChannelSftp;
@@ -36,6 +37,7 @@ public class SftpChangedExclusiveReadLockStrategy implements 
GenericFileExclusiv
     private long checkInterval = 5000;
     private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
     private long minLength = 1;
+    private long minAge = 0;
     private boolean fastExistsCheck;
 
     @Override
@@ -51,6 +53,7 @@ public class SftpChangedExclusiveReadLockStrategy implements 
GenericFileExclusiv
         long lastModified = Long.MIN_VALUE;
         long length = Long.MIN_VALUE;
         StopWatch watch = new StopWatch();
+        long startTime = (new Date()).getTime();
 
         while (!exclusive) {
             // timeout check
@@ -87,8 +90,10 @@ public class SftpChangedExclusiveReadLockStrategy implements 
GenericFileExclusiv
 
             LOG.trace("Previous last modified: " + lastModified + ", new last 
modified: " + newLastModified);
             LOG.trace("Previous length: " + length + ", new length: " + 
newLength);
+            long newOlderThan = startTime + watch.taken() - minAge;
+            LOG.trace("New older than threshold: {}", newOlderThan);
 
-            if (length >= minLength && (newLastModified == lastModified && 
newLength == length)) {
+            if (newLength >= minLength && ((minAge == 0 && newLastModified == 
lastModified && newLength == length) || (minAge != 0 && newLastModified < 
newOlderThan))) {
                 LOG.trace("Read lock acquired.");
                 exclusive = true;
             } else {
@@ -154,6 +159,14 @@ public class SftpChangedExclusiveReadLockStrategy 
implements GenericFileExclusiv
         this.minLength = minLength;
     }
 
+    public long getMinAge() {
+        return minAge;
+    }
+
+    public void setMinAge(long minAge) {
+        this.minAge = minAge;
+    }
+
     public boolean isFastExistsCheck() {
         return fastExistsCheck;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
index d199ac2..aabccb4 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
@@ -126,6 +126,10 @@ public final class SftpProcessStrategyFactory {
                 if (minLength != null) {
                     readLockStrategy.setMinLength(minLength);
                 }
+                Long minAge = (Long) params.get("readLockMinAge");
+                if (null != minAge) {
+                    readLockStrategy.setMinAge(minAge);
+                }
                 Boolean fastExistsCheck = (Boolean) 
params.get("fastExistsCheck");
                 if (fastExistsCheck != null) {
                     readLockStrategy.setFastExistsCheck(fastExistsCheck);

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java
new file mode 100644
index 0000000..4ce6492
--- /dev/null
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Date;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class FtpChangedReadLockMinAgeShortCircuitTest extends 
FtpServerTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FtpChangedReadLockMinAgeShortCircuitTest.class);
+
+    protected String getFtpUrl() {
+        return "ftp://admin@localhost:"; + getPort() + 
"/changed?password=admin&readLock=changed&readLockMinAge=500&readLockCheckInterval=30000&readLockTimeout=90000&delete=true";
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        writeFile();
+        Thread.sleep(1000);
+    }
+
+    @Test
+    public void testChangedReadLock() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedFileExists("target/changed/out/slowfile.dat");
+        // We should get the file on the first poll
+        
mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isLessThan(new
 Date().getTime() + 15000));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private void writeFile() throws Exception {
+        LOG.debug("Writing file...");
+
+        createDirectory(FTP_ROOT_DIR + "/changed");
+        FileOutputStream fos = new FileOutputStream(FTP_ROOT_DIR + 
"/changed/slowfile.dat", true);
+        fos.write("Line".getBytes());
+        fos.flush();
+        fos.close();
+        LOG.debug("Writing file DONE...");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(getFtpUrl()).to("file:target/changed/out", "mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java
new file mode 100644
index 0000000..28b1f95
--- /dev/null
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Date;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class FtpChangedReadLockMinAgeTest extends FtpServerTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FtpChangedReadLockMinAgeTest.class);
+
+    protected String getFtpUrl() {
+        return "ftp://admin@localhost:"; + getPort() + 
"/changed?password=admin&readLock=changed&readLockCheckInterval=1000&readLockMinAge=3000&delete=true";
+    }
+
+    @Test
+    public void testChangedReadLock() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedFileExists("target/changed/out/slowfile.dat");
+        // writing takes ~2 seconds, and then it has to age for at least 
minAge milliseconds (3 seconds)
+        
mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isGreaterThan(new
 Date().getTime() + 5000));
+
+        writeSlowFile();
+
+        assertMockEndpointsSatisfied();
+
+        String content = context.getTypeConverter().convertTo(String.class, 
new File("target/changed/out/slowfile.dat"));
+        String[] lines = content.split(LS);
+        assertEquals("There should be 20 lines in the file", 20, lines.length);
+        for (int i = 0; i < 20; i++) {
+            assertEquals("Line " + i, lines[i]);
+        }
+    }
+
+    private void writeSlowFile() throws Exception {
+        LOG.debug("Writing slow file...");
+
+        createDirectory(FTP_ROOT_DIR + "/changed");
+        FileOutputStream fos = new FileOutputStream(FTP_ROOT_DIR + 
"/changed/slowfile.dat", true);
+        for (int i = 0; i < 20; i++) {
+            fos.write(("Line " + i + LS).getBytes());
+            LOG.debug("Writing line " + i);
+            Thread.sleep(100);
+        }
+
+        fos.flush();
+        fos.close();
+        LOG.debug("Writing slow file DONE...");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(getFtpUrl()).to("file:target/changed/out", "mock:result");
+            }
+        };
+    }
+
+}

Reply via email to