Author: davsclaus
Date: Thu Jul 23 15:34:48 2009
New Revision: 797105

URL: http://svn.apache.org/viewvc?rev=797105&view=rev
Log:
CAMEL-1844: Added new readLock=changed option to the file consumer to detect
files that are still being written, by checking whether the file's
last-modified timestamp or length is still changing.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
   (contents, props changed)
      - copied, changed from r797018, 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java

Copied: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
 (from r797018, 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java&r1=797018&r2=797105&rev=797105&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
 Thu Jul 23 15:34:48 2009
@@ -18,72 +18,64 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.Channel;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.file.GenericFile;
-import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
 import org.apache.camel.component.file.GenericFileOperations;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Acquires exclusive read lock to the given file. Will wait until the lock is 
granted.
- * After granting the read lock it is released, we just want to make sure that 
when we start
- * consuming the file its not currently in progress of being written by third 
party.
+ * Acquires exclusive read lock to the given file by checking whether the file 
is being
+ * changed by scanning the files at different intervals.
  */
-public class FileLockExclusiveReadLockStrategy implements 
GenericFileExclusiveReadLockStrategy<File> {
-    private static final transient Log LOG = 
LogFactory.getLog(FileLockExclusiveReadLockStrategy.class);
+public class FileChangedExclusiveReadLockStrategy extends 
MarkerFileExclusiveReadLockStrategy {
+    private static final transient Log LOG = 
LogFactory.getLog(FileChangedExclusiveReadLockStrategy.class);
     private long timeout;
 
     public boolean acquireExclusiveReadLock(GenericFileOperations<File> 
operations, GenericFile<File> file, Exchange exchange) throws Exception {
         File target = new File(file.getAbsoluteFilePath());
+        boolean exclusive = false;
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for exclusive read lock to file: " + target);
+            LOG.trace("Waiting for exclusive read lock to file: " + file);
         }
 
         try {
-            // try to acquire rw lock on the file before we can consume it
-            FileChannel channel = new RandomAccessFile(target, 
"rw").getChannel();
+            long lastModified = Long.MIN_VALUE;
+            long length = Long.MIN_VALUE;
 
             long start = System.currentTimeMillis();
-            boolean exclusive = false;
 
             while (!exclusive) {
                 // timeout check
                 if (timeout > 0) {
                     long delta = System.currentTimeMillis() - start;
                     if (delta > timeout) {
-                        LOG.debug("Cannot acquire read lock within " + timeout 
+ " millis. Will skip the file: " + target);
+                        LOG.debug("Cannot acquire read lock within " + timeout 
+ " millis. Will skip the file: " + file);
                         // we could not get the lock within the timeout 
period, so return false
                         return false;
                     }
                 }
 
-                // get the lock using either try lock or not depending on if 
we are using timeout or not
-                FileLock lock = null;
-                try {
-                    lock = timeout > 0 ? channel.tryLock() : channel.lock();
-                } catch (IllegalStateException ex) {
-                    // Also catch the OverlappingFileLockException here. Do 
nothing here                    
-                }
-                if (lock != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Acquired exclusive read lock: " + lock + " 
to file: " + target);
-                    }
+                long newLastModified = target.lastModified();
+                long newLength = target.length();
 
-                    // store lock so we can release it later
-                    exchange.setProperty("CamelFileLock", lock);
-                    exchange.setProperty("CamelFileLockName", 
target.getName());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Previous last modified: " + lastModified + ", 
new last modified: " + newLastModified);
+                    LOG.trace("Previous length: " + length + ", new length: " 
+ newLength);
+                }
 
-                    exclusive = true;
+                if (newLastModified == lastModified && newLength == length) {
+                    // let super handle the last part of acquiring the lock 
now the file is not
+                    // currently being in progress of being copied as file 
length and modified
+                    // are stable
+                    exclusive = super.acquireExclusiveReadLock(operations, 
file, exchange);
                 } else {
+                    // set new base file change information
+                    lastModified = newLastModified;
+                    length = newLength;
+
                     boolean interrupted = sleep();
                     if (interrupted) {
                         // we were interrupted while sleeping, we are likely 
being shutdown so return false
@@ -108,24 +100,13 @@
             }
         }
 
-        return true;
-    }
-
-    public void releaseExclusiveReadLock(GenericFileOperations<File> 
operations,
-                                         GenericFile<File> file, Exchange 
exchange) throws Exception {
-        FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, 
"CamelFileLock", FileLock.class);
-        String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, 
"CamelFileLockName", String.class);
-        Channel channel = lock.channel();
-        try {
-            lock.release();
-        } finally {
-            // must close channel first
-            ObjectHelper.close(channel, "while acquiring exclusive read lock 
for file: " + lockFileName, LOG);
-        }
+        return exclusive;
     }
 
     private boolean sleep() {
-        LOG.trace("Exclusive read lock not granted. Sleeping for 1000 
millis.");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Exclusive read lock not granted. Sleeping for 1000 
millis.");
+        }
         try {
             Thread.sleep(1000);
             return false;
@@ -151,4 +132,4 @@
         this.timeout = timeout;
     }
 
-}
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=797105&r1=797104&r2=797105&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
 Thu Jul 23 15:34:48 2009
@@ -99,6 +99,13 @@
                     readLockStrategy.setTimeout(timeout);
                 }
                 return readLockStrategy;
+            } else if ("changed".equals(readLock)) {
+                GenericFileExclusiveReadLockStrategy readLockStrategy = new 
FileChangedExclusiveReadLockStrategy();
+                Long timeout = (Long) params.get("readLockTimeout");
+                if (timeout != null) {
+                    readLockStrategy.setTimeout(timeout);
+                }
+                return readLockStrategy;
             } else if ("markerFile".equals(readLock)) {
                 return new MarkerFileExclusiveReadLockStrategy();
             }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java?rev=797105&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java
 Thu Jul 23 15:34:48 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * A manual test: copy a big file into the target/big/in folder and check
+ * that it is copied completely, e.g. its file size is the same as the original file
+ *
+ * @version $Revision$
+ */
+public class FileBigFileCopyManually extends ContextTestSupport {
+
+    public void testCopyBigFile() throws Exception {
+        deleteDirectory("target/big/");
+        createDirectory("target/big/in");
+
+        MockEndpoint mock = getMockEndpoint("mock:out");
+        mock.expectedMessageCount(1);
+        mock.expectedFileExists("target/big/out/bigfile.dat");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("file://target/big/in?noop=true&readLock=changed").to("file://target/big/out",
 "mock:out");
+            }
+        };
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileBigFileCopyManually.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java?rev=797105&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
 Thu Jul 23 15:34:48 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class FileChangedReadLockTest extends ContextTestSupport {
+
+    private static final transient Log LOG = 
LogFactory.getLog(FileChangedReadLockTest.class);
+
+    public void testChangedReadLock() throws Exception {
+        deleteDirectory("target/changed/");
+        createDirectory("target/changed/in");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedFileExists("target/changed/out/slowfile.dat");
+
+        writeSlowFile();
+
+        assertMockEndpointsSatisfied();
+
+        String content = context.getTypeConverter().convertTo(String.class, 
new File("target/changed/out/slowfile.dat").getAbsoluteFile());
+        String[] lines = content.split("\n");
+        for (int i = 0; i < 20; i++) {
+            assertEquals("Line " + i, lines[i]);
+        }
+    }
+
+    private void writeSlowFile() throws Exception {
+        LOG.debug("Writing slow file...");
+
+        FileOutputStream fos = new 
FileOutputStream("target/changed/in/slowfile.dat");
+        for (int i = 0; i < 20; i++) {
+            fos.write(("Line " + i + "\n").getBytes());
+            LOG.debug("Writing line " + i);
+            Thread.sleep(200);
+        }
+
+        fos.flush();
+        fos.close();
+        LOG.debug("Writing slow file DONE...");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("file:target/changed/in?readLock=changed").to("file:target/changed/out", 
"mock:result");
+            }
+        };
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java?rev=797105&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java
 Thu Jul 23 15:34:48 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class FileChangedReadLockTimeoutTest extends FileChangedReadLockTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("file:target/changed/in?readLock=changed&readLockTimeout=2000").to("file:target/changed/out",
 "mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to