Author: cmueller
Date: Tue Apr 26 19:58:17 2011
New Revision: 1096880

URL: http://svn.apache.org/viewvc?rev=1096880&view=rev
Log:
CAMEL-3803: Component camel-jdbc does not support Transactions
Thanks to Heath Kesler for the patch.

Modified:
    camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
    camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java

Modified: 
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
 Tue Apr 26 19:58:17 2011
@@ -30,6 +30,7 @@ import org.apache.camel.impl.DefaultEndp
  */
 public class JdbcEndpoint extends DefaultEndpoint {
     private int readSize;
+    private boolean transacted;
     private DataSource dataSource;
     private Map<String, Object> parameters;
     private boolean useJDBC4ColumnNameAndLabelSemantics = true;
@@ -51,7 +52,7 @@ public class JdbcEndpoint extends Defaul
     }
 
     public Producer createProducer() throws Exception {
-        return new JdbcProducer(this, dataSource, readSize, parameters);
+        return new JdbcProducer(this, dataSource, readSize,  parameters);
     }
 
     public int getReadSize() {
@@ -62,6 +63,14 @@ public class JdbcEndpoint extends Defaul
         this.readSize = readSize;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
     public DataSource getDataSource() {
         return dataSource;
     }

Modified: 
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
 (original)
+++ 
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
 Tue Apr 26 19:58:17 2011
@@ -62,10 +62,13 @@ public class JdbcProducer extends Defaul
         Connection conn = null;
         Statement stmt = null;
         ResultSet rs = null;
+        
         try {
             conn = dataSource.getConnection();
+            conn.setAutoCommit(false);
+            
             stmt = conn.createStatement();
-
+            
             if (parameters != null && !parameters.isEmpty()) {
                 IntrospectionSupport.setProperties(stmt, parameters);
             }
@@ -81,26 +84,54 @@ public class JdbcProducer extends Defaul
                 int updateCount = stmt.getUpdateCount();
                 exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, 
updateCount);
             }
-        } finally {
-            try {
-                if (rs != null) {
-                    rs.close();
-                }
-                if (stmt != null) {
-                    stmt.close();
-                }
-                if (conn != null) {
-                    conn.close();
-                }
-            } catch (SQLException e) {
-                LOG.warn("Error closing JDBC resource: " + e, e);
+            conn.commit();
+        } catch (Exception e){
+            try{
+                conn.rollback();
+            } catch (SQLException sqle){
+                LOG.warn("Error on jdbc component rollback: " + sqle, sqle);
             }
+            throw e;
+        } finally {
+            closeQuietly(rs);
+            closeQuietly(stmt);
+            closeQuietly(conn);
         }
 
         // populate headers
         exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
     }
 
+    private void closeQuietly(ResultSet rs) {
+        if (rs != null) {
+            try{
+                rs.close();
+            } catch (SQLException sqle){
+                LOG.warn("Error by closing result set: " + sqle, sqle);
+            }
+        }
+    }
+    
+    private void closeQuietly(Statement stmt) {
+        if (stmt != null) {
+            try{
+                stmt.close();
+            } catch (SQLException sqle){
+                LOG.warn("Error by closing statement: " + sqle, sqle);
+            }
+        }
+    }
+    
+    private void closeQuietly(Connection con) {
+        if (con != null) {
+            try{
+                con.close();
+            } catch (SQLException sqle){
+                LOG.warn("Error by closing connection: " + sqle, sqle);
+            }
+        }
+    }
+
     /**
      * Sets the result from the ResultSet to the Exchange as its OUT body.
      */

Modified: 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
 (original)
+++ 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
 Tue Apr 26 19:58:17 2011
@@ -21,6 +21,7 @@ import java.util.List;
 
 import javax.sql.DataSource;
 
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -39,6 +40,7 @@ public class JdbcOptionsTest extends Cam
     private String password = "";
     private DataSource ds;
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testReadSize() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
@@ -52,6 +54,58 @@ public class JdbcOptionsTest extends Cam
         assertEquals(1, list.size());
     }
 
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testInsertCommitO() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resultTx");
+        mock.expectedMessageCount(1);
+        // insert 2 recs into table
+        template.sendBody("direct:startTx", "insert into customer values 
('cust3', 'johnsmith');insert into customer values ('cust4', 'hkesler') ");
+
+        mock.assertIsSatisfied();
+
+        String body = mock.getExchanges().get(0).getIn().getBody(String.class);
+        assertNull(body);
+
+        // now test to see that they were inserted and committed properly
+        MockEndpoint mockTest = getMockEndpoint("mock:retrieve");
+        mockTest.expectedMessageCount(1);
+
+        template.sendBody("direct:retrieve", "select * from customer");
+
+        mockTest.assertIsSatisfied();
+
+        List list = 
mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class);
+        // both records were committed
+        assertEquals(4, list.size());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testInsertRollback() throws Exception {
+        // insert 2 records
+        try{
+            template.sendBody("direct:startTx", "insert into customer values 
('cust3', 'johnsmith');insert into customer values ('cust3', 'hkesler')");
+            fail("Should have thrown a CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            if (!e.getCause().getMessage().contains("Violation of unique 
constraint")) {
+                fail("Test did not throw the expected Constraint Violation 
Exception");
+            }
+        }
+
+        // check to see that they failed by getting a rec count from table
+        MockEndpoint mockTest = getMockEndpoint("mock:retrieve");
+        mockTest.expectedMessageCount(1);
+
+        template.sendBody("direct:retrieve", "select * from customer");
+
+        mockTest.assertIsSatisfied();
+
+        List list = 
mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class);
+        // all recs failed to insert
+        assertEquals(2, list.size());
+    }
+
     @Test
     public void testNoDataSourceInRegistry() throws Exception {
         try {
@@ -73,6 +127,8 @@ public class JdbcOptionsTest extends Cam
         return new RouteBuilder() {
             public void configure() throws Exception {
                 
from("direct:start").to("jdbc:testdb?readSize=1").to("mock:result");
+                from("direct:retrieve").to("jdbc:testdb").to("mock:retrieve");
+                
from("direct:startTx").to("jdbc:testdb?transacted=true").to("mock:resultTx");
             }
         };
     }
@@ -84,7 +140,7 @@ public class JdbcOptionsTest extends Cam
         ds = dataSource;
 
         JdbcTemplate jdbc = new JdbcTemplate(ds);
-        jdbc.execute("create table customer (id varchar(15), name 
varchar(10))");
+        jdbc.execute("create table customer (id varchar(15) PRIMARY KEY, name 
varchar(10))");
         jdbc.execute("insert into customer values('cust1','jstrachan')");
         jdbc.execute("insert into customer values('cust2','nsandhu')");
         super.setUp();

Modified: 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
 (original)
+++ 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
 Tue Apr 26 19:58:17 2011
@@ -55,14 +55,15 @@ public class JdbcProducerConcurrenctTest
         doSendMessages(10, 5);
     }
 
+    @SuppressWarnings("rawtypes")
     private void doSendMessages(int files, int poolSize) throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(files);
 
         ExecutorService executor = Executors.newFixedThreadPool(poolSize);
-        Map<Integer, Future> responses = new ConcurrentHashMap<Integer, 
Future>();
+        Map<Integer, Future<Object>> responses = new 
ConcurrentHashMap<Integer, Future<Object>>();
         for (int i = 0; i < files; i++) {
             final int index = i;
-            Future out = executor.submit(new Callable<Object>() {
+            Future<Object> out = executor.submit(new Callable<Object>() {
                 public Object call() throws Exception {
                     int id = index % 2;
                     return template.requestBody("direct:start", "select * from 
customer where id = " + id);

Modified: 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
 (original)
+++ 
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
 Tue Apr 26 19:58:17 2011
@@ -38,6 +38,7 @@ public class JdbcStatementParametersTest
     private String user = "sa";
     private String password = "";
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testMax2Rows() throws Exception {
         List rows = template.requestBody("direct:hello", "select * from 
customer order by id", List.class);
@@ -46,6 +47,7 @@ public class JdbcStatementParametersTest
         assertEquals(2, context.getEndpoints().size());
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testMax5Rows() throws Exception {
         List rows = 
template.requestBody("jdbc:testdb?statement.maxRows=5&statement.fetchSize=50", 
"select * from customer order by id", List.class);
@@ -54,6 +56,7 @@ public class JdbcStatementParametersTest
         assertEquals(3, context.getEndpoints().size());
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testNoParameters() throws Exception {
         List rows = template.requestBody("jdbc:testdb", "select * from 
customer order by id", List.class);


Reply via email to