This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c87b3f8  Added new Jdbc based IdempotentRepository which addresses the 
problem of orphan locks (#5278)
c87b3f8 is described below

commit c87b3f83d1808cf3503effbc5b4c8e08c63e22be
Author: Samrat Dhillon <samrat.dhil...@gmail.com>
AuthorDate: Wed Mar 31 01:42:19 2021 -0400

    Added new Jdbc based IdempotentRepository which addresses the problem of 
orphan locks (#5278)
    
    * Added new Jdbc based IdempotentRepository which addresses the problem of 
orhpan lock that can be left behind by jvm crashes
    
    * making default table name protected so that it can be acessed by other 
implementations
    
    * Renaming the class. Using Camel ExecutorServiceManager to schedule the 
keep alive and added documentation
    
    Co-authored-by: Samrat Dhillon <samrat.dhil...@innovapost.com>
---
 .../camel-sql/src/main/docs/sql-component.adoc     |  19 ++
 .../idempotent/jdbc/JdbcMessageIdRepository.java   |   3 +-
 .../JdbcOrphanLockAwareIdempotentRepository.java   | 283 +++++++++++++++++++++
 ...dbcOrphanLockAwareIdempotentRepositoryTest.java | 103 ++++++++
 .../sql/idempotentWithOrphanLockRemoval.sql        |  14 +
 5 files changed, 421 insertions(+), 1 deletion(-)

diff --git a/components/camel-sql/src/main/docs/sql-component.adoc 
b/components/camel-sql/src/main/docs/sql-component.adoc
index a42a90e..af5a795 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -574,6 +574,25 @@ the second one is the message id (`String`).
 The option `tableName` can be used to use the default SQL queries but with a 
different table name.
 However if you want to customize the SQL queries then you can configure each 
of them individually.
 
+=== Orphan Lock aware Jdbc IdempotentRepository 
+
+One of the limitations of 
`org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it 
does not handle orphan locks resulting from JVM crash or non graceful shutdown. 
This can result in unprocessed files/messages if this is implementation is used 
with camel-file, camel-ftp etc. if you need to address orphan locks processing 
then use
+`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`.
  This repository keeps track of the locks held by an instance of the 
application. For each lock held, the application will send keep alive signals 
to the lock repository resulting in updating the createdAt column with the 
current Timestamp. When an application instance tries to acquire a lock if the, 
then there are three possibilities exist : 
+
+* lock entry does not exist then the lock is provided using the base 
implementation of `JdbcMessageIdRepository`. 
+
+* lock already exists and the createdAt < System.currentTimeMillis() - 
lockMaxAgeMillis. In this case it is assumed that an active instance has the 
lock and the lock is not provided to the new instance requesting the lock
+
+* lock already exists and the createdAt > = System.currentTimeMillis() - 
lockMaxAgeMillis. In this case it is assumed that there is no active instance 
which has the lock and the lock is provided to the requesting instance. The 
reason behind is that if the original instance which had the lock, if it was 
still running, it would have updated the Timestamp on createdAt using its 
keepAlive mechanism
+
+This repository has two additional configuration parameters 
+
+[cols="1,1"]
+|===
+|Parameter | Description
+|lockMaxAgeMillis | This refers to the duration after which the lock is 
considered orphaned i.e. if the currentTimestamp - createdAt >= 
lockMaxAgeMillis then lock is orphaned.
+|lockKeepAliveIntervalMillis | The frequency at which keep alive updates are 
done to createdAt Timestamp column.
+|===
 
 == Using the JDBC based aggregation repository
 
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
index a0e9db2..225a5a3 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
@@ -31,7 +31,7 @@ import 
org.springframework.transaction.support.TransactionTemplate;
  */
 public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository {
 
-    private static final String DEFAULT_TABLENAME = "CAMEL_MESSAGEPROCESSED";
+    protected static final String DEFAULT_TABLENAME = "CAMEL_MESSAGEPROCESSED";
 
     private boolean createTableIfNotExists = true;
     private String tableName;
@@ -79,6 +79,7 @@ public class JdbcMessageIdRepository extends 
AbstractJdbcMessageIdRepository {
         super.doStart();
 
         transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            @Override
             public Boolean doInTransaction(TransactionStatus status) {
                 try {
                     // we will receive an exception if the table doesn't 
exists or we cannot access it
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
new file mode 100644
index 0000000..a787e5a
--- /dev/null
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.sql.Timestamp;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Collectors;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ShutdownableService;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.springframework.dao.DuplicateKeyException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Implementation of {@link AbstractJdbcMessageIdRepository} which handles 
orphan locks resulting from jvm crash.
+ *
+ * When an instance of the application acquires a lock on the idempotent 
repository, the lock attributes are added to a
+ * HashSet. While the lock is help by the instance, the instance keeps 
updating the createdAt column with the current
+ * timestamp indicating the instance holding the lock is active.
+ *
+ * A lock is granted to an instance if either the entry for the lock 
attributes do not exists in the
+ * CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock 
has crashed. This is determined if the
+ * timestamp on the createdAt column is more than the lockMaxAge.
+ *
+ * *
+ */
+public class JdbcOrphanLockAwareIdempotentRepository extends 
JdbcMessageIdRepository implements ShutdownableService {
+
+    private final StampedLock sl = new StampedLock();
+
+    private final Set<ProcessorNameAndMessageId> processorNameMessageIdSet = 
new HashSet<>();
+
+    private ExecutorServiceManager executorServiceManager;
+
+    private ScheduledExecutorService executorService;
+
+    private CamelContext context;
+
+    /** Max age of read lock in milliseconds **/
+    private long lockMaxAgeMillis;
+
+    /** intervals after which keep alive is sent for the locks held by an 
instance **/
+    private long lockKeepAliveIntervalMillis;
+
+    private String updateTimestampQuery
+            = "UPDATE CAMEL_MESSAGEPROCESSED SET createdAt =? WHERE 
processorName =? AND messageId = ?";
+
+    public JdbcOrphanLockAwareIdempotentRepository(CamelContext camelContext) {
+        super();
+        this.context = camelContext;
+    }
+
+    public JdbcOrphanLockAwareIdempotentRepository(DataSource dataSource, 
String processorName, CamelContext camelContext) {
+        super(dataSource, processorName);
+        this.context = camelContext;
+    }
+
+    public JdbcOrphanLockAwareIdempotentRepository(DataSource dataSource, 
TransactionTemplate transactionTemplate,
+                                                   String processorName, 
CamelContext camelContext) {
+        super(dataSource, transactionTemplate, processorName);
+        this.context = camelContext;
+    }
+
+    public JdbcOrphanLockAwareIdempotentRepository(JdbcTemplate jdbcTemplate,
+                                                   TransactionTemplate 
transactionTemplate, CamelContext camelContext) {
+        super(jdbcTemplate, transactionTemplate);
+        this.context = camelContext;
+    }
+
+    @Override
+    protected int queryForInt(String key) {
+        /**
+         * If the update timestamp time is more than lockMaxAge then assume 
that the lock is orphan and the process
+         * which had acquired the lock has died
+         */
+        String orphanLockRecoverQueryString = getQueryString() + " AND 
createdAt >= ?";
+        Timestamp xMillisAgo = new Timestamp(System.currentTimeMillis() - 
lockMaxAgeMillis);
+        return jdbcTemplate.queryForObject(orphanLockRecoverQueryString, 
Integer.class, processorName, key,
+                xMillisAgo);
+    }
+
+    @Override
+    protected int delete(String key) {
+        long stamp = sl.writeLock();
+        try {
+            int result = super.delete(key);
+            processorNameMessageIdSet.remove(new 
ProcessorNameAndMessageId(processorName, key));
+            return result;
+        } finally {
+            sl.unlockWrite(stamp);
+        }
+
+    }
+
+    @Override
+    protected int insert(String key) {
+        Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis());
+        long stamp = sl.writeLock();
+        try {
+            int result = jdbcTemplate.update(getInsertString(), processorName, 
key, currentTimestamp);
+            processorNameMessageIdSet.add(new 
ProcessorNameAndMessageId(processorName, key));
+            return result;
+        } catch (DuplicateKeyException e) {
+            //Update in case of orphan lock where a process dies without 
releasing exist lock
+            return jdbcTemplate.update(getUpdateTimestampQuery(), 
currentTimestamp, processorName, key);
+        } finally {
+            sl.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        if (lockMaxAgeMillis <= lockKeepAliveIntervalMillis) {
+            throw new IllegalStateException("value of lockMaxAgeMillis cannot 
be <= lockKeepAliveIntervalMillis");
+        }
+        Objects.requireNonNull(this.context, () -> "context cannot be null");
+
+        super.doInit();
+        if (getTableName() != null) {
+            updateTimestampQuery = 
updateTimestampQuery.replaceFirst(DEFAULT_TABLENAME, getTableName());
+        }
+        executorServiceManager = context.getExecutorServiceManager();
+        executorService = 
executorServiceManager.newSingleThreadScheduledExecutor(this, 
this.getClass().getName());
+        /**
+         * Schedule a task which will keep updating the timestamp on the 
acquired locks at lockKeepAliveInterval so that
+         * the timestamp does not reaches lockMaxAge
+         */
+        executorService.scheduleWithFixedDelay(new LockKeepAliveTask(), 
lockKeepAliveIntervalMillis,
+                lockKeepAliveIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected int delete() {
+        long stamp = sl.writeLock();
+        try {
+            int result = super.delete();
+            processorNameMessageIdSet.clear();
+            return result;
+        } finally {
+            sl.unlockWrite(stamp);
+        }
+
+    }
+
+    void keepAlive() {
+        Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis());
+        long stamp = sl.readLock();
+        try {
+            List<Object[]> args = processorNameMessageIdSet.stream()
+                    .map(processorNameMessageId -> new Object[] {
+                            currentTimestamp, 
processorNameMessageId.processorName, processorNameMessageId.messageId })
+                    .collect(Collectors.toList());
+            transactionTemplate.execute(status -> 
jdbcTemplate.batchUpdate(getUpdateTimestampQuery(), args));
+        } catch (Exception e) {
+            log.error("failed updating createdAt in keepAlive due to ", e);
+        } finally {
+            sl.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        executorServiceManager.shutdownGraceful(executorService);
+    }
+
+    public Set<ProcessorNameAndMessageId> getProcessorNameMessageIdSet() {
+        return processorNameMessageIdSet;
+    }
+
+    public String getUpdateTimestampQuery() {
+        return updateTimestampQuery;
+    }
+
+    public void setUpdateTimestampQuery(String updateTimestampQuery) {
+        this.updateTimestampQuery = updateTimestampQuery;
+    }
+
+    public long getLockMaxAgeMillis() {
+        return lockMaxAgeMillis;
+    }
+
+    public void setLockMaxAgeMillis(long lockMaxAgeMillis) {
+        this.lockMaxAgeMillis = lockMaxAgeMillis;
+    }
+
+    public long getLockKeepAliveIntervalMillis() {
+        return lockKeepAliveIntervalMillis;
+    }
+
+    public void setLockKeepAliveIntervalMillis(long 
lockKeepAliveIntervalMillis) {
+        this.lockKeepAliveIntervalMillis = lockKeepAliveIntervalMillis;
+    }
+
+    class LockKeepAliveTask implements Runnable {
+
+        @Override
+        public void run() {
+            keepAlive();
+        }
+    }
+
+    static class ProcessorNameAndMessageId {
+        private final String processorName;
+        private final String messageId;
+
+        public ProcessorNameAndMessageId(String processorName, String 
messageId) {
+            this.processorName = processorName;
+            this.messageId = messageId;
+        }
+
+        public String getProcessorName() {
+            return processorName;
+        }
+
+        public String getMessageId() {
+            return messageId;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((messageId == null) ? 0 : 
messageId.hashCode());
+            result = prime * result + ((processorName == null) ? 0 : 
processorName.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            ProcessorNameAndMessageId other = (ProcessorNameAndMessageId) obj;
+            if (messageId == null) {
+                if (other.messageId != null) {
+                    return false;
+                }
+            } else if (!messageId.equals(other.messageId)) {
+                return false;
+            }
+            if (processorName == null) {
+                if (other.processorName != null) {
+                    return false;
+                }
+            } else if (!processorName.equals(other.processorName)) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+}
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
new file mode 100644
index 0000000..5a9479e
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.sql.Timestamp;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import 
org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository.ProcessorNameAndMessageId;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class JdbcOrphanLockAwareIdempotentRepositoryTest {
+
+    private static final String APP_NAME = "APP_1";
+
+    private EmbeddedDatabase dataSource;
+
+    private JdbcOrphanLockAwareIdempotentRepository jdbcMessageIdRepository;
+
+    @BeforeAll
+    public void setup() throws Exception {
+        dataSource = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.HSQL)
+                .addScript("classpath:sql/idempotentWithOrphanLockRemoval.sql")
+                .generateUniqueName(true)
+                .build();
+        jdbcMessageIdRepository = new 
JdbcOrphanLockAwareIdempotentRepository(dataSource, APP_NAME, new 
DefaultCamelContext());
+        jdbcMessageIdRepository.setLockMaxAgeMillis(3000_00L);
+        jdbcMessageIdRepository.setLockKeepAliveIntervalMillis(3000L);
+        jdbcMessageIdRepository.doInit();
+    }
+
+    @Test
+    public void testLockNotGrantedForCurrentTimeStamp() {
+        assertTrue(jdbcMessageIdRepository.contains("FILE_1"));
+    }
+
+    @Test
+    public void testLockNotGrantedForCurrentTimeStampPlus2Min() {
+        assertTrue(jdbcMessageIdRepository.contains("FILE_2"));
+    }
+
+    @Test
+    public void testLockGrantedForCurrentTimeStampPlus5Min() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_3"));
+    }
+
+    @Test
+    public void testLockKeepAliveWorks() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_4"));
+        jdbcMessageIdRepository.insert("FILE_4");
+        assertTrue(jdbcMessageIdRepository.contains("FILE_4"));
+        JdbcTemplate template = new JdbcTemplate(dataSource);
+        Timestamp timestamp = new Timestamp(System.currentTimeMillis() - 5 * 
60 * 1000L);
+        template.update("UPDATE CAMEL_MESSAGEPROCESSED SET createdAT = ? WHERE 
processorName = ? AND messageId = ?", timestamp,
+                APP_NAME, "FILE_4");
+        assertFalse(jdbcMessageIdRepository.contains("FILE_4"));
+        jdbcMessageIdRepository.keepAlive();
+        assertTrue(jdbcMessageIdRepository.contains("FILE_4"));
+    }
+
+    @Test
+    public void testInsertQueryDelete() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_5"));
+        assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+
+        jdbcMessageIdRepository.add("FILE_5");
+
+        assertTrue(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+        assertTrue(jdbcMessageIdRepository.contains("FILE_5"));
+        jdbcMessageIdRepository.remove("FILE_5");
+        assertFalse(jdbcMessageIdRepository.contains("FILE_5"));
+        assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+    }
+
+}
diff --git 
a/components/camel-sql/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
 
b/components/camel-sql/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
new file mode 100644
index 0000000..ce108eb
--- /dev/null
+++ 
b/components/camel-sql/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
@@ -0,0 +1,14 @@
+-- Add DDL to create tables, views, indexes, etc needed by tests. These should 
match the expected database structure as it will appear in production.
+SET DATABASE SQL SYNTAX PGS TRUE; -- tells HSQLDB that this schema uses MYSQL 
syntax
+SET PROPERTY "sql.enforce_strict_size" FALSE;
+
+CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId 
VARCHAR(100), createdAt TIMESTAMP);
+
+ALTER TABLE CAMEL_MESSAGEPROCESSED ADD PRIMARY KEY (processorName, messageId);
+
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_1', 
CURRENT_TIMESTAMP);
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 
'FILE_2',TIMESTAMPADD(SQL_TSI_MINUTE, -2, CURRENT_TIMESTAMP));
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 
'FILE_3',TIMESTAMPADD(SQL_TSI_MINUTE, -5, CURRENT_TIMESTAMP));

Reply via email to