Repository: camel Updated Branches: refs/heads/master 86f4cec71 -> 5643f5557
CAMEL-10448: File read lock - idempotent and change/rename should be possible Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5643f555 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5643f555 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5643f555 Branch: refs/heads/master Commit: 5643f5557a9af9e073ee1ab982d488ecc88fa076 Parents: 86f4cec Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Nov 7 10:28:14 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Nov 7 10:28:14 2016 +0100 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 8 +- ...potentChangedRepositoryReadLockStrategy.java | 237 +++++++++++++++++++ ...mpotentRenameRepositoryReadLockStrategy.java | 229 ++++++++++++++++++ .../strategy/FileProcessStrategyFactory.java | 38 +++ .../FileIdempotentChangedReadLockTest.java | 46 ++++ .../FileIdempotentRenameReadLockTest.java | 46 ++++ 6 files changed, 602 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index e251154..d9449cc 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -154,7 +154,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple protected Comparator<Exchange> sortBy; @UriParam(label = "consumer,sort") protected boolean shuffle; - @UriParam(label = "consumer,lock", enums = "none,markerFile,fileLock,rename,changed,idempotent") + @UriParam(label = "consumer,lock", enums = "none,markerFile,fileLock,rename,changed,idempotent,idempotent-changed,idempotent-rename") protected String readLock = "none"; @UriParam(label = "consumer,lock", defaultValue = "1000") protected long readLockCheckInterval = 1000; @@ -796,6 +796,10 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple * <li>rename - rename is for using a try to rename the file as a test if we can get exclusive read-lock.</li> * <li>idempotent - (only for file component) idempotent is for using a idempotentRepository as the read-lock. * This allows to use read locks that supports clustering if the idempotent repository implementation supports that.</li> + * <li>idempotent-changed - (only for file component) idempotent-changed is for using a idempotentRepository and changed as the combined read-lock. + * This allows to use read locks that supports clustering if the idempotent repository implementation supports that.</li> + * <li>idempotent-rename - (only for file component) idempotent-rename is for using a idempotentRepository and rename as the combined read-lock. + * This allows to use read locks that supports clustering if the idempotent repository implementation supports that.</li> * </ul> * Notice: The various read locks is not all suited to work in clustered mode, where concurrent consumers on different nodes is competing * for the same files on a shared file system. The markerFile using a close to atomic operation to create the empty marker file, @@ -1247,7 +1251,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple if (readLock != null) { params.put("readLock", readLock); } - if ("idempotent".equals(readLock)) { + if ("idempotent".equals(readLock) || "idempotent-changed".equals(readLock) || "idempotent-rename".equals(readLock)) { params.put("readLockIdempotentRepository", idempotentRepository); } if (readLockCheckInterval > 0) { http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java new file mode 100644 index 0000000..ab8e31f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import java.io.File; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileEndpoint; +import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; +import org.apache.camel.component.file.GenericFileOperations; +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.CamelLogger; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file read lock that uses an {@link IdempotentRepository} and {@link FileChangedExclusiveReadLockStrategy changed} as the lock strategy. + * This allows to plugin and use existing idempotent repositories that for example supports clustering. + * The other read lock strategies that are using marker files or file locks, are not guaranteed to work in clustered setup with various platform and file systems. + */ +public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware { + + private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentChangedRepositoryReadLockStrategy.class); + + private final FileChangedExclusiveReadLockStrategy changed; + private GenericFileEndpoint<File> endpoint; + private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; + private CamelContext camelContext; + private IdempotentRepository<String> idempotentRepository; + private boolean removeOnRollback = true; + private boolean removeOnCommit; + + public FileIdempotentChangedRepositoryReadLockStrategy() { + this.changed = new FileChangedExclusiveReadLockStrategy(); + // no need to use marker file as idempotent ensures exclusive read-lock + this.changed.setMarkerFiler(false); + this.changed.setDeleteOrphanLockFiles(false); + } + + @Override + public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { + this.endpoint = endpoint; + LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint); + + changed.prepareOnStartup(operations, endpoint); + } + + @Override + public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + // in clustered mode then another node may have processed the file so we must check here again if the file exists + File path = file.getFile(); + if (!path.exists()) { + return false; + } + + // check if we can begin on this file + String key = asKey(file); + boolean answer = idempotentRepository.add(key); + if (!answer) { + // another node is processing the file so skip + CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file); + } + + if (answer) { + // if we acquired during idempotent then check changed also + answer = changed.acquireExclusiveReadLock(operations, file, exchange); + if (!answer) { + // remove from idempontent as we did not acquire it from changed + idempotentRepository.remove(key); + } + } + return answer; + } + + @Override + public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + changed.releaseExclusiveReadLockOnAbort(operations, file, exchange); + } + + @Override + public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); + if (removeOnRollback) { + idempotentRepository.remove(key); + } else { + // okay we should not remove then confirm it instead + idempotentRepository.confirm(key); + } + + changed.releaseExclusiveReadLockOnRollback(operations, file, exchange); + } + + @Override + public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); + if (removeOnCommit) { + idempotentRepository.remove(key); + } else { + // confirm on commit + idempotentRepository.confirm(key); + } + + changed.releaseExclusiveReadLockOnCommit(operations, file, exchange); + } + + public void setTimeout(long timeout) { + changed.setTimeout(timeout); + } + + public void setCheckInterval(long checkInterval) { + changed.setCheckInterval(checkInterval); + } + + public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { + this.readLockLoggingLevel = readLockLoggingLevel; + changed.setReadLockLoggingLevel(readLockLoggingLevel); + } + + public void setMarkerFiler(boolean markerFile) { + // we do not use marker files + } + + public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) { + // we do not use marker files + } + + public void setMinLength(long minLength) { + changed.setMinLength(minLength); + } + + public void setMinAge(long minAge) { + changed.setMinAge(minAge); + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + /** + * The idempotent repository to use as the store for the read locks. + */ + public IdempotentRepository<String> getIdempotentRepository() { + return idempotentRepository; + } + + /** + * The idempotent repository to use as the store for the read locks. + */ + public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) { + this.idempotentRepository = idempotentRepository; + } + + /** + * Whether to remove the file from the idempotent repository when doing a rollback. + * <p/> + * By default this is true. + */ + public boolean isRemoveOnRollback() { + return removeOnRollback; + } + + /** + * Whether to remove the file from the idempotent repository when doing a rollback. + * <p/> + * By default this is true. + */ + public void setRemoveOnRollback(boolean removeOnRollback) { + this.removeOnRollback = removeOnRollback; + } + + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is false. + */ + public boolean isRemoveOnCommit() { + return removeOnCommit; + } + + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is false. + */ + public void setRemoveOnCommit(boolean removeOnCommit) { + this.removeOnCommit = removeOnCommit; + } + + protected String asKey(GenericFile<File> file) { + // use absolute file path as default key, but evaluate if an expression key was configured + String key = file.getAbsoluteFilePath(); + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = endpoint.createExchange(file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + return key; + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "camelContext", this); + ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); + + // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself + camelContext.addService(idempotentRepository, true); + } + + @Override + protected void doStop() throws Exception { + // noop + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java new file mode 100644 index 0000000..a99ee23 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import java.io.File; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileEndpoint; +import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; +import org.apache.camel.component.file.GenericFileOperations; +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.CamelLogger; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file read lock that uses an {@link IdempotentRepository} and {@link FileRenameExclusiveReadLockStrategy rename} as the lock strategy. + * This allows to plugin and use existing idempotent repositories that for example supports clustering. + * The other read lock strategies that are using marker files or file locks, are not guaranteed to work in clustered setup with various platform and file systems. + */ +public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware { + + private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRenameRepositoryReadLockStrategy.class); + + private final FileRenameExclusiveReadLockStrategy rename; + private GenericFileEndpoint<File> endpoint; + private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; + private CamelContext camelContext; + private IdempotentRepository<String> idempotentRepository; + private boolean removeOnRollback = true; + private boolean removeOnCommit; + + public FileIdempotentRenameRepositoryReadLockStrategy() { + this.rename = new FileRenameExclusiveReadLockStrategy(); + // no need to use marker file as idempotent ensures exclusive read-lock + this.rename.setMarkerFiler(false); + this.rename.setDeleteOrphanLockFiles(false); + } + + @Override + public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { + this.endpoint = endpoint; + LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint); + + rename.prepareOnStartup(operations, endpoint); + } + + @Override + public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + // in clustered mode then another node may have processed the file so we must check here again if the file exists + File path = file.getFile(); + if (!path.exists()) { + return false; + } + + // check if we can begin on this file + String key = asKey(file); + boolean answer = idempotentRepository.add(key); + if (!answer) { + // another node is processing the file so skip + CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file); + } + + if (answer) { + // if we acquired during idempotent then check rename also + answer = rename.acquireExclusiveReadLock(operations, file, exchange); + if (!answer) { + // remove from idempontent as we did not acquire it from changed + idempotentRepository.remove(key); + } + } + return answer; + } + + @Override + public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + rename.releaseExclusiveReadLockOnAbort(operations, file, exchange); + } + + @Override + public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); + if (removeOnRollback) { + idempotentRepository.remove(key); + } else { + // okay we should not remove then confirm it instead + idempotentRepository.confirm(key); + } + + rename.releaseExclusiveReadLockOnRollback(operations, file, exchange); + } + + @Override + public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); + if (removeOnCommit) { + idempotentRepository.remove(key); + } else { + // confirm on commit + idempotentRepository.confirm(key); + } + + rename.releaseExclusiveReadLockOnCommit(operations, file, exchange); + } + + public void setTimeout(long timeout) { + rename.setTimeout(timeout); + } + + public void setCheckInterval(long checkInterval) { + rename.setCheckInterval(checkInterval); + } + + public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { + this.readLockLoggingLevel = readLockLoggingLevel; + rename.setReadLockLoggingLevel(readLockLoggingLevel); + } + + public void setMarkerFiler(boolean markerFile) { + // we do not use marker files + } + + public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) { + // we do not use marker files + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + /** + * The idempotent repository to use as the store for the read locks. + */ + public IdempotentRepository<String> getIdempotentRepository() { + return idempotentRepository; + } + + /** + * The idempotent repository to use as the store for the read locks. + */ + public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) { + this.idempotentRepository = idempotentRepository; + } + + /** + * Whether to remove the file from the idempotent repository when doing a rollback. + * <p/> + * By default this is true. + */ + public boolean isRemoveOnRollback() { + return removeOnRollback; + } + + /** + * Whether to remove the file from the idempotent repository when doing a rollback. + * <p/> + * By default this is true. + */ + public void setRemoveOnRollback(boolean removeOnRollback) { + this.removeOnRollback = removeOnRollback; + } + + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is false. + */ + public boolean isRemoveOnCommit() { + return removeOnCommit; + } + + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is false. + */ + public void setRemoveOnCommit(boolean removeOnCommit) { + this.removeOnCommit = removeOnCommit; + } + + protected String asKey(GenericFile<File> file) { + // use absolute file path as default key, but evaluate if an expression key was configured + String key = file.getAbsoluteFilePath(); + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = endpoint.createExchange(file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + return key; + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "camelContext", this); + ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); + + // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself + camelContext.addService(idempotentRepository, true); + } + + @Override + protected void doStop() throws Exception { + // noop + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index f5348b0..3d74ea2 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -143,6 +143,44 @@ public final class FileProcessStrategyFactory { readLockStrategy.setIdempotentRepository(repo); } strategy = readLockStrategy; + } else if ("idempotent-changed".equals(readLock)) { + FileIdempotentChangedRepositoryReadLockStrategy readLockStrategy = new FileIdempotentChangedRepositoryReadLockStrategy(); + Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); + if (readLockRemoveOnRollback != null) { + readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); + } + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } + IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); + if (repo != null) { + readLockStrategy.setIdempotentRepository(repo); + } + Long minLength = (Long) params.get("readLockMinLength"); + if (minLength != null) { + readLockStrategy.setMinLength(minLength); + } + Long minAge = (Long) params.get("readLockMinAge"); + if (null != minAge) { + readLockStrategy.setMinAge(minAge); + } + strategy = readLockStrategy; + } else if ("idempotent-rename".equals(readLock)) { + FileIdempotentRenameRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRenameRepositoryReadLockStrategy(); + Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); + if (readLockRemoveOnRollback != null) { + readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); + } + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } + IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); + if (repo != null) { + readLockStrategy.setIdempotentRepository(repo); + } + strategy = readLockStrategy; } if (strategy != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java new file mode 100644 index 0000000..2f60395 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class FileIdempotentChangedReadLockTest extends FileIdempotentReadLockTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/changed/in?readLock=idempotent-changed&idempotentRepository=#myRepo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // we are in progress + int size = myRepo.getCacheSize(); + assertTrue(size == 1 || size == 2); + } + }) + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java new file mode 100644 index 0000000..3aa89f0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.strategy; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class FileIdempotentRenameReadLockTest extends FileIdempotentReadLockTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/changed/in?readLock=idempotent-rename&idempotentRepository=#myRepo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // we are in progress + int size = myRepo.getCacheSize(); + assertTrue(size == 1 || size == 2); + } + }) + .to("mock:result"); + } + }; + } +}