This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7bf23493d8a0368549028888e75b2b2074c0e5ce Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Mar 29 13:17:06 2018 +0200 CAMEL-12382: FileConsumer - Allow to delay readLock release tasks on idempotent read-lock --- components/camel-ftp/src/main/docs/ftp-component.adoc | 2 +- components/camel-ftp/src/main/docs/ftps-component.adoc | 8 ++++++-- components/camel-ftp/src/main/docs/sftp-component.adoc | 8 ++++++-- .../java/org/apache/camel/component/file/remote/FtpConsumer.java | 5 +++-- .../java/org/apache/camel/component/file/remote/FtpEndpoint.java | 5 +++-- .../apache/camel/component/file/remote/RemoteFileConsumer.java | 5 +++-- .../java/org/apache/camel/component/file/remote/SftpConsumer.java | 5 +++-- .../java/org/apache/camel/component/file/remote/SftpEndpoint.java | 2 +- .../component/file/remote/RemoteFileIgnoreDoPollErrorTest.java | 2 +- 9 files changed, 27 insertions(+), 15 deletions(-) diff --git a/components/camel-ftp/src/main/docs/ftp-component.adoc b/components/camel-ftp/src/main/docs/ftp-component.adoc index 9821eca..de01e95 100644 --- a/components/camel-ftp/src/main/docs/ftp-component.adoc +++ b/components/camel-ftp/src/main/docs/ftp-component.adoc @@ -192,7 +192,7 @@ with the following path and query parameters: | *readLockMarkerFile* (lock) | Whether to use marker file with the changed, rename, or exclusive read lock types. By default a marker file is used as well to guard against other processes picking up the same files. This behavior can be turned off by setting this option to false. For example if you do not want to write marker files to the file systems by the Camel application. | true | boolean | *readLockMinAge* (lock) | This option applied only for readLock=change. This option allows to specify a minimum age the file must be before attempting to acquire the read lock. For example use readLockMinAge=300s to require the file is at last 5 minutes old. This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age. | 0 | long | *readLockMinLength* (lock) | This option applied only for readLock=changed. This option allows you to configure a minimum file length. By default Camel expects the file to contain data, and thus the default value is 1. You can set this option to zero, to allow consuming zero-length files. | 1 | long -| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...] +| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...] | *readLockRemoveOnRollback* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file failed and a rollback happens. If this option is false, then the file name entry is confirmed (as if the file did a commit). | true | boolean | *readLockTimeout* (lock) | Optional timeout in millis for the read-lock, if supported by the read-lock. If the read-lock could not be granted and the timeout triggered, then Camel will skip the file. At next poll Camel, will try the file again, and this time maybe the read-lock could be granted. Use a value of 0 or lower to indicate forever. Currently fileLock, changed and rename support the timeout. Notice: For FTP the default readLockTimeout value is 20000 instead of 10000. The readL [...] | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int diff --git a/components/camel-ftp/src/main/docs/ftps-component.adoc b/components/camel-ftp/src/main/docs/ftps-component.adoc index 106fd53..ae85f28 100644 --- a/components/camel-ftp/src/main/docs/ftps-component.adoc +++ b/components/camel-ftp/src/main/docs/ftps-component.adoc @@ -60,7 +60,7 @@ with the following path and query parameters: |=== -==== Query Parameters (116 parameters): +==== Query Parameters (120 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -148,11 +148,15 @@ with the following path and query parameters: | *readLock* (lock) | Used by consumer, to only poll the files if it has exclusive read-lock on the file (i.e. the file is not in-progress or being written). Camel will wait until the file lock is granted. This option provides the build in strategies: none - No read lock is in use markerFile - Camel creates a marker file (fileName.camelLock) and then holds a lock on it. This option is not available for the FTP component changed - Changed is using file length/modification timestamp to det [...] | *readLockCheckInterval* (lock) | Interval in millis for the read-lock, if supported by the read lock. This interval is used for sleeping between attempts to acquire the read lock. For example when using the changed read lock, you can set a higher interval period to cater for slow writes. The default of 1 sec. may be too fast if the producer is very slow writing the file. Notice: For FTP the default readLockCheckInterval is 5000. The readLockTimeout value must be higher than readLockChe [...] | *readLockDeleteOrphanLock Files* (lock) | Whether or not read lock with marker files should upon startup delete any orphan read lock files, which may have been left on the file system, if Camel was not properly shutdown (such as a JVM crash). If turning this option to false then any orphaned lock file will cause Camel to not attempt to pickup that file, this could also be due another node is concurrently reading files from the same shared directory. | true | boolean +| *readLockIdempotentRelease Async* (lock) | Whether the delayed release task should be synchronous or asynchronous. See more details at the readLockIdempotentReleaseDelay option. | false | boolean +| *readLockIdempotentRelease AsyncPoolSize* (lock) | The number of threads in the scheduled thread pool when using asynchronous release tasks. Using a default of 1 core threads should be sufficient in almost all use-cases, only set this to a higher value if either updating the idempotent repository is slow, or there are a lot of files to process. This option is not in-use if you use a shared thread pool by configuring the readLockIdempotentReleaseExecutorService option. See more details [...] +| *readLockIdempotentRelease Delay* (lock) | Whether to delay the release task for a period of millis. This can be used to delay the release tasks to expand the window when a file is regarded as read-locked, in an active/active cluster scenario with a shared idempotent repository, to ensure other nodes cannot potentially scan and acquire the same file, due to race-conditions. By expanding the time-window of the release tasks helps prevents these situations. Note delaying is only needed i [...] +| *readLockIdempotentRelease ExecutorService* (lock) | To use a custom and shared thread pool for asynchronous release tasks. See more details at the readLockIdempotentReleaseDelay option. | | ScheduledExecutor Service | *readLockLoggingLevel* (lock) | Logging level used when a read lock could not be acquired. By default a WARN is logged. You can change this level, for example to OFF to not have any logging. This option is only applicable for readLock of types: changed, fileLock, idempotent, idempotent-changed, idempotent-rename, rename. | DEBUG | LoggingLevel | *readLockMarkerFile* (lock) | Whether to use marker file with the changed, rename, or exclusive read lock types. By default a marker file is used as well to guard against other processes picking up the same files. This behavior can be turned off by setting this option to false. For example if you do not want to write marker files to the file systems by the Camel application. | true | boolean | *readLockMinAge* (lock) | This option applied only for readLock=change. This option allows to specify a minimum age the file must be before attempting to acquire the read lock. For example use readLockMinAge=300s to require the file is at last 5 minutes old. This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age. | 0 | long | *readLockMinLength* (lock) | This option applied only for readLock=changed. This option allows you to configure a minimum file length. By default Camel expects the file to contain data, and thus the default value is 1. You can set this option to zero, to allow consuming zero-length files. | 1 | long -| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...] +| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...] | *readLockRemoveOnRollback* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file failed and a rollback happens. If this option is false, then the file name entry is confirmed (as if the file did a commit). | true | boolean | *readLockTimeout* (lock) | Optional timeout in millis for the read-lock, if supported by the read-lock. If the read-lock could not be granted and the timeout triggered, then Camel will skip the file. At next poll Camel, will try the file again, and this time maybe the read-lock could be granted. Use a value of 0 or lower to indicate forever. Currently fileLock, changed and rename support the timeout. Notice: For FTP the default readLockTimeout value is 20000 instead of 10000. The readL [...] | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int diff --git a/components/camel-ftp/src/main/docs/sftp-component.adoc b/components/camel-ftp/src/main/docs/sftp-component.adoc index 9743837..c355bd3 100644 --- a/components/camel-ftp/src/main/docs/sftp-component.adoc +++ b/components/camel-ftp/src/main/docs/sftp-component.adoc @@ -51,7 +51,7 @@ with the following path and query parameters: |=== -==== Query Parameters (111 parameters): +==== Query Parameters (115 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -131,11 +131,15 @@ with the following path and query parameters: | *readLock* (lock) | Used by consumer, to only poll the files if it has exclusive read-lock on the file (i.e. the file is not in-progress or being written). Camel will wait until the file lock is granted. This option provides the build in strategies: none - No read lock is in use markerFile - Camel creates a marker file (fileName.camelLock) and then holds a lock on it. This option is not available for the FTP component changed - Changed is using file length/modification timestamp to det [...] | *readLockCheckInterval* (lock) | Interval in millis for the read-lock, if supported by the read lock. This interval is used for sleeping between attempts to acquire the read lock. For example when using the changed read lock, you can set a higher interval period to cater for slow writes. The default of 1 sec. may be too fast if the producer is very slow writing the file. Notice: For FTP the default readLockCheckInterval is 5000. The readLockTimeout value must be higher than readLockChe [...] | *readLockDeleteOrphanLock Files* (lock) | Whether or not read lock with marker files should upon startup delete any orphan read lock files, which may have been left on the file system, if Camel was not properly shutdown (such as a JVM crash). If turning this option to false then any orphaned lock file will cause Camel to not attempt to pickup that file, this could also be due another node is concurrently reading files from the same shared directory. | true | boolean +| *readLockIdempotentRelease Async* (lock) | Whether the delayed release task should be synchronous or asynchronous. See more details at the readLockIdempotentReleaseDelay option. | false | boolean +| *readLockIdempotentRelease AsyncPoolSize* (lock) | The number of threads in the scheduled thread pool when using asynchronous release tasks. Using a default of 1 core threads should be sufficient in almost all use-cases, only set this to a higher value if either updating the idempotent repository is slow, or there are a lot of files to process. This option is not in-use if you use a shared thread pool by configuring the readLockIdempotentReleaseExecutorService option. See more details [...] +| *readLockIdempotentRelease Delay* (lock) | Whether to delay the release task for a period of millis. This can be used to delay the release tasks to expand the window when a file is regarded as read-locked, in an active/active cluster scenario with a shared idempotent repository, to ensure other nodes cannot potentially scan and acquire the same file, due to race-conditions. By expanding the time-window of the release tasks helps prevents these situations. Note delaying is only needed i [...] +| *readLockIdempotentRelease ExecutorService* (lock) | To use a custom and shared thread pool for asynchronous release tasks. See more details at the readLockIdempotentReleaseDelay option. | | ScheduledExecutor Service | *readLockLoggingLevel* (lock) | Logging level used when a read lock could not be acquired. By default a WARN is logged. You can change this level, for example to OFF to not have any logging. This option is only applicable for readLock of types: changed, fileLock, idempotent, idempotent-changed, idempotent-rename, rename. | DEBUG | LoggingLevel | *readLockMarkerFile* (lock) | Whether to use marker file with the changed, rename, or exclusive read lock types. By default a marker file is used as well to guard against other processes picking up the same files. This behavior can be turned off by setting this option to false. For example if you do not want to write marker files to the file systems by the Camel application. | true | boolean | *readLockMinAge* (lock) | This option applied only for readLock=change. This option allows to specify a minimum age the file must be before attempting to acquire the read lock. For example use readLockMinAge=300s to require the file is at last 5 minutes old. This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age. | 0 | long | *readLockMinLength* (lock) | This option applied only for readLock=changed. This option allows you to configure a minimum file length. By default Camel expects the file to contain data, and thus the default value is 1. You can set this option to zero, to allow consuming zero-length files. | 1 | long -| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...] +| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...] | *readLockRemoveOnRollback* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file failed and a rollback happens. If this option is false, then the file name entry is confirmed (as if the file did a commit). | true | boolean | *readLockTimeout* (lock) | Optional timeout in millis for the read-lock, if supported by the read-lock. If the read-lock could not be granted and the timeout triggered, then Camel will skip the file. At next poll Camel, will try the file again, and this time maybe the read-lock could be granted. Use a value of 0 or lower to indicate forever. Currently fileLock, changed and rename support the timeout. Notice: For FTP the default readLockTimeout value is 20000 instead of 10000. The readL [...] | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java index d9e9d85..2d120ba 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java @@ -28,6 +28,7 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.component.file.GenericFileProcessStrategy; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; @@ -46,8 +47,8 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { private transient String ftpConsumerToString; - public FtpConsumer(RemoteFileEndpoint<FTPFile> endpoint, Processor processor, RemoteFileOperations<FTPFile> fileOperations) { - super(endpoint, processor, fileOperations); + public FtpConsumer(RemoteFileEndpoint<FTPFile> endpoint, Processor processor, RemoteFileOperations<FTPFile> fileOperations, GenericFileProcessStrategy processStrategy) { + super(endpoint, processor, fileOperations, processStrategy); this.endpointPath = endpoint.getConfiguration().getDirectory(); } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java index 2bce128..2b85ef9 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java @@ -42,7 +42,8 @@ import org.apache.commons.net.ftp.FTPFile; */ @UriEndpoint(firstVersion = "1.1.0", scheme = "ftp", extendsScheme = "file", title = "FTP", syntax = "ftp:host:port/directoryName", alternativeSyntax = "ftp:username:password@host:port/directoryName", - consumerClass = FtpConsumer.class, label = "file") + consumerClass = FtpConsumer.class, label = "file", + excludeProperties = "readLockIdempotentReleaseAsync,readLockIdempotentReleaseAsyncPoolSize,readLockIdempotentReleaseDelay,readLockIdempotentReleaseExecutorService") @ManagedResource(description = "Managed FtpEndpoint") public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile> { protected int soTimeout; @@ -94,7 +95,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile> @Override protected RemoteFileConsumer<FTPFile> buildConsumer(Processor processor) { try { - return new FtpConsumer(this, processor, createRemoteFileOperations()); + return new FtpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy()); } catch (Exception e) { throw new FailedToCreateConsumerException(this, e); } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index a676732..b1ae6d7 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -25,6 +25,7 @@ import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileConsumer; import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.component.file.GenericFileProcessStrategy; import org.apache.camel.support.SynchronizationAdapter; /** @@ -34,8 +35,8 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { protected transient boolean loggedIn; protected transient boolean loggedInWarning; - public RemoteFileConsumer(RemoteFileEndpoint<T> endpoint, Processor processor, RemoteFileOperations<T> operations) { - super(endpoint, processor, operations); + public RemoteFileConsumer(RemoteFileEndpoint<T> endpoint, Processor processor, RemoteFileOperations<T> operations, GenericFileProcessStrategy processStrategy) { + super(endpoint, processor, operations, processStrategy); this.setPollStrategy(new RemoteFilePollingConsumerPollStrategy()); } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java index fc8fc36..026b37b 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java @@ -28,6 +28,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.component.file.GenericFileProcessStrategy; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -42,8 +43,8 @@ public class SftpConsumer extends RemoteFileConsumer<SftpRemoteFile> { private transient String sftpConsumerToString; - public SftpConsumer(RemoteFileEndpoint<SftpRemoteFile> endpoint, Processor processor, RemoteFileOperations<SftpRemoteFile> operations) { - super(endpoint, processor, operations); + public SftpConsumer(RemoteFileEndpoint<SftpRemoteFile> endpoint, Processor processor, RemoteFileOperations<SftpRemoteFile> operations, GenericFileProcessStrategy<SftpRemoteFile> processStrategy) { + super(endpoint, processor, operations, processStrategy); this.endpointPath = endpoint.getConfiguration().getDirectory(); } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java index 42341bc..62b46fd 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java @@ -61,7 +61,7 @@ public class SftpEndpoint extends RemoteFileEndpoint<SftpRemoteFile> { @Override protected RemoteFileConsumer<SftpRemoteFile> buildConsumer(Processor processor) { - return new SftpConsumer(this, processor, createRemoteFileOperations()); + return new SftpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy()); } protected GenericFileProducer<SftpRemoteFile> buildProducer() { diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java index b225fa9..3db4d8a 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/RemoteFileIgnoreDoPollErrorTest.java @@ -88,7 +88,7 @@ public class RemoteFileIgnoreDoPollErrorTest { } private RemoteFileConsumer<Object> getRemoteFileConsumer(final String doPollResult, final boolean ignoreCannotRetrieveFile) { - return new RemoteFileConsumer<Object>(remoteFileEndpoint, null, null) { + return new RemoteFileConsumer<Object>(remoteFileEndpoint, null, null, null) { @Override protected boolean doPollDirectory(String absolutePath, String dirName, List<GenericFile<Object>> genericFiles, int depth) { if ("IllegalStateException".equals(doPollResult)) { -- To stop receiving notification emails like this one, please contact davscl...@apache.org.