This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit c564b571ecf5ba34ee8ad9e17f64e31d2b8b226e Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Mar 29 10:41:25 2018 +0200 CAMEL-12382: FileConsumer - Allow to delay readLock release tasks on idempotent read-lock --- camel-core/src/main/docs/file-component.adoc | 6 +- .../camel/component/file/GenericFileEndpoint.java | 54 ++++++++++ ...dempotentChangedRepositoryReadLockStrategy.java | 115 ++++++++++++++++++--- .../FileIdempotentRepositoryReadLockStrategy.java | 103 ++++++++++++++++-- .../file/strategy/FileProcessStrategyFactory.java | 33 ++++++ 5 files changed, 288 insertions(+), 23 deletions(-) diff --git a/camel-core/src/main/docs/file-component.adoc b/camel-core/src/main/docs/file-component.adoc index 574ac13..4213496 100644 --- a/camel-core/src/main/docs/file-component.adoc +++ b/camel-core/src/main/docs/file-component.adoc @@ -72,7 +72,7 @@ with the following path and query parameters: |=== -==== Query Parameters (81 parameters): +==== Query Parameters (85 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -136,6 +136,10 @@ with the following path and query parameters: | *readLock* (lock) | Used by consumer, to only poll the files if it has exclusive read-lock on the file (i.e. the file is not in-progress or being written). Camel will wait until the file lock is granted. This option provides the build in strategies: none - No read lock is in use markerFile - Camel creates a marker file (fileName.camelLock) and then holds a lock on it. This option is not available for the FTP component changed - Changed is using file length/modification timestamp to det [...] | *readLockCheckInterval* (lock) | Interval in millis for the read-lock, if supported by the read lock. This interval is used for sleeping between attempts to acquire the read lock. For example when using the changed read lock, you can set a higher interval period to cater for slow writes. The default of 1 sec. may be too fast if the producer is very slow writing the file. Notice: For FTP the default readLockCheckInterval is 5000. The readLockTimeout value must be higher than readLockChe [...] | *readLockDeleteOrphanLock Files* (lock) | Whether or not read lock with marker files should upon startup delete any orphan read lock files, which may have been left on the file system, if Camel was not properly shutdown (such as a JVM crash). If turning this option to false then any orphaned lock file will cause Camel to not attempt to pickup that file, this could also be due another node is concurrently reading files from the same shared directory. | true | boolean +| *readLockIdempotentRelease Async* (lock) | Whether the delayed release task should be synchronous or asynchronous. | false | boolean +| *readLockIdempotentRelease AsyncPoolSize* (lock) | The number of threads in the scheduled thread pool when using asynchronous release tasks. | | int +| *readLockIdempotentRelease Delay* (lock) | Whether to delay the release task for a period of millis. | | int +| *readLockIdempotentRelease ExecutorService* (lock) | To use a custom and shared thread pool for asynchronous release tasks. | | ScheduledExecutor Service | *readLockLoggingLevel* (lock) | Logging level used when a read lock could not be acquired. By default a WARN is logged. You can change this level, for example to OFF to not have any logging. This option is only applicable for readLock of types: changed, fileLock, idempotent, idempotent-changed, idempotent-rename, rename. | DEBUG | LoggingLevel | *readLockMarkerFile* (lock) | Whether to use marker file with the changed, rename, or exclusive read lock types. By default a marker file is used as well to guard against other processes picking up the same files. This behavior can be turned off by setting this option to false. For example if you do not want to write marker files to the file systems by the Camel application. | true | boolean | *readLockMinAge* (lock) | This option applied only for readLock=change. This option allows to specify a minimum age the file must be before attempting to acquire the read lock. For example use readLockMinAge=300s to require the file is at last 5 minutes old. This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age. | 0 | long diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index fc5c0a8..9cad86a 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -23,6 +23,8 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -180,6 +182,14 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple @UriParam(label = "consumer,lock") protected boolean readLockRemoveOnCommit; @UriParam(label = "consumer,lock") + protected int readLockIdempotentReleaseDelay; + @UriParam(label = "consumer,lock") + protected boolean readLockIdempotentReleaseAsync; + @UriParam(label = "consumer,lock") + protected int readLockIdempotentReleaseAsyncPoolSize; + @UriParam(label = "consumer,lock") + protected ScheduledExecutorService readLockIdempotentReleaseExecutorService; + @UriParam(label = "consumer,lock") protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy; @UriParam(label = "consumer,advanced") protected ExceptionHandler onCompletionExceptionHandler; @@ -985,6 +995,46 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple this.readLockRemoveOnCommit = readLockRemoveOnCommit; } + /** + * Whether to delay the release task for a period of millis. + */ + public void setReadLockIdempotentReleaseDelay(int readLockIdempotentReleaseDelay) { + this.readLockIdempotentReleaseDelay = readLockIdempotentReleaseDelay; + } + + public boolean isReadLockIdempotentReleaseAsync() { + return readLockIdempotentReleaseAsync; + } + + /** + * Whether the delayed release task should be synchronous or asynchronous. + */ + public void setReadLockIdempotentReleaseAsync(boolean readLockIdempotentReleaseAsync) { + this.readLockIdempotentReleaseAsync = readLockIdempotentReleaseAsync; + } + + public int getReadLockIdempotentReleaseAsyncPoolSize() { + return readLockIdempotentReleaseAsyncPoolSize; + } + + /** + * The number of threads in the scheduled thread pool when using asynchronous release tasks. + */ + public void setReadLockIdempotentReleaseAsyncPoolSize(int readLockIdempotentReleaseAsyncPoolSize) { + this.readLockIdempotentReleaseAsyncPoolSize = readLockIdempotentReleaseAsyncPoolSize; + } + + public ScheduledExecutorService getReadLockIdempotentReleaseExecutorService() { + return readLockIdempotentReleaseExecutorService; + } + + /** + * To use a custom and shared thread pool for asynchronous release tasks. + */ + public void setReadLockIdempotentReleaseExecutorService(ScheduledExecutorService readLockIdempotentReleaseExecutorService) { + this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService; + } + public int getBufferSize() { return bufferSize; } @@ -1301,6 +1351,10 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple params.put("readLockMinAge", readLockMinAge); params.put("readLockRemoveOnRollback", readLockRemoveOnRollback); params.put("readLockRemoveOnCommit", readLockRemoveOnCommit); + params.put("readLockIdempotentReleaseDelay", readLockIdempotentReleaseDelay); + params.put("readLockIdempotentReleaseAsync", readLockIdempotentReleaseAsync); + params.put("readLockIdempotentReleaseAsyncPoolSize", readLockIdempotentReleaseAsyncPoolSize); + params.put("readLockIdempotentReleaseExecutorService", readLockIdempotentReleaseExecutorService); return params; } diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java index d0d2004..17f22af 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java @@ -17,6 +17,8 @@ package org.apache.camel.component.file.strategy; import java.io.File; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -49,6 +51,11 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp private IdempotentRepository<String> idempotentRepository; private boolean removeOnRollback = true; private boolean removeOnCommit; + private int readLockIdempotentReleaseDelay; + private boolean readLockIdempotentReleaseAsync; + private int readLockIdempotentReleaseAsyncPoolSize; + private ScheduledExecutorService readLockIdempotentReleaseExecutorService; + private boolean shutdownExecutorService; public FileIdempotentChangedRepositoryReadLockStrategy() { this.changed = new FileChangedExclusiveReadLockStrategy(); @@ -100,27 +107,61 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp @Override public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); - if (removeOnRollback) { - idempotentRepository.remove(key); + Runnable r = () -> { + if (removeOnRollback) { + idempotentRepository.remove(key); + } else { + // okay we should not remove then confirm it instead + idempotentRepository.confirm(key); + } + + try { + changed.releaseExclusiveReadLockOnRollback(operations, file, exchange); + } catch (Exception e) { + LOG.warn("Error during releasing exclusive readlock on rollback. This exception is ignored.", e); + } + }; + + if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { + LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); + readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); + } else if (readLockIdempotentReleaseDelay > 0) { + LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + Thread.sleep(readLockIdempotentReleaseDelay); + r.run(); } else { - // okay we should not remove then confirm it instead - idempotentRepository.confirm(key); + r.run(); } - - changed.releaseExclusiveReadLockOnRollback(operations, file, exchange); } @Override public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); - if (removeOnCommit) { - idempotentRepository.remove(key); + Runnable r = () -> { + if (removeOnCommit) { + idempotentRepository.remove(key); + } else { + // confirm on commit + idempotentRepository.confirm(key); + } + + try { + changed.releaseExclusiveReadLockOnCommit(operations, file, exchange); + } catch (Exception e) { + LOG.warn("Error during releasing exclusive readlock on rollback. This exception is ignored.", e); + } + }; + + if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { + LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); + readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); + } else if (readLockIdempotentReleaseDelay > 0) { + LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + Thread.sleep(readLockIdempotentReleaseDelay); + r.run(); } else { - // confirm on commit - idempotentRepository.confirm(key); + r.run(); } - - changed.releaseExclusiveReadLockOnCommit(operations, file, exchange); } public void setTimeout(long timeout) { @@ -210,6 +251,46 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp this.removeOnCommit = removeOnCommit; } + /** + * Whether to delay the release task for a period of millis. + */ + public void setReadLockIdempotentReleaseDelay(int readLockIdempotentReleaseDelay) { + this.readLockIdempotentReleaseDelay = readLockIdempotentReleaseDelay; + } + + public boolean isReadLockIdempotentReleaseAsync() { + return readLockIdempotentReleaseAsync; + } + + /** + * Whether the delayed release task should be synchronous or asynchronous. + */ + public void setReadLockIdempotentReleaseAsync(boolean readLockIdempotentReleaseAsync) { + this.readLockIdempotentReleaseAsync = readLockIdempotentReleaseAsync; + } + + public int getReadLockIdempotentReleaseAsyncPoolSize() { + return readLockIdempotentReleaseAsyncPoolSize; + } + + /** + * The number of threads in the scheduled thread pool when using asynchronous release tasks. + */ + public void setReadLockIdempotentReleaseAsyncPoolSize(int readLockIdempotentReleaseAsyncPoolSize) { + this.readLockIdempotentReleaseAsyncPoolSize = readLockIdempotentReleaseAsyncPoolSize; + } + + public ScheduledExecutorService getReadLockIdempotentReleaseExecutorService() { + return readLockIdempotentReleaseExecutorService; + } + + /** + * To use a custom and shared thread pool for asynchronous release tasks. + */ + public void setReadLockIdempotentReleaseExecutorService(ScheduledExecutorService readLockIdempotentReleaseExecutorService) { + this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService; + } + protected String asKey(GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression key was configured String key = file.getAbsoluteFilePath(); @@ -225,13 +306,21 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp ObjectHelper.notNull(camelContext, "camelContext", this); ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); + if (readLockIdempotentReleaseAsync && readLockIdempotentReleaseExecutorService == null) { + readLockIdempotentReleaseExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ReadLockChangedIdempotentReleaseTask", readLockIdempotentReleaseAsyncPoolSize); + shutdownExecutorService = true; + } + // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself camelContext.addService(idempotentRepository, true); } @Override protected void doStop() throws Exception { - // noop + if (shutdownExecutorService && readLockIdempotentReleaseExecutorService != null) { + camelContext.getExecutorServiceManager().shutdownGraceful(readLockIdempotentReleaseExecutorService, 30000); + readLockIdempotentReleaseExecutorService = null; + } } } diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index 28f1556..a471eb6 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -17,6 +17,8 @@ package org.apache.camel.component.file.strategy; import java.io.File; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -48,6 +50,11 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp private IdempotentRepository<String> idempotentRepository; private boolean removeOnRollback = true; private boolean removeOnCommit; + private int readLockIdempotentReleaseDelay; + private boolean readLockIdempotentReleaseAsync; + private int readLockIdempotentReleaseAsyncPoolSize; + private ScheduledExecutorService readLockIdempotentReleaseExecutorService; + private boolean shutdownExecutorService; @Override public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { @@ -81,22 +88,48 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp @Override public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); - if (removeOnRollback) { - idempotentRepository.remove(key); + Runnable r = () -> { + if (removeOnRollback) { + idempotentRepository.remove(key); + } else { + // okay we should not remove then confirm it instead + idempotentRepository.confirm(key); + } + }; + + if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { + LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); + readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); + } else if (readLockIdempotentReleaseDelay > 0) { + LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + Thread.sleep(readLockIdempotentReleaseDelay); + r.run(); } else { - // okay we should not remove then confirm it instead - idempotentRepository.confirm(key); + r.run(); } } @Override public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); - if (removeOnCommit) { - idempotentRepository.remove(key); + Runnable r = () -> { + if (removeOnCommit) { + idempotentRepository.remove(key); + } else { + // confirm on commit + idempotentRepository.confirm(key); + } + }; + + if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { + LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); + readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); + } else if (readLockIdempotentReleaseDelay > 0) { + LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + Thread.sleep(readLockIdempotentReleaseDelay); + r.run(); } else { - // confirm on commit - idempotentRepository.confirm(key); + r.run(); } } @@ -178,6 +211,50 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp this.removeOnCommit = removeOnCommit; } + public int getReadLockIdempotentReleaseDelay() { + return readLockIdempotentReleaseDelay; + } + + /** + * Whether to delay the release task for a period of millis. + */ + public void setReadLockIdempotentReleaseDelay(int readLockIdempotentReleaseDelay) { + this.readLockIdempotentReleaseDelay = readLockIdempotentReleaseDelay; + } + + public boolean isReadLockIdempotentReleaseAsync() { + return readLockIdempotentReleaseAsync; + } + + /** + * Whether the delayed release task should be synchronous or asynchronous. + */ + public void setReadLockIdempotentReleaseAsync(boolean readLockIdempotentReleaseAsync) { + this.readLockIdempotentReleaseAsync = readLockIdempotentReleaseAsync; + } + + public int getReadLockIdempotentReleaseAsyncPoolSize() { + return readLockIdempotentReleaseAsyncPoolSize; + } + + /** + * The number of threads in the scheduled thread pool when using asynchronous release tasks. + */ + public void setReadLockIdempotentReleaseAsyncPoolSize(int readLockIdempotentReleaseAsyncPoolSize) { + this.readLockIdempotentReleaseAsyncPoolSize = readLockIdempotentReleaseAsyncPoolSize; + } + + public ScheduledExecutorService getReadLockIdempotentReleaseExecutorService() { + return readLockIdempotentReleaseExecutorService; + } + + /** + * To use a custom and shared thread pool for asynchronous release tasks. + */ + public void setReadLockIdempotentReleaseExecutorService(ScheduledExecutorService readLockIdempotentReleaseExecutorService) { + this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService; + } + protected String asKey(GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression key was configured String key = file.getAbsoluteFilePath(); @@ -193,13 +270,21 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp ObjectHelper.notNull(camelContext, "camelContext", this); ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); + if (readLockIdempotentReleaseAsync && readLockIdempotentReleaseExecutorService == null) { + readLockIdempotentReleaseExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ReadLockIdempotentReleaseTask", readLockIdempotentReleaseAsyncPoolSize); + shutdownExecutorService = true; + } + // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself camelContext.addService(idempotentRepository, true); } @Override protected void doStop() throws Exception { - // noop + if (shutdownExecutorService && readLockIdempotentReleaseExecutorService != null) { + camelContext.getExecutorServiceManager().shutdownGraceful(readLockIdempotentReleaseExecutorService, 30000); + readLockIdempotentReleaseExecutorService = null; + } } } diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index 3d74ea2..710f4eb 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -18,6 +18,7 @@ package org.apache.camel.component.file.strategy; import java.io.File; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.CamelContext; import org.apache.camel.Expression; @@ -142,6 +143,22 @@ public final class FileProcessStrategyFactory { if (repo != null) { readLockStrategy.setIdempotentRepository(repo); } + Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay"); + if (readLockIdempotentReleaseDelay != null) { + readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay); + } + Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync"); + if (readLockIdempotentReleaseAsync != null) { + readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync); + } + Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize"); + if (readLockIdempotentReleaseAsyncPoolSize != null) { + readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize); + } + ScheduledExecutorService readLockIdempotentReleaseExecutorService = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService"); + if (readLockIdempotentReleaseExecutorService != null) { + readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService); + } strategy = readLockStrategy; } else if ("idempotent-changed".equals(readLock)) { FileIdempotentChangedRepositoryReadLockStrategy readLockStrategy = new FileIdempotentChangedRepositoryReadLockStrategy(); @@ -165,6 +182,22 @@ public final class FileProcessStrategyFactory { if (null != minAge) { readLockStrategy.setMinAge(minAge); } + Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay"); + if (readLockIdempotentReleaseDelay != null) { + readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay); + } + Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync"); + if (readLockIdempotentReleaseAsync != null) { + readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync); + } + Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize"); + if (readLockIdempotentReleaseAsyncPoolSize != null) { + readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize); + } + ScheduledExecutorService readLockIdempotentReleaseExecutorService = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService"); + if (readLockIdempotentReleaseExecutorService != null) { + readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService); + } strategy = readLockStrategy; } else if ("idempotent-rename".equals(readLock)) { FileIdempotentRenameRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRenameRepositoryReadLockStrategy(); -- To stop receiving notification emails like this one, please contact davscl...@apache.org.