Author: cmueller
Date: Tue Apr 26 19:58:17 2011
New Revision: 1096880
URL: http://svn.apache.org/viewvc?rev=1096880&view=rev
Log: CAMEL-3803: Component camel-jdbc does not support Transactions. Thanks to Heath Kesler for the patch.
Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java?rev=1096880&r1=1096879&r2=1096880&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java (original) +++ camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java Tue Apr 26 19:58:17 2011 @@ -30,6 +30,7 @@ import org.apache.camel.impl.DefaultEndp */ public class JdbcEndpoint extends DefaultEndpoint { private int readSize; + private boolean transacted; private DataSource dataSource; private Map<String, Object> parameters; private boolean useJDBC4ColumnNameAndLabelSemantics = true; @@ -51,7 +52,7 @@ public class JdbcEndpoint extends Defaul } public Producer createProducer() throws Exception { - return new JdbcProducer(this, dataSource, readSize, parameters); + return new JdbcProducer(this, dataSource, readSize, parameters); } public int getReadSize() { @@ -62,6 +63,14 @@ public class JdbcEndpoint extends Defaul this.readSize = readSize; } + public boolean isTransacted() { + return transacted; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + public DataSource getDataSource() { return 
dataSource; } Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java?rev=1096880&r1=1096879&r2=1096880&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java (original) +++ camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java Tue Apr 26 19:58:17 2011 @@ -62,10 +62,13 @@ public class JdbcProducer extends Defaul Connection conn = null; Statement stmt = null; ResultSet rs = null; + try { conn = dataSource.getConnection(); + conn.setAutoCommit(false); + stmt = conn.createStatement(); - + if (parameters != null && !parameters.isEmpty()) { IntrospectionSupport.setProperties(stmt, parameters); } @@ -81,26 +84,54 @@ public class JdbcProducer extends Defaul int updateCount = stmt.getUpdateCount(); exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount); } - } finally { - try { - if (rs != null) { - rs.close(); - } - if (stmt != null) { - stmt.close(); - } - if (conn != null) { - conn.close(); - } - } catch (SQLException e) { - LOG.warn("Error closing JDBC resource: " + e, e); + conn.commit(); + } catch (Exception e){ + try{ + conn.rollback(); + } catch (SQLException sqle){ + LOG.warn("Error on jdbc component rollback: " + sqle, sqle); } + throw e; + } finally { + closeQuietly(rs); + closeQuietly(stmt); + closeQuietly(conn); } // populate headers exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); } + private void closeQuietly(ResultSet rs) { + if (rs != null) { + try{ + rs.close(); + } catch (SQLException sqle){ + LOG.warn("Error by closing result set: " + sqle, sqle); + } + } + } + + private void closeQuietly(Statement stmt) { + if (stmt != null) { + try{ + stmt.close(); + } 
catch (SQLException sqle){ + LOG.warn("Error by closing statement: " + sqle, sqle); + } + } + } + + private void closeQuietly(Connection con) { + if (con != null) { + try{ + con.close(); + } catch (SQLException sqle){ + LOG.warn("Error by closing connection: " + sqle, sqle); + } + } + } + /** * Sets the result from the ResultSet to the Exchange as its OUT body. */ Modified: camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java (original) +++ camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java Tue Apr 26 19:58:17 2011 @@ -21,6 +21,7 @@ import java.util.List; import javax.sql.DataSource; +import org.apache.camel.CamelExecutionException; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -39,6 +40,7 @@ public class JdbcOptionsTest extends Cam private String password = ""; private DataSource ds; + @SuppressWarnings("rawtypes") @Test public void testReadSize() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); @@ -52,6 +54,58 @@ public class JdbcOptionsTest extends Cam assertEquals(1, list.size()); } + @SuppressWarnings("rawtypes") + @Test + public void testInsertCommitO() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resultTx"); + mock.expectedMessageCount(1); + // insert 2 recs into table + template.sendBody("direct:startTx", "insert into customer values ('cust3', 'johnsmith');insert into customer values ('cust4', 'hkesler') "); + + mock.assertIsSatisfied(); + + String body = 
mock.getExchanges().get(0).getIn().getBody(String.class); + assertNull(body); + + // now test to see that they were inserted and committed properly + MockEndpoint mockTest = getMockEndpoint("mock:retrieve"); + mockTest.expectedMessageCount(1); + + template.sendBody("direct:retrieve", "select * from customer"); + + mockTest.assertIsSatisfied(); + + List list = mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class); + // both records were committed + assertEquals(4, list.size()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testInsertRollback() throws Exception { + // insert 2 records + try{ + template.sendBody("direct:startTx", "insert into customer values ('cust3', 'johnsmith');insert into customer values ('cust3', 'hkesler')"); + fail("Should have thrown a CamelExecutionException"); + } catch (CamelExecutionException e) { + if (!e.getCause().getMessage().contains("Violation of unique constraint")) { + fail("Test did not throw the expected Constraint Violation Exception"); + } + } + + // check to see that they failed by getting a rec count from table + MockEndpoint mockTest = getMockEndpoint("mock:retrieve"); + mockTest.expectedMessageCount(1); + + template.sendBody("direct:retrieve", "select * from customer"); + + mockTest.assertIsSatisfied(); + + List list = mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class); + // all recs failed to insert + assertEquals(2, list.size()); + } + @Test public void testNoDataSourceInRegistry() throws Exception { try { @@ -73,6 +127,8 @@ public class JdbcOptionsTest extends Cam return new RouteBuilder() { public void configure() throws Exception { from("direct:start").to("jdbc:testdb?readSize=1").to("mock:result"); + from("direct:retrieve").to("jdbc:testdb").to("mock:retrieve"); + from("direct:startTx").to("jdbc:testdb?transacted=true").to("mock:resultTx"); } }; } @@ -84,7 +140,7 @@ public class JdbcOptionsTest extends Cam ds = dataSource; JdbcTemplate jdbc = new JdbcTemplate(ds); - 
jdbc.execute("create table customer (id varchar(15), name varchar(10))"); + jdbc.execute("create table customer (id varchar(15) PRIMARY KEY, name varchar(10))"); jdbc.execute("insert into customer values('cust1','jstrachan')"); jdbc.execute("insert into customer values('cust2','nsandhu')"); super.setUp(); Modified: camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java (original) +++ camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java Tue Apr 26 19:58:17 2011 @@ -55,14 +55,15 @@ public class JdbcProducerConcurrenctTest doSendMessages(10, 5); } + @SuppressWarnings("rawtypes") private void doSendMessages(int files, int poolSize) throws Exception { getMockEndpoint("mock:result").expectedMessageCount(files); ExecutorService executor = Executors.newFixedThreadPool(poolSize); - Map<Integer, Future> responses = new ConcurrentHashMap<Integer, Future>(); + Map<Integer, Future<Object>> responses = new ConcurrentHashMap<Integer, Future<Object>>(); for (int i = 0; i < files; i++) { final int index = i; - Future out = executor.submit(new Callable<Object>() { + Future<Object> out = executor.submit(new Callable<Object>() { public Object call() throws Exception { int id = index % 2; return template.requestBody("direct:start", "select * from customer where id = " + id); Modified: camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java (original) +++ camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java Tue Apr 26 19:58:17 2011 @@ -38,6 +38,7 @@ public class JdbcStatementParametersTest private String user = "sa"; private String password = ""; + @SuppressWarnings("rawtypes") @Test public void testMax2Rows() throws Exception { List rows = template.requestBody("direct:hello", "select * from customer order by id", List.class); @@ -46,6 +47,7 @@ public class JdbcStatementParametersTest assertEquals(2, context.getEndpoints().size()); } + @SuppressWarnings("rawtypes") @Test public void testMax5Rows() throws Exception { List rows = template.requestBody("jdbc:testdb?statement.maxRows=5&statement.fetchSize=50", "select * from customer order by id", List.class); @@ -54,6 +56,7 @@ public class JdbcStatementParametersTest assertEquals(3, context.getEndpoints().size()); } + @SuppressWarnings("rawtypes") @Test public void testNoParameters() throws Exception { List rows = template.requestBody("jdbc:testdb", "select * from customer order by id", List.class);