This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.20.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push: new e4d71b424ca CAMEL-19256: camel-jdbc - close stmt in the finnal block (#9827) (#9860) e4d71b424ca is described below commit e4d71b424ca4b1b510d05df78527536ee767c034 Author: Zheng Feng <zh.f...@gmail.com> AuthorDate: Fri Apr 14 20:12:08 2023 +0800 CAMEL-19256: camel-jdbc - close stmt in the finnal block (#9827) (#9860) --- .../apache/camel/component/jdbc/JdbcProducer.java | 96 ++++++++++++++-------- 1 file changed, 63 insertions(+), 33 deletions(-) diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java index 6ad8aaec7d4..6ad50641720 100644 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java @@ -32,7 +32,6 @@ import javax.sql.DataSource; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; -import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.SynchronizationAdapter; @@ -103,6 +102,21 @@ public class JdbcProducer extends DefaultProducer { if (shouldCloseResources) { resetAutoCommit(conn, autoCommit); closeQuietly(conn); + } else { + final Connection finalConn = conn; + final boolean finalAutoCommit = autoCommit; + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + resetAutoCommit(finalConn, finalAutoCommit); + closeQuietly(finalConn); + } + + @Override + public int getOrder() { + return LOWEST + 200; + } + }); } } } @@ -118,6 +132,19 @@ public class JdbcProducer extends DefaultProducer { } finally { if (shouldCloseResources && !connectionStrategy.isConnectionTransactional(conn, dataSource)) { closeQuietly(conn); + } else { + final Connection finalConn = conn; + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + closeQuietly(finalConn); + } + + @Override + public int getOrder() { + return LOWEST + 200; + } + }); } } } @@ -188,6 +215,22 @@ public class JdbcProducer extends DefaultProducer { if (shouldCloseResources) { closeQuietly(rs); closeQuietly(ps); + } else { + final Statement finalPs = ps; + final ResultSet finalRs = rs; + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + closeQuietly(finalRs); + closeQuietly(finalPs); + } + + @Override + public int getOrder() { + // Make sure it happens before close Connection. + return LOWEST + 100; + } + }); } } return shouldCloseResources; @@ -195,21 +238,12 @@ public class JdbcProducer extends DefaultProducer { private boolean doCreateAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { + Statement stmt = null; ResultSet rs = null; boolean shouldCloseResources = true; try { - // We might need to leave it open to allow post-processing of the result set. This is why we - // are not using try-with-resources here. - final Statement stmt = conn.createStatement(); - // ensure statement is closed (to not leak) when exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { - @Override - public void onDone(Exchange exchange) { - closeQuietly(stmt); - } - }); - + stmt = conn.createStatement(); bindParameters(exchange, stmt); LOG.debug("Executing JDBC Statement: {}", sql); @@ -250,6 +284,23 @@ public class JdbcProducer extends DefaultProducer { } finally { if (shouldCloseResources) { closeQuietly(rs); + closeQuietly(stmt); + } else { + final Statement finalStmt = stmt; + final ResultSet finalRs = rs; + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + closeQuietly(finalRs); + closeQuietly(finalStmt); + } + + @Override + public int getOrder() { + // Make sure it happens before close Connection. + return LOWEST + 100; + } + }); } } return shouldCloseResources; @@ -346,7 +397,6 @@ public class JdbcProducer extends DefaultProducer { .setBody(new StreamListIterator( getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), iterator)); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); // do not close resources as we are in streaming mode answer = false; } else if (outputType == JdbcOutputType.SelectList) { @@ -395,24 +445,4 @@ public class JdbcProducer extends DefaultProducer { return row; } } - - private static final class ResultSetIteratorCompletion implements Synchronization { - private final ResultSetIterator iterator; - - private ResultSetIteratorCompletion(ResultSetIterator iterator) { - this.iterator = iterator; - } - - @Override - public void onComplete(Exchange exchange) { - iterator.close(); - iterator.closeConnection(); - } - - @Override - public void onFailure(Exchange exchange) { - iterator.close(); - iterator.closeConnection(); - } - } }