Repository: camel Updated Branches: refs/heads/master e669002a9 -> e100e0fb6
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e100e0fb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e100e0fb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e100e0fb Branch: refs/heads/master Commit: e100e0fb63dd1d8fd9b2ddb398035f18796b0afc Parents: e669002 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 3 15:16:56 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 3 15:16:56 2015 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 18 ------ ...ileIdempotentRepositoryReadLockStrategy.java | 32 ++-------- .../strategy/FileProcessStrategyFactory.java | 4 -- ...empotentReadLockRemoveOnCommitFalseTest.java | 61 -------------------- .../strategy/FileIdempotentReadLockTest.java | 8 ++- 5 files changed, 11 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e100e0fb/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index d01bebb..9cd7b4f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -171,8 +171,6 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple @UriParam(label = "consumer", defaultValue = "0") protected long readLockMinAge; @UriParam(label = "consumer", defaultValue = "true") - protected boolean readLockRemoveOnCommit = true; - @UriParam(label = "consumer", defaultValue = "true") protected boolean readLockRemoveOnRollback = true; @UriParam(label = "consumer") protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy; @@ -931,21 +929,6 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple this.readLockMinAge = readLockMinAge; } - public boolean isReadLockRemoveOnCommit() { - return readLockRemoveOnCommit; - } - - /** - * This option applied only for readLock=idempotent. - * This option allows to specify whether to remove the file name entry from the idempotent repository - * when the file was processed successfully and is committed. Setting this to <tt>false</tt> allows - * to use the read lock as both read lock and idempotent consumer at the same time, as previously - * processed file will be kept in the idempotent repository so the same file is not processed again. - */ - public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) { - this.readLockRemoveOnCommit = readLockRemoveOnCommit; - } - public boolean isReadLockRemoveOnRollback() { return readLockRemoveOnRollback; } @@ -1272,7 +1255,6 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple params.put("readLockMinLength", readLockMinLength); params.put("readLockLoggingLevel", readLockLoggingLevel); params.put("readLockMinAge", readLockMinAge); - params.put("readLockRemoveOnCommit", readLockRemoveOnCommit); params.put("readLockRemoveOnRollback", readLockRemoveOnRollback); return params; } http://git-wip-us.apache.org/repos/asf/camel/blob/e100e0fb/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index d2b1b6f..763b7e0 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -47,7 +47,6 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp private CamelContext camelContext; private IdempotentRepository<String> idempotentRepository; private boolean removeOnRollback = true; - private boolean removeOnCommit = true; @Override public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { @@ -80,21 +79,20 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp @Override public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); if (removeOnRollback) { - String key = asKey(file); idempotentRepository.remove(key); + } else { + // okay we should not remove then confirm it instead + idempotentRepository.confirm(key); } } @Override public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); - if (removeOnCommit) { - idempotentRepository.remove(key); - } else { - // if not remove then confirm - idempotentRepository.confirm(key); - } + // confirm on commit + idempotentRepository.confirm(key); } public void setTimeout(long timeout) { @@ -153,24 +151,6 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp this.removeOnRollback = removeOnRollback; } - /** - * Whether to remove the file from the idempotent repository when doing a commit. - * <p/> - * By default this is true. - */ - public boolean isRemoveOnCommit() { - return removeOnCommit; - } - - /** - * Whether to remove the file from the idempotent repository when doing a commit. - * <p/> - * By default this is true. - */ - public void setRemoveOnCommit(boolean removeOnCommit) { - this.removeOnCommit = removeOnCommit; - } - protected String asKey(GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression key was configured String key = file.getAbsoluteFilePath(); http://git-wip-us.apache.org/repos/asf/camel/blob/e100e0fb/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index 6987905..5a31374 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -130,10 +130,6 @@ public final class FileProcessStrategyFactory { strategy = readLockStrategy; } else if ("idempotent".equals(readLock)) { FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy(); - Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); - if (readLockRemoveOnCommit != null) { - readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); - } Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); if (readLockRemoveOnRollback != null) { readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); http://git-wip-us.apache.org/repos/asf/camel/blob/e100e0fb/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockRemoveOnCommitFalseTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockRemoveOnCommitFalseTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockRemoveOnCommitFalseTest.java deleted file mode 100644 index 854f106..0000000 --- a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockRemoveOnCommitFalseTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.file.strategy; - -import java.util.concurrent.TimeUnit; - -import org.apache.camel.Exchange; -import org.apache.camel.builder.NotifyBuilder; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; - -/** - * @version - */ -public class FileIdempotentReadLockRemoveOnCommitFalseTest extends FileIdempotentReadLockTest { - - @Override - public void testIdempotentReadLock() throws Exception { - assertEquals(0, myRepo.getCacheSize()); - - NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create(); - - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(2); - - template.sendBodyAndHeader("file:target/changed/in", "Hello World", Exchange.FILE_NAME, "hello.txt"); - template.sendBodyAndHeader("file:target/changed/in", "Bye World", Exchange.FILE_NAME, "bye.txt"); - - assertMockEndpointsSatisfied(); - - assertTrue(notify.matches(5, TimeUnit.SECONDS)); - - // they are not removed with commit - assertEquals(2, myRepo.getCacheSize()); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("file:target/changed/in?readLock=idempotent&idempotentRepository=#myRepo&readLockRemoveOnCommit=false") - .to("mock:result"); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/e100e0fb/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockTest.java index 0ce9a6f..c73eb89 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockTest.java @@ -63,8 +63,9 @@ public class FileIdempotentReadLockTest extends ContextTestSupport { assertTrue(notify.matches(5, TimeUnit.SECONDS)); - // they are removed with commit - assertEquals(0, myRepo.getCacheSize()); + // the files are kept on commit + // if you want to remove them then the idempotent repo need some way to evict idle keys + assertEquals(2, myRepo.getCacheSize()); } @Override @@ -77,7 +78,8 @@ public class FileIdempotentReadLockTest extends ContextTestSupport { @Override public void process(Exchange exchange) throws Exception { // we are in progress - assertEquals(1, myRepo.getCacheSize()); + int size = myRepo.getCacheSize(); + assertTrue(size == 1 || size == 2); } }) .to("mock:result");