Repository: camel Updated Branches: refs/heads/camel-2.15.x 6cb8770ee -> 9f26e12a6 refs/heads/master 1af2133f6 -> d7b8afc74
CAMEL-8954: File component readlock state should be per file so it's possible to use pollEnrich to poll in a 2nd file and keep those states separated. Thanks to Andy Fedotov for the unit test. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7b1253db Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7b1253db Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7b1253db Branch: refs/heads/master Commit: 7b1253db516a39a1fe75440ad1226f8fbd60f2c3 Parents: 1af2133 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Jul 16 19:39:55 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jul 16 20:44:33 2015 +0200 ---------------------------------------------------------------------- .../camel/component/file/GenericFile.java | 10 +++ .../FileLockExclusiveReadLockStrategy.java | 21 ++++-- .../MarkerFileExclusiveReadLockStrategy.java | 20 ++++-- ...FileExclusiveReadLockStrategyUnlockTest.java | 73 ++++++++++++++++++++ 4 files changed, 114 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7b1253db/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java index 343d836..907de21 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; public class GenericFile<T> implements WrappedFile<T> { private static final Logger LOG = LoggerFactory.getLogger(GenericFile.class); + private String copyFromAbsoluteFilePath; private String endpointPath; private String fileName; private String 
fileNameOnly; @@ -66,6 +67,7 @@ public class GenericFile<T> implements WrappedFile<T> { } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } + result.setCopyFromAbsoluteFilePath(source.getAbsoluteFilePath()); result.setEndpointPath(source.getEndpointPath()); result.setAbsolute(source.isAbsolute()); result.setDirectory(source.isDirectory()); @@ -365,6 +367,14 @@ public class GenericFile<T> implements WrappedFile<T> { this.directory = directory; } + public String getCopyFromAbsoluteFilePath() { + return copyFromAbsoluteFilePath; + } + + public void setCopyFromAbsoluteFilePath(String copyFromAbsoluteFilePath) { + this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath; + } + /** * Fixes the path separator to be according to the protocol */ http://git-wip-us.apache.org/repos/asf/camel/blob/7b1253db/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java index 8fd94f5..de5101f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java @@ -127,10 +127,11 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo } } - // we grabbed the lock - exchange.setProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, lock); - exchange.setProperty(Exchange.FILE_LOCK_RANDOM_ACCESS_FILE, randomAccessFile); + // store read-lock state + exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), lock); + exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_RANDOM_ACCESS_FILE), randomAccessFile); + // we grabbed the lock 
return true; } @@ -140,10 +141,10 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo // must call super super.doReleaseExclusiveReadLock(operations, file, exchange); - String target = file.getFileName(); - FileLock lock = exchange.getProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, FileLock.class); - RandomAccessFile rac = exchange.getProperty(Exchange.FILE_LOCK_RANDOM_ACCESS_FILE, RandomAccessFile.class); + FileLock lock = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), FileLock.class); + RandomAccessFile rac = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_RANDOM_ACCESS_FILE), RandomAccessFile.class); + String target = file.getFileName(); if (lock != null) { Channel channel = lock.acquiredBy(); try { @@ -186,4 +187,12 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo this.readLockLoggingLevel = readLockLoggingLevel; } + private static String asReadLockKey(GenericFile file, String key) { + // use the copy from absolute path as that was the original path of the file when the lock was acquired + // for example if the file consumer uses preMove then the file is moved and therefore has another name + // that would no longer match + String path = file.getCopyFromAbsoluteFilePath() != null ? 
file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath(); + return path + "-" + key; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/7b1253db/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java index ceabd01..1c92bbd 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java @@ -73,8 +73,10 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive // create a plain file as marker filer for locking (do not use FileLock) boolean acquired = FileUtil.createNewFile(new File(lockFileName)); - exchange.setProperty(Exchange.FILE_LOCK_FILE_ACQUIRED, acquired); - exchange.setProperty(Exchange.FILE_LOCK_FILE_NAME, lockFileName); + + // store read-lock state + exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), acquired); + exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), lockFileName); return acquired; } @@ -101,9 +103,11 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive return; } + boolean acquired = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), false, Boolean.class); + // only release the file if camel get the lock before - if (exchange.getProperty(Exchange.FILE_LOCK_FILE_ACQUIRED, false, Boolean.class)) { - String lockFileName = exchange.getProperty(Exchange.FILE_LOCK_FILE_NAME, getLockFileName(file), String.class); + if (acquired) { + String lockFileName = exchange.getProperty(asReadLockKey(file, 
Exchange.FILE_LOCK_FILE_NAME), String.class); File lock = new File(lockFileName); if (lock.exists()) { @@ -162,4 +166,12 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive return file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX; } + private static String asReadLockKey(GenericFile file, String key) { + // use the copy from absolute path as that was the original path of the file when the lock was acquired + // for example if the file consumer uses preMove then the file is moved and therefore has another name + // that would no longer match + String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath(); + return path + "-" + key; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/7b1253db/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java new file mode 100644 index 0000000..b12b8b2 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.FileOutputStream; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; + +public class MarkerFileExclusiveReadLockStrategyUnlockTest extends ContextTestSupport { + + @Override + protected void setUp() throws Exception { + setupDirectory(); + super.setUp(); + } + + public void testUnlocking() throws Exception { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create(); + writeFiles(); + boolean done = notify.matches(5, TimeUnit.SECONDS); + + assertTrue("Route should be done processing 1 exchanges", done); + + assertFileNotExists("target/marker-unlock/input-a/file1.dat.camelLock"); + assertFileNotExists("target/marker-unlock/input-b/file2.dat.camelLock"); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/marker-unlock/input-a?fileName=file1.dat&readLock=markerFile") + .pollEnrich("file:target/marker-unlock/input-b?fileName=file2.dat&readLock=markerFile") + .to("mock:result"); + } + }; + } + + private void setupDirectory() { + deleteDirectory("target/marker-unlock/"); + createDirectory("target/marker-unlock/input-a"); + createDirectory("target/marker-unlock/input-b"); + } + + private void writeFiles() throws Exception { + FileOutputStream fos1 = new FileOutputStream("target/marker-unlock/input-a/file1.dat"); + 
FileOutputStream fos2 = new FileOutputStream("target/marker-unlock/input-b/file2.dat"); + fos1.write("File-1".getBytes()); + fos2.write("File-2".getBytes()); + fos1.flush(); + fos1.close(); + fos2.flush(); + fos2.close(); + } +}