This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5c4b7e8bf2e5676c3bda5e1aca3b8a7d07371fae Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Mar 31 07:46:02 2021 +0200 Polished --- .../apache/camel/catalog/docs/sql-component.adoc | 19 ++++++++++++++++++ .../JdbcOrphanLockAwareIdempotentRepository.java | 23 +++++++++++----------- .../modules/ROOT/pages/sql-component.adoc | 19 ++++++++++++++++++ 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc index a42a90e..af5a795 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc @@ -574,6 +574,25 @@ the second one is the message id (`String`). The option `tableName` can be used to use the default SQL queries but with a different table name. However if you want to customize the SQL queries then you can configure each of them individually. +=== Orphan Lock aware Jdbc IdempotentRepository + +One of the limitations of `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it does not handle orphan locks resulting from JVM crash or non graceful shutdown. This can result in unprocessed files/messages if this implementation is used with camel-file, camel-ftp etc. If you need to address orphan locks processing then use +`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`. This repository keeps track of the locks held by an instance of the application. For each lock held, the application will send keep alive signals to the lock repository resulting in updating the createdAt column with the current Timestamp. 
When an application instance tries to acquire a lock, there are three possibilities: + +* lock entry does not exist, then the lock is provided using the base implementation of `JdbcMessageIdRepository`. + +* lock already exists and the createdAt > System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance has the lock and the lock is not provided to the new instance requesting the lock. + +* lock already exists and the createdAt <= System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that there is no active instance which has the lock and the lock is provided to the requesting instance. The reasoning is that if the original instance which had the lock were still running, it would have updated the Timestamp on createdAt using its keepAlive mechanism. + +This repository has two additional configuration parameters: + +[cols="1,1"] +|=== +|Parameter | Description +|lockMaxAgeMillis | The duration after which the lock is considered orphaned, i.e. if currentTimestamp - createdAt >= lockMaxAgeMillis then the lock is orphaned. +|lockKeepAliveIntervalMillis | The interval at which keep alive updates are done to the createdAt Timestamp column. 
+|=== == Using the JDBC based aggregation repository diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java index a787e5a..a84cbff 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java @@ -45,8 +45,6 @@ import org.springframework.transaction.support.TransactionTemplate; * A lock is granted to an instance if either the entry for the lock attributes do not exists in the * CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock has crashed. This is determined if the * timestamp on the createdAt column is more than the lockMaxAge. - * - * * */ public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdRepository implements ShutdownableService { @@ -144,16 +142,22 @@ public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdReposi updateTimestampQuery = updateTimestampQuery.replaceFirst(DEFAULT_TABLENAME, getTableName()); } executorServiceManager = context.getExecutorServiceManager(); - executorService = executorServiceManager.newSingleThreadScheduledExecutor(this, this.getClass().getName()); - /** - * Schedule a task which will keep updating the timestamp on the acquired locks at lockKeepAliveInterval so that - * the timestamp does not reaches lockMaxAge - */ + executorService = executorServiceManager.newSingleThreadScheduledExecutor(this, this.getClass().getSimpleName()); + + // Schedule a task which will keep updating the timestamp on the acquired locks at lockKeepAliveInterval so that + // the timestamp does not reaches lockMaxAge executorService.scheduleWithFixedDelay(new LockKeepAliveTask(), 
lockKeepAliveIntervalMillis, lockKeepAliveIntervalMillis, TimeUnit.MILLISECONDS); } @Override + protected void doShutdown() throws Exception { + if (executorServiceManager != null && executorService != null) { + executorServiceManager.shutdownGraceful(executorService); + } + } + + @Override protected int delete() { long stamp = sl.writeLock(); try { @@ -182,11 +186,6 @@ public class JdbcOrphanLockAwareIdempotentRepository extends JdbcMessageIdReposi } } - @Override - public void shutdown() { - executorServiceManager.shutdownGraceful(executorService); - } - public Set<ProcessorNameAndMessageId> getProcessorNameMessageIdSet() { return processorNameMessageIdSet; } diff --git a/docs/components/modules/ROOT/pages/sql-component.adoc b/docs/components/modules/ROOT/pages/sql-component.adoc index 9a187de..9263b64 100644 --- a/docs/components/modules/ROOT/pages/sql-component.adoc +++ b/docs/components/modules/ROOT/pages/sql-component.adoc @@ -576,6 +576,25 @@ the second one is the message id (`String`). The option `tableName` can be used to use the default SQL queries but with a different table name. However if you want to customize the SQL queries then you can configure each of them individually. +=== Orphan Lock aware Jdbc IdempotentRepository + +One of the limitations of `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it does not handle orphan locks resulting from JVM crash or non graceful shutdown. This can result in unprocessed files/messages if this implementation is used with camel-file, camel-ftp etc. If you need to address orphan locks processing then use +`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`. This repository keeps track of the locks held by an instance of the application. For each lock held, the application will send keep alive signals to the lock repository resulting in updating the createdAt column with the current Timestamp. 
When an application instance tries to acquire a lock, there are three possibilities: + +* lock entry does not exist, then the lock is provided using the base implementation of `JdbcMessageIdRepository`. + +* lock already exists and the createdAt > System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance has the lock and the lock is not provided to the new instance requesting the lock. + +* lock already exists and the createdAt <= System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that there is no active instance which has the lock and the lock is provided to the requesting instance. The reasoning is that if the original instance which had the lock were still running, it would have updated the Timestamp on createdAt using its keepAlive mechanism. + +This repository has two additional configuration parameters: + +[cols="1,1"] +|=== +|Parameter | Description +|lockMaxAgeMillis | The duration after which the lock is considered orphaned, i.e. if currentTimestamp - createdAt >= lockMaxAgeMillis then the lock is orphaned. +|lockKeepAliveIntervalMillis | The interval at which keep alive updates are done to the createdAt Timestamp column. +|=== == Using the JDBC based aggregation repository