This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new bf457b1 [CAMEL-16770] Added JDBC Idempotent Repository caching (#5767) bf457b1 is described below commit bf457b135b32a446dbabbd83146ce8127283582c Author: Michael Genereux <mgene...@gmail.com> AuthorDate: Mon Jul 5 09:29:30 2021 -0700 [CAMEL-16770] Added JDBC Idempotent Repository caching (#5767) * Added JDBC Idempotent Repository caching * Temp commit to see if CI build passes * Added documentation and warning about caching * Leave casting of count to JDBC implementation * Cleaned up lambda formatting --- .../apache/camel/catalog/docs/sql-component.adoc | 12 ++ .../camel-sql/src/main/docs/sql-component.adoc | 12 ++ .../jdbc/JdbcCachedMessageIdRepository.java | 142 +++++++++++++++++++++ .../jdbc/JdbcCachedMessageIdRepositoryTest.java | 85 ++++++++++++ .../processor/idempotent/jdbc/cached-spring.xml | 53 ++++++++ .../modules/ROOT/pages/sql-component.adoc | 12 ++ 6 files changed, 316 insertions(+) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc index 58bbb1e..a9eddc4 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc @@ -593,6 +593,18 @@ This repository has two additional configuration parameters |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are done to createdAt Timestamp column. |=== +=== Caching Jdbc IdempotentRepository + +Some SQL implementations are not fast on a per query basis. The +`JdbcMessageIdRepository` implementation does its idempotent checks +individually within SQL transactions. Checking a mere 100 keys can +take minutes. The `JdbcCachedMessageIdRepository` preloads an in-memory +cache on start with the entire list of keys. This cache is then +checked first before passing through to the original implementation. + +As with all cache implementations, there are considerations that should +be made with regard to stale data and your specific usage. + == Using the JDBC based aggregation repository *Since Camel 2.6* diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc index 58bbb1e..a9eddc4 100644 --- a/components/camel-sql/src/main/docs/sql-component.adoc +++ b/components/camel-sql/src/main/docs/sql-component.adoc @@ -593,6 +593,18 @@ This repository has two additional configuration parameters |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are done to createdAt Timestamp column. |=== +=== Caching Jdbc IdempotentRepository + +Some SQL implementations are not fast on a per query basis. The +`JdbcMessageIdRepository` implementation does its idempotent checks +individually within SQL transactions. Checking a mere 100 keys can +take minutes. The `JdbcCachedMessageIdRepository` preloads an in-memory +cache on start with the entire list of keys. This cache is then +checked first before passing through to the original implementation. + +As with all cache implementations, there are considerations that should +be made with regard to stale data and your specific usage. + == Using the JDBC based aggregation repository *Since Camel 2.6* diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepository.java new file mode 100644 index 0000000..d720cb9 --- /dev/null +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepository.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.jdbc; + +import java.util.HashMap; +import java.util.Map; + +import javax.sql.DataSource; + +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * Caching version of {@link JdbcMessageIdRepository} + */ +public class JdbcCachedMessageIdRepository extends JdbcMessageIdRepository { + private Map<String, Integer> cache = new HashMap<>(); + private int hitCount; + private int missCount; + private String queryAllString + = "SELECT messageId, COUNT(*) AS messageCount FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? GROUP BY messageId"; + + public JdbcCachedMessageIdRepository() { + } + + public JdbcCachedMessageIdRepository(DataSource dataSource, String processorName) { + super(dataSource, processorName); + } + + public JdbcCachedMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) { + super(dataSource, transactionTemplate, processorName); + } + + public JdbcCachedMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) { + super(jdbcTemplate, transactionTemplate); + } + + @Override + protected void doInit() throws Exception { + super.doInit(); + if (getTableName() != null) { + queryAllString = queryAllString.replaceFirst(DEFAULT_TABLENAME, getTableName()); + } + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + reload(); + } + + @Override + public boolean add(final String key) { + Integer previousValue = cache.getOrDefault(key, 0); + cache.put(key, previousValue + 1); + if (previousValue != 0) { + hitCount++; + return false; + } + missCount++; + return super.add(key); + } + + @Override + public boolean contains(final String key) { + Integer previousValue = cache.getOrDefault(key, 0); + if (previousValue != 0) { + hitCount++; + return true; + } + missCount++; + return super.contains(key); + } + + @Override + public boolean remove(String key) { + cache.remove(key); + return super.remove(key); + } + + @Override + public void clear() { + cache.clear(); + hitCount = 0; + missCount = 0; + super.clear(); + } + + public String getQueryAllString() { + return queryAllString; + } + + public void setQueryAllString(String queryAllString) { + this.queryAllString = queryAllString; + } + + public int getHitCount() { + return hitCount; + } + + public int getMissCount() { + return missCount; + } + + public void reload() { + transactionTemplate.execute(status -> { + try { + cache = jdbcTemplate.query(getQueryAllString(), resultSet -> { + Map<String, Integer> messageIdCount = new HashMap<>(); + while (resultSet.next()) { + messageIdCount.put(resultSet.getString("messageId"), resultSet.getInt("messageCount")); + } + return messageIdCount; + }, getProcessorName()); + log.info("JdbcCachedMessageIdRepository cache loaded with {} entries", cache.size()); + } catch (DataAccessException dae) { + log.error( + "Unable to populate JdbcCachedMessageIdRepository cache because of: {}.", + dae.getMessage()); + throw dae; + } + return Boolean.TRUE; + + }); + } + +} diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepositoryTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepositoryTest.java new file mode 100644 index 0000000..77ef966 --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcCachedMessageIdRepositoryTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.jdbc; + +import java.sql.Timestamp; + +import javax.sql.DataSource; + +import org.apache.camel.EndpointInject; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.spring.junit5.CamelSpringTestSupport; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.jdbc.core.JdbcTemplate; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class JdbcCachedMessageIdRepositoryTest extends CamelSpringTestSupport { + + protected static final String INSERT_STRING + = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)"; + protected static final String PROCESSOR_NAME = "myProcessorName"; + + protected JdbcTemplate jdbcTemplate; + protected DataSource dataSource; + protected JdbcCachedMessageIdRepository repository; + + @EndpointInject("mock:result") + protected MockEndpoint resultEndpoint; + + @EndpointInject("mock:error") + protected MockEndpoint errorEndpoint; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + + dataSource = context.getRegistry().lookupByNameAndType("dataSource", DataSource.class); + jdbcTemplate = new JdbcTemplate(dataSource); + jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "1", new Timestamp(System.currentTimeMillis())); + jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "2", new Timestamp(System.currentTimeMillis())); + repository = context.getRegistry().lookupByNameAndType("messageIdRepository", JdbcCachedMessageIdRepository.class); + repository.reload(); + } + + @Test + public void testCacheHit() throws Exception { + resultEndpoint.expectedBodiesReceived("three"); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + + assertMockEndpointsSatisfied(); + + assertEquals(5, repository.getHitCount()); + assertEquals(1, repository.getMissCount()); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/processor/idempotent/jdbc/cached-spring.xml"); + } +} diff --git a/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/cached-spring.xml b/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/cached-spring.xml new file mode 100644 index 0000000..dbd0764 --- /dev/null +++ b/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/cached-spring.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:camel="http://camel.apache.org/schema/spring" + xmlns:jdbc="http://www.springframework.org/schema/jdbc" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <!-- START SNIPPET: e1 --> + <jdbc:embedded-database id="dataSource" type="DERBY" /> + <!-- END SNIPPET: e1 --> + + <!-- START SNIPPET: e2 --> + <bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcCachedMessageIdRepository"> + <constructor-arg ref="dataSource" /> + <constructor-arg value="myProcessorName" /> + </bean> + + <camel:camelContext> + <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error"> + <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" /> + </camel:errorHandler> + + <camel:route id="JdbcCachedMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel"> + <camel:from uri="direct:start" /> + <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository"> + <camel:header>messageId</camel:header> + <camel:to uri="mock:result" /> + </camel:idempotentConsumer> + </camel:route> + </camel:camelContext> + <!-- END SNIPPET: e2 --> +</beans> \ No newline at end of file diff --git a/docs/components/modules/ROOT/pages/sql-component.adoc b/docs/components/modules/ROOT/pages/sql-component.adoc index 1512e26..aa2e4c4 100644 --- a/docs/components/modules/ROOT/pages/sql-component.adoc +++ b/docs/components/modules/ROOT/pages/sql-component.adoc @@ -595,6 +595,18 @@ This repository has two additional configuration parameters |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are done to createdAt Timestamp column. |=== +=== Caching Jdbc IdempotentRepository + +Some SQL implementations are not fast on a per query basis. The +`JdbcMessageIdRepository` implementation does its idempotent checks +individually within SQL transactions. Checking a mere 100 keys can +take minutes. The `JdbcCachedMessageIdRepository` preloads an in-memory +cache on start with the entire list of keys. This cache is then +checked first before passing through to the original implementation. + +As with all cache implementations, there are considerations that should +be made with regard to stale data and your specific usage. + == Using the JDBC based aggregation repository *Since Camel 2.6*