Repository: camel Updated Branches: refs/heads/master d2e81819d -> 5d8a4b045
Added readLockMinAge parameter to file, camel-ftp components Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa04c078 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa04c078 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa04c078 Branch: refs/heads/master Commit: fa04c0780234ed9ba857dd36d0a5b379170b7ae2 Parents: d2e8181 Author: Jyrki Ruuskanen <yur...@kotikone.fi> Authored: Fri Feb 27 22:30:43 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 4 07:18:27 2015 +0100 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 23 +++++- .../FileChangedExclusiveReadLockStrategy.java | 15 +++- .../strategy/FileProcessStrategyFactory.java | 4 + ...leChangedReadLockMinAgeShortCircuitTest.java | 74 +++++++++++++++++ .../strategy/FileChangedReadLockMinAgeTest.java | 87 ++++++++++++++++++++ .../FtpChangedExclusiveReadLockStrategy.java | 16 +++- .../strategy/FtpProcessStrategyFactory.java | 4 + .../SftpChangedExclusiveReadLockStrategy.java | 15 +++- .../strategy/SftpProcessStrategyFactory.java | 4 + ...tpChangedReadLockMinAgeShortCircuitTest.java | 80 ++++++++++++++++++ .../remote/FtpChangedReadLockMinAgeTest.java | 87 ++++++++++++++++++++ 11 files changed, 403 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 3eb1b53..5108fee 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -165,6 +165,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple protected LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; @UriParam(label = "consumer", defaultValue = "1") protected long readLockMinLength = 1; + @UriParam(label = "consumer", defaultValue = "0") + protected long readLockMinAge = 0; @UriParam(label = "consumer") protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy; @@ -1080,7 +1082,15 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple public void setAllowNullBody(boolean allowNullBody) { this.allowNullBody = allowNullBody; } - + + public long getReadLockMinAge() { + return readLockMinAge; + } + + public void setReadLockMinAge(long readLockMinAge) { + this.readLockMinAge = readLockMinAge; + } + /** * Configures the given message with the file which sets the body to the * file object. @@ -1184,6 +1194,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple params.put("readLockMarkerFile", readLockMarkerFile); params.put("readLockMinLength", readLockMinLength); params.put("readLockLoggingLevel", readLockLoggingLevel); + params.put("readLockMinAge", readLockMinAge); return params; } @@ -1282,11 +1293,17 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple protected void doStart() throws Exception { // validate that the read lock options is valid for the process strategy if (!"none".equals(readLock) && !"off".equals(readLock)) { + if (readLockTimeout > 0 && readLockMinAge > 0 && readLockTimeout <= readLockCheckInterval + readLockMinAge) { + throw new IllegalArgumentException("The option readLockTimeout must be higher than readLockCheckInterval + readLockMinAge" + + ", was readLockTimeout=" + readLockTimeout + ", readLockCheckInterval+readLockMinAge=" + (readLockCheckInterval + readLockMinAge) + + ". A good practice is to let the readLockTimeout be at least readLockMinAge + 2 times the readLockCheckInterval" + + " to ensure that the read lock procedure has enough time to acquire the lock."); + } if (readLockTimeout > 0 && readLockTimeout <= readLockCheckInterval) { throw new IllegalArgumentException("The option readLockTimeout must be higher than readLockCheckInterval" + ", was readLockTimeout=" + readLockTimeout + ", readLockCheckInterval=" + readLockCheckInterval - + ". A good practice is to let the readLockTimeout be at least 3 or more times higher than the readLockCheckInterval" - + ", to ensure the read lock procedure has amble times to run several times checks during acquiring the lock."); + + ". A good practice is to let the readLockTimeout be at least 3 times higher than the readLockCheckInterval" + + " to ensure that the read lock procedure has enough time to acquire the lock."); } } http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java index f471512..bfaf3da 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.component.file.strategy; import java.io.File; +import java.util.Date; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; @@ -38,6 +39,7 @@ public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveRea private long timeout; private long checkInterval = 1000; private long minLength = 1; + private long minAge = 0; private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; @Override @@ -55,6 +57,7 @@ public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveRea long lastModified = Long.MIN_VALUE; long length = Long.MIN_VALUE; StopWatch watch = new StopWatch(); + long startTime = (new Date()).getTime(); while (!exclusive) { // timeout check @@ -70,11 +73,13 @@ public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveRea long newLastModified = target.lastModified(); long newLength = target.length(); + long newOlderThan = startTime + watch.taken() - minAge; LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified); LOG.trace("Previous length: {}, new length: {}", length, newLength); + LOG.trace("New older than threshold: {}", newOlderThan); - if (length >= minLength && (newLastModified == lastModified && newLength == length)) { + if (newLength >= minLength && ((minAge == 0 && newLastModified == lastModified && newLength == length) || (minAge != 0 && newLastModified < newOlderThan))) { LOG.trace("Read lock acquired."); exclusive = true; } else { @@ -134,4 +139,12 @@ public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveRea public void setMinLength(long minLength) { this.minLength = minLength; } + + public long getMinAge() { + return minAge; + } + + public void setMinAge(long minAge) { + this.minAge = minAge; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index 7fd3636..ed5bd4e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -122,6 +122,10 @@ public final class FileProcessStrategyFactory { if (minLength != null) { readLockStrategy.setMinLength(minLength); } + Long minAge = (Long) params.get("readLockMinAge"); + if (null != minAge) { + readLockStrategy.setMinAge(minAge); + } strategy = readLockStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java new file mode 100644 index 0000000..1f4b388 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeShortCircuitTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import java.io.FileOutputStream; +import java.util.Date; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @version + */ +public class FileChangedReadLockMinAgeShortCircuitTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(FileChangedReadLockMinAgeShortCircuitTest.class); + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/changed/"); + createDirectory("target/changed/in"); + writeFile(); + Thread.sleep(1000); + super.setUp(); + } + + public void testChangedReadLockMinAge() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedFileExists("target/changed/out/file.dat"); + // We should get the file on the first poll + mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isLessThan(new Date().getTime()+15000)); + + assertMockEndpointsSatisfied(); + } + + private void writeFile() throws Exception { + LOG.debug("Writing file..."); + + FileOutputStream fos = new FileOutputStream("target/changed/in/file.dat"); + fos.write("Line".getBytes()); + fos.flush(); + fos.close(); + LOG.debug("Writing file DONE..."); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/changed/in?readLock=changed&readLockMinAge=500&readLockCheckInterval=30000&readLockTimeout=90000").to("file:target/changed/out", "mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java new file mode 100644 index 0000000..2dffcc9 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockMinAgeTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.Date; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @version + */ +public class FileChangedReadLockMinAgeTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(FileChangedReadLockMinAgeTest.class); + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/changed/"); + createDirectory("target/changed/in"); + super.setUp(); + } + + public void testChangedReadLockMinAge() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedFileExists("target/changed/out/slowfile.dat"); + // writing takes ~2 seconds, and then it has to age for at least minAge milliseconds (3 seconds) + mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isGreaterThan(new Date().getTime()+5000)); + + writeSlowFile(); + + assertMockEndpointsSatisfied(); + + String content = context.getTypeConverter().convertTo(String.class, new File("target/changed/out/slowfile.dat")); + String[] lines = content.split(LS); + assertEquals("There should be 20 lines in the file", 20, lines.length); + for (int i = 0; i < 20; i++) { + assertEquals("Line " + i, lines[i]); + } + } + + private void writeSlowFile() throws Exception { + LOG.debug("Writing slow file..."); + + FileOutputStream fos = new FileOutputStream("target/changed/in/slowfile.dat"); + for (int i = 0; i < 20; i++) { + fos.write(("Line " + i + LS).getBytes()); + LOG.debug("Writing line " + i); + Thread.sleep(100); + } + + fos.flush(); + fos.close(); + LOG.debug("Writing slow file DONE..."); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/changed/in?readLock=changed&readLockMinAge=3000").to("file:target/changed/out", "mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java index fdbf457..fb1b850 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.file.remote.strategy; +import java.util.Date; import java.util.List; import org.apache.camel.Exchange; @@ -36,6 +37,7 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive private long checkInterval = 5000; private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; private long minLength = 1; + private long minAge = 0; private boolean fastExistsCheck; @Override @@ -51,6 +53,7 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive long lastModified = Long.MIN_VALUE; long length = Long.MIN_VALUE; StopWatch watch = new StopWatch(); + long startTime = (new Date()).getTime(); while (!exclusive) { // timeout check @@ -66,6 +69,7 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive long newLastModified = 0; long newLength = 0; + List<FTPFile> files; if (fastExistsCheck) { // use the absolute file path to only pickup the file we want to check, this avoids expensive @@ -89,8 +93,10 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified); LOG.trace("Previous length: " + length + ", new length: " + newLength); + long newOlderThan = startTime + watch.taken() - minAge; + LOG.trace("New older than threshold: {}", newOlderThan); - if (length >= minLength && (newLastModified == lastModified && newLength == length)) { + if (newLength >= minLength && ((minAge == 0 && newLastModified == lastModified && newLength == length) || (minAge != 0 && newLastModified < newOlderThan))) { LOG.trace("Read lock acquired."); exclusive = true; } else { @@ -161,6 +167,14 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive this.minLength = minLength; } + public long getMinAge() { + return minAge; + } + + public void setMinAge(long minAge) { + this.minAge = minAge; + } + public boolean isFastExistsCheck() { return fastExistsCheck; } http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java index ac0977e..55ba556 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java @@ -130,6 +130,10 @@ public final class FtpProcessStrategyFactory { if (minLength != null) { readLockStrategy.setMinLength(minLength); } + Long minAge = (Long) params.get("readLockMinAge"); + if (null != minAge) { + readLockStrategy.setMinAge(minAge); + } Boolean fastExistsCheck = (Boolean) params.get("fastExistsCheck"); if (fastExistsCheck != null) { readLockStrategy.setFastExistsCheck(fastExistsCheck); http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java index 715b3e6..8100811 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.file.remote.strategy; +import java.util.Date; import java.util.List; import com.jcraft.jsch.ChannelSftp; @@ -36,6 +37,7 @@ public class SftpChangedExclusiveReadLockStrategy implements GenericFileExclusiv private long checkInterval = 5000; private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; private long minLength = 1; + private long minAge = 0; private boolean fastExistsCheck; @Override @@ -51,6 +53,7 @@ public class SftpChangedExclusiveReadLockStrategy implements GenericFileExclusiv long lastModified = Long.MIN_VALUE; long length = Long.MIN_VALUE; StopWatch watch = new StopWatch(); + long startTime = (new Date()).getTime(); while (!exclusive) { // timeout check @@ -87,8 +90,10 @@ public class SftpChangedExclusiveReadLockStrategy implements GenericFileExclusiv LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified); LOG.trace("Previous length: " + length + ", new length: " + newLength); + long newOlderThan = startTime + watch.taken() - minAge; + LOG.trace("New older than threshold: {}", newOlderThan); - if (length >= minLength && (newLastModified == lastModified && newLength == length)) { + if (newLength >= minLength && ((minAge == 0 && newLastModified == lastModified && newLength == length) || (minAge != 0 && newLastModified < newOlderThan))) { LOG.trace("Read lock acquired."); exclusive = true; } else { @@ -154,6 +159,14 @@ public class SftpChangedExclusiveReadLockStrategy implements GenericFileExclusiv this.minLength = minLength; } + public long getMinAge() { + return minAge; + } + + public void setMinAge(long minAge) { + this.minAge = minAge; + } + public boolean isFastExistsCheck() { return fastExistsCheck; } http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java index d199ac2..aabccb4 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java @@ -126,6 +126,10 @@ public final class SftpProcessStrategyFactory { if (minLength != null) { readLockStrategy.setMinLength(minLength); } + Long minAge = (Long) params.get("readLockMinAge"); + if (null != minAge) { + readLockStrategy.setMinAge(minAge); + } Boolean fastExistsCheck = (Boolean) params.get("fastExistsCheck"); if (fastExistsCheck != null) { readLockStrategy.setFastExistsCheck(fastExistsCheck); http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java new file mode 100644 index 0000000..4ce6492 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeShortCircuitTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.Date; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class FtpChangedReadLockMinAgeShortCircuitTest extends FtpServerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(FtpChangedReadLockMinAgeShortCircuitTest.class); + + protected String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/changed?password=admin&readLock=changed&readLockMinAge=500&readLockCheckInterval=30000&readLockTimeout=90000&delete=true"; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + writeFile(); + Thread.sleep(1000); + } + + @Test + public void testChangedReadLock() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedFileExists("target/changed/out/slowfile.dat"); + // We should get the file on the first poll + mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isLessThan(new Date().getTime() + 15000)); + + assertMockEndpointsSatisfied(); + } + + private void writeFile() throws Exception { + LOG.debug("Writing file..."); + + createDirectory(FTP_ROOT_DIR + "/changed"); + FileOutputStream fos = new FileOutputStream(FTP_ROOT_DIR + "/changed/slowfile.dat", true); + fos.write("Line".getBytes()); + fos.flush(); + fos.close(); + LOG.debug("Writing file DONE..."); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(getFtpUrl()).to("file:target/changed/out", "mock:result"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/fa04c078/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java new file mode 100644 index 0000000..28b1f95 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockMinAgeTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.Date; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class FtpChangedReadLockMinAgeTest extends FtpServerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(FtpChangedReadLockMinAgeTest.class); + + protected String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/changed?password=admin&readLock=changed&readLockCheckInterval=1000&readLockMinAge=3000&delete=true"; + } + + @Test + public void testChangedReadLock() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedFileExists("target/changed/out/slowfile.dat"); + // writing takes ~2 seconds, and then it has to age for at least minAge milliseconds (3 seconds) + mock.expectedMessagesMatches(property(Exchange.RECEIVED_TIMESTAMP).isGreaterThan(new Date().getTime() + 5000)); + + writeSlowFile(); + + assertMockEndpointsSatisfied(); + + String content = context.getTypeConverter().convertTo(String.class, new File("target/changed/out/slowfile.dat")); + String[] lines = content.split(LS); + assertEquals("There should be 20 lines in the file", 20, lines.length); + for (int i = 0; i < 20; i++) { + assertEquals("Line " + i, lines[i]); + } + } + + private void writeSlowFile() throws Exception { + LOG.debug("Writing slow file..."); + + createDirectory(FTP_ROOT_DIR + "/changed"); + FileOutputStream fos = new FileOutputStream(FTP_ROOT_DIR + "/changed/slowfile.dat", true); + for (int i = 0; i < 20; i++) { + fos.write(("Line " + i + LS).getBytes()); + LOG.debug("Writing line " + i); + Thread.sleep(100); + } + + fos.flush(); + fos.close(); + LOG.debug("Writing slow file DONE..."); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(getFtpUrl()).to("file:target/changed/out", "mock:result"); + } + }; + } + +}