This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new bf457b1  [CAMEL-16770] Added JDBC Idempotent Repository caching (#5767)
bf457b1 is described below

commit bf457b135b32a446dbabbd83146ce8127283582c
Author: Michael Genereux <mgene...@gmail.com>
AuthorDate: Mon Jul 5 09:29:30 2021 -0700

    [CAMEL-16770] Added JDBC Idempotent Repository caching (#5767)
    
    * Added JDBC Idempotent Repository caching
    
    * Temp commit to see if CI build passes
    
    * Added documentation and warning about caching
    
    * Leave casting of count to JDBC implementation
    
    * Cleaned up lambda formatting
---
 .../apache/camel/catalog/docs/sql-component.adoc   |  12 ++
 .../camel-sql/src/main/docs/sql-component.adoc     |  12 ++
 .../jdbc/JdbcCachedMessageIdRepository.java        | 142 +++++++++++++++++++++
 .../jdbc/JdbcCachedMessageIdRepositoryTest.java    |  85 ++++++++++++
 .../processor/idempotent/jdbc/cached-spring.xml    |  53 ++++++++
 .../modules/ROOT/pages/sql-component.adoc          |  12 ++
 6 files changed, 316 insertions(+)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
index 58bbb1e..a9eddc4 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
@@ -593,6 +593,18 @@ This repository has two additional configuration parameters
 |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are 
done to createdAt Timestamp column.
 |===
 
+=== Caching Jdbc IdempotentRepository 
+
+Some SQL implementations are not fast on a per query basis.  The 
+`JdbcMessageIdRepository` implementation does its idempotent checks
+individually within SQL transactions.  Checking a mere 100 keys can 
+take minutes.  The `JdbcCachedMessageIdRepository` preloads an in-memory 
+cache on start with the entire list of keys.  This cache is then 
+checked first before passing through to the original implementation.
+
+As with all cache implementations, there are considerations that should
+be made with regard to stale data and your specific usage.
+
 == Using the JDBC based aggregation repository
 
 *Since Camel 2.6*
diff --git a/components/camel-sql/src/main/docs/sql-component.adoc 
b/components/camel-sql/src/main/docs/sql-component.adoc
index 58bbb1e..a9eddc4 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -593,6 +593,18 @@ This repository has two additional configuration parameters
 |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are 
done to createdAt Timestamp column.
 |===
 
+=== Caching Jdbc IdempotentRepository 
+
+Some SQL implementations are not fast on a per query basis.  The 
+`JdbcMessageIdRepository` implementation does its idempotent checks
+individually within SQL transactions.  Checking a mere 100 keys can 
+take minutes.  The `JdbcCachedMessageIdRepository` preloads an in-memory 
+cache on start with the entire list of keys.  This cache is then 
+checked first before passing through to the original implementation.
+
+As with all cache implementations, there are considerations that should
+be made with regard to stale data and your specific usage.
+
 == Using the JDBC based aggregation repository
 
 *Since Camel 2.6*
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepository.java
new file mode 100644
index 0000000..d720cb9
--- /dev/null
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepository.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Caching version of {@link JdbcMessageIdRepository}
+ */
+public class JdbcCachedMessageIdRepository extends JdbcMessageIdRepository {
+    private Map<String, Integer> cache = new HashMap<>();
+    private int hitCount;
+    private int missCount;
+    private String queryAllString
+            = "SELECT messageId, COUNT(*) AS messageCount FROM 
CAMEL_MESSAGEPROCESSED WHERE processorName = ? GROUP BY messageId";
+
+    public JdbcCachedMessageIdRepository() {
+    }
+
+    public JdbcCachedMessageIdRepository(DataSource dataSource, String 
processorName) {
+        super(dataSource, processorName);
+    }
+
+    public JdbcCachedMessageIdRepository(DataSource dataSource, 
TransactionTemplate transactionTemplate, String processorName) {
+        super(dataSource, transactionTemplate, processorName);
+    }
+
+    public JdbcCachedMessageIdRepository(JdbcTemplate jdbcTemplate, 
TransactionTemplate transactionTemplate) {
+        super(jdbcTemplate, transactionTemplate);
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        if (getTableName() != null) {
+            queryAllString = queryAllString.replaceFirst(DEFAULT_TABLENAME, 
getTableName());
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        reload();
+    }
+
+    @Override
+    public boolean add(final String key) {
+        Integer previousValue = cache.getOrDefault(key, 0);
+        cache.put(key, previousValue + 1);
+        if (previousValue != 0) {
+            hitCount++;
+            return false;
+        }
+        missCount++;
+        return super.add(key);
+    }
+
+    @Override
+    public boolean contains(final String key) {
+        Integer previousValue = cache.getOrDefault(key, 0);
+        if (previousValue != 0) {
+            hitCount++;
+            return true;
+        }
+        missCount++;
+        return super.contains(key);
+    }
+
+    @Override
+    public boolean remove(String key) {
+        cache.remove(key);
+        return super.remove(key);
+    }
+
+    @Override
+    public void clear() {
+        cache.clear();
+        hitCount = 0;
+        missCount = 0;
+        super.clear();
+    }
+
+    public String getQueryAllString() {
+        return queryAllString;
+    }
+
+    public void setQueryAllString(String queryAllString) {
+        this.queryAllString = queryAllString;
+    }
+
+    public int getHitCount() {
+        return hitCount;
+    }
+
+    public int getMissCount() {
+        return missCount;
+    }
+
+    public void reload() {
+        transactionTemplate.execute(status -> {
+            try {
+                cache = jdbcTemplate.query(getQueryAllString(), resultSet -> {
+                    Map<String, Integer> messageIdCount = new HashMap<>();
+                    while (resultSet.next()) {
+                        messageIdCount.put(resultSet.getString("messageId"), 
resultSet.getInt("messageCount"));
+                    }
+                    return messageIdCount;
+                }, getProcessorName());
+                log.info("JdbcCachedMessageIdRepository cache loaded with {} 
entries", cache.size());
+            } catch (DataAccessException dae) {
+                log.error(
+                        "Unable to populate JdbcCachedMessageIdRepository 
cache because of: {}.",
+                        dae.getMessage());
+                throw dae;
+            }
+            return Boolean.TRUE;
+
+        });
+    }
+
+}
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepositoryTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepositoryTest.java
new file mode 100644
index 0000000..77ef966
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepositoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.sql.Timestamp;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.spring.junit5.CamelSpringTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JdbcCachedMessageIdRepositoryTest extends CamelSpringTestSupport {
+
+    protected static final String INSERT_STRING
+            = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, 
createdAt) VALUES (?, ?, ?)";
+    protected static final String PROCESSOR_NAME = "myProcessorName";
+
+    protected JdbcTemplate jdbcTemplate;
+    protected DataSource dataSource;
+    protected JdbcCachedMessageIdRepository repository;
+
+    @EndpointInject("mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @EndpointInject("mock:error")
+    protected MockEndpoint errorEndpoint;
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+
+        dataSource = context.getRegistry().lookupByNameAndType("dataSource", 
DataSource.class);
+        jdbcTemplate = new JdbcTemplate(dataSource);
+        jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "1", new 
Timestamp(System.currentTimeMillis()));
+        jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "2", new 
Timestamp(System.currentTimeMillis()));
+        repository = 
context.getRegistry().lookupByNameAndType("messageIdRepository", 
JdbcCachedMessageIdRepository.class);
+        repository.reload();
+    }
+
+    @Test
+    public void testCacheHit() throws Exception {
+        resultEndpoint.expectedBodiesReceived("three");
+        errorEndpoint.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(5, repository.getHitCount());
+        assertEquals(1, repository.getMissCount());
+    }
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/processor/idempotent/jdbc/cached-spring.xml");
+    }
+}
diff --git 
a/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/cached-spring.xml
 
b/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/cached-spring.xml
new file mode 100644
index 0000000..dbd0764
--- /dev/null
+++ 
b/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/cached-spring.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans";
+       xmlns:camel="http://camel.apache.org/schema/spring";
+       xmlns:jdbc="http://www.springframework.org/schema/jdbc";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="
+           http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+           http://www.springframework.org/schema/jdbc 
http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
+           http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd";>
+
+       <!-- START SNIPPET: e1 -->
+       <jdbc:embedded-database id="dataSource" type="DERBY" />
+    <!-- END SNIPPET: e1 -->
+    
+    <!-- START SNIPPET: e2 -->
+    <bean id="messageIdRepository" 
class="org.apache.camel.processor.idempotent.jdbc.JdbcCachedMessageIdRepository">
+       <constructor-arg ref="dataSource" />
+       <constructor-arg value="myProcessorName" />
+    </bean>
+    
+    <camel:camelContext>
+       <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" 
deadLetterUri="mock:error">
+               <camel:redeliveryPolicy maximumRedeliveries="0" 
maximumRedeliveryDelay="0" logStackTrace="false" />
+       </camel:errorHandler>
+       
+       <camel:route id="JdbcCachedMessageIdRepositoryTest" 
errorHandlerRef="deadLetterChannel">
+               <camel:from uri="direct:start" />
+               <camel:idempotentConsumer 
messageIdRepositoryRef="messageIdRepository">
+                       <camel:header>messageId</camel:header>
+                       <camel:to uri="mock:result" />
+               </camel:idempotentConsumer>
+       </camel:route>
+    </camel:camelContext>
+    <!-- END SNIPPET: e2 -->
+</beans>
\ No newline at end of file
diff --git a/docs/components/modules/ROOT/pages/sql-component.adoc 
b/docs/components/modules/ROOT/pages/sql-component.adoc
index 1512e26..aa2e4c4 100644
--- a/docs/components/modules/ROOT/pages/sql-component.adoc
+++ b/docs/components/modules/ROOT/pages/sql-component.adoc
@@ -595,6 +595,18 @@ This repository has two additional configuration parameters
 |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are 
done to createdAt Timestamp column.
 |===
 
+=== Caching Jdbc IdempotentRepository 
+
+Some SQL implementations are not fast on a per query basis.  The 
+`JdbcMessageIdRepository` implementation does its idempotent checks
+individually within SQL transactions.  Checking a mere 100 keys can 
+take minutes.  The `JdbcCachedMessageIdRepository` preloads an in-memory 
+cache on start with the entire list of keys.  This cache is then 
+checked first before passing through to the original implementation.
+
+As with all cache implementations, there are considerations that should
+be made with regard to stale data and your specific usage.
+
 == Using the JDBC based aggregation repository
 
 *Since Camel 2.6*

Reply via email to