Author: davsclaus
Date: Thu Jul 23 15:34:48 2009
New Revision: 797105

URL: http://svn.apache.org/viewvc?rev=797105&view=rev
Log:
CAMEL-1844: Added new readLock=changed option to the file consumer to detect files that are still in progress (currently being written), using file last-modified/length change detection.

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java (contents, props changed) - copied, changed from r797018, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java (from r797018, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java&r1=797018&r2=797105&rev=797105&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java Thu Jul 23 15:34:48 2009 @@ -18,72 +18,64 @@ import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.Channel; -import 
java.nio.channels.FileChannel; -import java.nio.channels.FileLock; import org.apache.camel.Exchange; import org.apache.camel.component.file.GenericFile; -import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; import org.apache.camel.component.file.GenericFileOperations; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** - * Acquires exclusive read lock to the given file. Will wait until the lock is granted. - * After granting the read lock it is released, we just want to make sure that when we start - * consuming the file its not currently in progress of being written by third party. + * Acquires exclusive read lock to the given file by checking whether the file is being + * changed by scanning the files at different intervals. */ -public class FileLockExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> { - private static final transient Log LOG = LogFactory.getLog(FileLockExclusiveReadLockStrategy.class); +public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy { + private static final transient Log LOG = LogFactory.getLog(FileChangedExclusiveReadLockStrategy.class); private long timeout; public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { File target = new File(file.getAbsoluteFilePath()); + boolean exclusive = false; if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for exclusive read lock to file: " + target); + LOG.trace("Waiting for exclusive read lock to file: " + file); } try { - // try to acquire rw lock on the file before we can consume it - FileChannel channel = new RandomAccessFile(target, "rw").getChannel(); + long lastModified = Long.MIN_VALUE; + long length = Long.MIN_VALUE; long start = System.currentTimeMillis(); - boolean exclusive = false; while 
(!exclusive) { // timeout check if (timeout > 0) { long delta = System.currentTimeMillis() - start; if (delta > timeout) { - LOG.debug("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); + LOG.debug("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); // we could not get the lock within the timeout period, so return false return false; } } - // get the lock using either try lock or not depending on if we are using timeout or not - FileLock lock = null; - try { - lock = timeout > 0 ? channel.tryLock() : channel.lock(); - } catch (IllegalStateException ex) { - // Also catch the OverlappingFileLockException here. Do nothing here - } - if (lock != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Acquired exclusive read lock: " + lock + " to file: " + target); - } + long newLastModified = target.lastModified(); + long newLength = target.length(); - // store lock so we can release it later - exchange.setProperty("CamelFileLock", lock); - exchange.setProperty("CamelFileLockName", target.getName()); + if (LOG.isTraceEnabled()) { + LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified); + LOG.trace("Previous length: " + length + ", new length: " + newLength); + } - exclusive = true; + if (newLastModified == lastModified && newLength == length) { + // let super handle the last part of acquiring the lock now the file is not + // currently being in progress of being copied as file length and modified + // are stable + exclusive = super.acquireExclusiveReadLock(operations, file, exchange); } else { + // set new base file change information + lastModified = newLastModified; + length = newLength; + boolean interrupted = sleep(); if (interrupted) { // we were interrupted while sleeping, we are likely being shutdown so return false @@ -108,24 +100,13 @@ } } - return true; - } - - public void releaseExclusiveReadLock(GenericFileOperations<File> operations, - 
GenericFile<File> file, Exchange exchange) throws Exception { - FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class); - String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class); - Channel channel = lock.channel(); - try { - lock.release(); - } finally { - // must close channel first - ObjectHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG); - } + return exclusive; } private boolean sleep() { - LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis."); + if (LOG.isTraceEnabled()) { + LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis."); + } try { Thread.sleep(1000); return false; @@ -151,4 +132,4 @@ this.timeout = timeout; } -} +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=797105&r1=797104&r2=797105&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Thu Jul 23 15:34:48 2009 @@ -99,6 +99,13 @@ 
readLockStrategy.setTimeout(timeout); } return readLockStrategy; + } else if ("changed".equals(readLock)) { + GenericFileExclusiveReadLockStrategy readLockStrategy = new FileChangedExclusiveReadLockStrategy(); + Long timeout = (Long) params.get("readLockTimeout"); + if (timeout != null) { + readLockStrategy.setTimeout(timeout); + } + return readLockStrategy; } else if ("markerFile".equals(readLock)) { return new MarkerFileExclusiveReadLockStrategy(); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java?rev=797105&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java Thu Jul 23 15:34:48 2009 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.strategy; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * A manual test by copying a big file into the target/big/in folder and check + * that it is copied completly, eg its file size is the same as the original file + * + * @version $Revision$ + */ +public class FileBigFileCopyManually extends ContextTestSupport { + + public void testCopyBigFile() throws Exception { + deleteDirectory("target/big/"); + createDirectory("target/big/in"); + + MockEndpoint mock = getMockEndpoint("mock:out"); + mock.expectedMessageCount(1); + mock.expectedFileExists("target/big/out/bigfile.dat"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file://target/big/in?noop=true&readLock=changed").to("file://target/big/out", "mock:out"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java?rev=797105&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java (added) +++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java Thu Jul 23 15:34:48 2009 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import java.io.File; +import java.io.FileOutputStream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision$ + */ +public class FileChangedReadLockTest extends ContextTestSupport { + + private static final transient Log LOG = LogFactory.getLog(FileChangedReadLockTest.class); + + public void testChangedReadLock() throws Exception { + deleteDirectory("target/changed/"); + createDirectory("target/changed/in"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedFileExists("target/changed/out/slowfile.dat"); + + writeSlowFile(); + + assertMockEndpointsSatisfied(); + + String content = context.getTypeConverter().convertTo(String.class, new File("target/changed/out/slowfile.dat").getAbsoluteFile()); + String[] lines 
= content.split("\n"); + for (int i = 0; i < 20; i++) { + assertEquals("Line " + i, lines[i]); + } + } + + private void writeSlowFile() throws Exception { + LOG.debug("Writing slow file..."); + + FileOutputStream fos = new FileOutputStream("target/changed/in/slowfile.dat"); + for (int i = 0; i < 20; i++) { + fos.write(("Line " + i + "\n").getBytes()); + LOG.debug("Writing line " + i); + Thread.sleep(200); + } + + fos.flush(); + fos.close(); + LOG.debug("Writing slow file DONE..."); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/changed/in?readLock=changed").to("file:target/changed/out", "mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java?rev=797105&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java Thu Jul 23 15:34:48 2009 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class FileChangedReadLockTimeoutTest extends FileChangedReadLockTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/changed/in?readLock=changed&readLockTimeout=2000").to("file:target/changed/out", "mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date