This is an automated email from the ASF dual-hosted git repository. ggrzybek pushed a commit to branch camel-2.23.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.23.x by this push: new f14dbb5 [CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior f14dbb5 is described below commit f14dbb500b4ef0bdb84448374d868eacd6408524 Author: Grzegorz Grzybek <gr.grzy...@gmail.com> AuthorDate: Mon Sep 9 14:58:20 2019 +0200 [CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior (cherry picked from commit 6974f9b60a504eb967b5e643254c441040df7f9c) (cherry picked from commit 6f2348cf09a14ba4863e2364289a151da865fbcd) (cherry picked from commit ab9746c1d3b0a2101f0d2d098212d26aacabb17d) --- .../camel-sql/src/main/docs/sql-component.adoc | 48 +++++++++++- .../aggregate/jdbc/JdbcAggregationRepository.java | 19 +++-- .../jdbc/PostgresAggregationRepository.java | 91 ++++++++++++++++++++++ 3 files changed, 151 insertions(+), 7 deletions(-) diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc index 4a5c590..80ed04f 100644 --- a/components/camel-sql/src/main/docs/sql-component.adoc +++ b/components/camel-sql/src/main/docs/sql-component.adoc @@ -765,9 +765,9 @@ JDBC vendor. 
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> - <propertyname="repositoryName" value="aggregation"/> + <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> - <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> + <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extraFQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> @@ -780,6 +780,50 @@ class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> </bean> ----- +=== Propagation behavior + +`JdbcAggregationRepository` uses two distinct _transaction templates_ from Spring-TX. One is read-only +and one is used for read-write operations. + +However, when using `JdbcAggregationRepository` within a route that itself uses `<transacted />` and a +common `PlatformTransactionManager` is used, there may be a need to configure the _propagation behavior_ used by +the transaction templates inside `JdbcAggregationRepository`. + +Here's a way to do it: +[source,xml] +---- +<bean id="repo" +class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> + <property name="propagationBehaviorName" value="PROPAGATION_NESTED" /> +</bean> +---- + +Propagation is specified by constants of the `org.springframework.transaction.TransactionDefinition` interface, +so `propagationBehaviorName` is a convenient setter that accepts the names of those constants. + +=== PostgreSQL case + +There's a special database that may cause problems with the optimistic locking used by `JdbcAggregationRepository`. +PostgreSQL marks the connection as invalid in case of a data integrity violation exception (the one with SQLState 23505). 
+This makes the connection effectively unusable within a nested transaction. +Details can be found +https://www.postgresql.org/message-id/200609241203.59292.ralf.wiebicke%40exedio.com[in this document]. + +`org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository` extends `JdbcAggregationRepository` and +uses a special `INSERT .. ON CONFLICT ..` statement to provide optimistic locking behavior. + +This statement is (with the default aggregation table definition): +[source,sql] +---- +INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING +---- + +Details can be found https://www.postgresql.org/docs/9.5/sql-insert.html[in the PostgreSQL documentation]. + +When this clause is used, the `java.sql.PreparedStatement.executeUpdate()` call returns `0` instead of throwing +SQLException with SQLState=23505. Further handling is exactly the same as with the generic `JdbcAggregationRepository`, +but without marking the PostgreSQL connection as invalid. + === Camel Sql Starter A starter module is available to spring-boot users. 
When using the starter, diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java index 597767a..4e93389 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java @@ -59,11 +59,11 @@ import org.springframework.transaction.support.TransactionTemplate; */ public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { + protected static final String EXCHANGE = "exchange"; + protected static final String ID = "id"; + protected static final String BODY = "body"; private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class); - private static final String ID = "id"; - private static final String EXCHANGE = "exchange"; - private static final String BODY = "body"; private static final Constants PROPAGATION_CONSTANTS = new Constants(TransactionDefinition.class); private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper(); @@ -240,9 +240,9 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true); } - protected void insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception { + protected int insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception { final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders); - 
jdbcTemplate.execute(sql, + Integer updateCount = jdbcTemplate.execute(sql, new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) { @Override protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException { @@ -265,6 +265,7 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover } } }); + return updateCount == null ? 0 : updateCount; } @Override @@ -443,6 +444,10 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty(); } + public List<String> getHeadersToStoreAsText() { + return headersToStoreAsText; + } + /** * Allows to store headers as String which is human readable. By default this option is disabled, * storing the headers in binary format. @@ -453,6 +458,10 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover this.headersToStoreAsText = headersToStoreAsText; } + public boolean isStoreBodyAsText() { + return storeBodyAsText; + } + /** * Whether to store the message body as String which is human readable. * By default this option is false storing the body in binary format. diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java new file mode 100644 index 0000000..f023432 --- /dev/null +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.jdbc; + +import javax.sql.DataSource; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL Violation Exceptions + * using the special {@code INSERT INTO .. ON CONFLICT DO NOTHING} clause. 
+ */ +public class PostgresAggregationRepository extends JdbcAggregationRepository { + + /** + * Creates an aggregation repository + */ + public PostgresAggregationRepository() { + } + + /** + * Creates an aggregation repository with the three mandatory parameters + */ + public PostgresAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName, DataSource dataSource) { + super(transactionManager, repositoryName, dataSource); + } + + /** + * Inserts a new record into the given repository table + * + * @param camelContext the current CamelContext + * @param correlationId the correlation key + * @param exchange the aggregated exchange + * @param repositoryName The name of the table + */ + protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName) throws Exception { + // The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased + int totalParameterIndex = 2; + StringBuilder queryBuilder = new StringBuilder() + .append("INSERT INTO ").append(repositoryName) + .append('(') + .append(EXCHANGE).append(", ") + .append(ID); + + if (isStoreBodyAsText()) { + queryBuilder.append(", ").append(BODY); + totalParameterIndex++; + } + + if (hasHeadersToStoreAsText()) { + for (String headerName : getHeadersToStoreAsText()) { + queryBuilder.append(", ").append(headerName); + totalParameterIndex++; + } + } + + queryBuilder.append(") VALUES ("); + + for (int i = 0; i < totalParameterIndex - 1; i++) { + queryBuilder.append("?, "); + } + queryBuilder.append("?)"); + + queryBuilder.append(" ON CONFLICT DO NOTHING"); + + String sql = queryBuilder.toString(); + + int updateCount = insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true); + if (updateCount == 0 && getRepositoryName().equals(repositoryName)) { + throw new DataIntegrityViolationException("No row was inserted due to data violation"); + } + } + +}