CAMEL-9162: camel-elsql component

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f65b0491
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f65b0491
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f65b0491

Branch: refs/heads/master
Commit: f65b0491f25a94089b547937d46ef5638580ee7a
Parents: 3dd8056
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Oct 5 10:35:04 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Oct 5 10:54:54 2015 +0200

----------------------------------------------------------------------
 .../camel/component/elsql/ElsqlComponent.java   |  24 ++---
 .../camel/component/elsql/ElsqlConsumer.java    |   7 +-
 .../camel/component/elsql/ElsqlEndpoint.java    |  16 ++-
 .../camel/component/elsql/ElsqlProducer.java    |   5 +-
 .../component/elsql/ElsqlSqlMapSource.java      |  49 ++++++---
 .../elsql/ElsqlSqlProcessingStrategy.java       |  51 ++++++++--
 .../elsql/ElSqlConsumerDeleteTest.java          | 100 +++++++++++++++++++
 .../src/test/resources/elsql/projects.elsql     |   4 +
 .../src/test/resources/log4j.properties         |   2 +-
 .../apache/camel/component/sql/SqlConsumer.java |  48 +++++++--
 .../sql/SqlNamedProcessingStrategy.java         |  57 +++++++++++
 11 files changed, 307 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
index 40e6530..51142e8 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
@@ -70,34 +70,23 @@ public class ElsqlComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("Invalid uri. Must by 
elsql:elsqlName/resourceUri, was: " + uri);
         }
 
-        /* TODO: add this later
         String onConsume = getAndRemoveParameter(parameters, 
"consumer.onConsume", String.class);
         if (onConsume == null) {
             onConsume = getAndRemoveParameter(parameters, "onConsume", 
String.class);
         }
-        if (onConsume != null && isUsePlaceholder()) {
-            onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, 
"?");
-        }
         String onConsumeFailed = getAndRemoveParameter(parameters, 
"consumer.onConsumeFailed", String.class);
         if (onConsumeFailed == null) {
             onConsumeFailed = getAndRemoveParameter(parameters, 
"onConsumeFailed", String.class);
         }
-        if (onConsumeFailed != null && isUsePlaceholder()) {
-            onConsumeFailed = 
onConsumeFailed.replaceAll(parameterPlaceholderSubstitute, "?");
-        }
         String onConsumeBatchComplete = getAndRemoveParameter(parameters, 
"consumer.onConsumeBatchComplete", String.class);
         if (onConsumeBatchComplete == null) {
             onConsumeBatchComplete = getAndRemoveParameter(parameters, 
"onConsumeBatchComplete", String.class);
         }
-        if (onConsumeBatchComplete != null && isUsePlaceholder()) {
-            onConsumeBatchComplete = 
onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
-        }
-         */
 
         ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, 
elsqlName, resUri);
-//        endpoint.setOnConsume(onConsume);
-//        endpoint.setOnConsumeFailed(onConsumeFailed);
-//        endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
+        endpoint.setOnConsume(onConsume);
+        endpoint.setOnConsumeFailed(onConsumeFailed);
+        endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
         endpoint.setDataSource(ds);
         endpoint.setDataSourceRef(dataSourceRef);
         endpoint.setElSqlConfig(elSqlConfig);
@@ -118,6 +107,13 @@ public class ElsqlComponent extends UriEndpointComponent {
         super.doStop();
     }
 
+    /**
+     * Sets the DataSource to use to communicate with the database.
+     */
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
     public DataSource getDataSource() {
         return dataSource;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
index 9459241..530dc23 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
@@ -21,12 +21,13 @@ import org.apache.camel.component.sql.DefaultSqlEndpoint;
 import org.apache.camel.component.sql.SqlConsumer;
 import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 import org.apache.camel.component.sql.SqlProcessingStrategy;
-import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 public class ElsqlConsumer extends SqlConsumer {
 
-    public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, 
JdbcTemplate jdbcTemplate, String query,
+    public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, 
NamedParameterJdbcTemplate namedJdbcTemplate, String query, SqlParameterSource 
parameterSource,
                          SqlPrepareStatementStrategy 
sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
-        super(endpoint, processor, jdbcTemplate, query, 
sqlPrepareStatementStrategy, sqlProcessingStrategy);
+        super(endpoint, processor, namedJdbcTemplate, query, parameterSource, 
sqlPrepareStatementStrategy, sqlProcessingStrategy);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index a07b93e..d2b2cbf 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -20,6 +20,7 @@ import java.net.URL;
 
 import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.ElSqlConfig;
+import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -33,12 +34,17 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ResourceHelper;
-import org.springframework.jdbc.core.JdbcTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 @UriEndpoint(scheme = "elsql", title = "SQL", syntax = 
"elsql:elsqlName:resourceUri", consumerClass = ElsqlConsumer.class, label = 
"database,sql")
 public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElsqlEndpoint.class);
+
     private volatile ElSql elSql;
     private NamedParameterJdbcTemplate namedJdbcTemplate;
 
@@ -59,12 +65,14 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        SqlProcessingStrategy proStrategy = new 
ElsqlSqlProcessingStrategy(elsqlName, elSql);
+        SqlProcessingStrategy proStrategy = new 
ElsqlSqlProcessingStrategy(elSql);
         SqlPrepareStatementStrategy preStategy = new 
ElsqlSqlPrepareStatementStrategy();
 
-        JdbcTemplate template = new JdbcTemplate(getDataSource());
+        final SqlParameterSource param = new EmptySqlParameterSource();
+        final String sql = elSql.getSql(elsqlName, new SpringSqlParams(param));
+        LOG.debug("ElsqlConsumer @{} using sql: {}", elsqlName, sql);
 
-        ElsqlConsumer consumer = new ElsqlConsumer(this, processor, template, 
elsqlName, preStategy, proStrategy);
+        ElsqlConsumer consumer = new ElsqlConsumer(this, processor, 
namedJdbcTemplate, sql, param, preStategy, proStrategy);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
         consumer.setOnConsumeFailed(getOnConsumeFailed());

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index 4353d9a..78d0d2e 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -27,6 +27,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.sql.SqlConstants;
 import org.apache.camel.component.sql.SqlOutputType;
 import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@@ -36,6 +38,7 @@ import static 
org.springframework.jdbc.support.JdbcUtils.closeResultSet;
 
 public class ElsqlProducer extends DefaultProducer {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElsqlProducer.class);
     private final ElSql elSql;
     private final String elSqlName;
     private final NamedParameterJdbcTemplate jdbcTemplate;
@@ -58,7 +61,7 @@ public class ElsqlProducer extends DefaultProducer {
 
         final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data);
         final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
-        log.debug("ElSql @{} using sql: {}", elSqlName, sql);
+        LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql);
 
         jdbcTemplate.execute(sql, param, new 
PreparedStatementCallback<Object>() {
             @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
index e8035b0..aec8d46 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
@@ -20,10 +20,16 @@ import java.util.Collections;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.AbstractSqlParameterSource;
 
-public class ElsqlSqlMapSource extends MapSqlParameterSource {
+/**
+ * A {@link org.springframework.jdbc.core.namedparam.SqlParameterSource} that 
is used by {@link com.opengamma.elsql.ElSql}
+ * to lookup parameter values. This source will lookup in the Camel {@link 
Exchange} and {@link org.apache.camel.Message}
+ * assuming they are Map based.
+ */
+public class ElsqlSqlMapSource extends AbstractSqlParameterSource {
 
+    // use the maps from the Camel Message as they are case insensitive which 
makes it easier for end users to work with
     private final Exchange exchange;
     private final Map<?, ?> bodyMap;
     private final Map<?, ?> headersMap;
@@ -32,23 +38,36 @@ public class ElsqlSqlMapSource extends 
MapSqlParameterSource {
         this.exchange = exchange;
         this.bodyMap = 
safeMap(exchange.getContext().getTypeConverter().tryConvertTo(Map.class, body));
         this.headersMap = safeMap(exchange.getIn().getHeaders());
-
-        addValue("body", body);
-
-        for (Map.Entry<?, ?> entry : bodyMap.entrySet()) {
-            String name = entry.getKey().toString();
-            Object value = entry.getValue();
-            addValue(name, value);
-        }
-        for (Map.Entry<?, ?> entry : headersMap.entrySet()) {
-            String name = entry.getKey().toString();
-            Object value = entry.getValue();
-            addValue(name, value);
-        }
     }
 
     private static Map<?, ?> safeMap(Map<?, ?> map) {
         return (map == null || map.isEmpty()) ? Collections.emptyMap() : map;
     }
 
+    @Override
+    public boolean hasValue(String paramName) {
+        if ("body".equals(paramName)) {
+            return true; // the name "body" is always reported as available
+        } else if (bodyMap.containsKey(paramName)) {
+            return true; // key found in the body converted to a Map
+        } else if (headersMap.containsKey(paramName)) {
+            return true; // key found in the IN message headers
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public Object getValue(String paramName) throws IllegalArgumentException {
+        Object answer;
+        if ("body".equals(paramName)) {
+            answer = exchange.getIn().getBody(); // "body" resolves to the IN message body itself
+        } else {
+            answer = bodyMap.get(paramName); // a value from the Map-converted body takes precedence
+            if (answer == null) {
+                answer = headersMap.get(paramName); // fall back to the headers (result was previously discarded)
+            }
+        }
+        return answer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
index ea933d8..4180edd 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
@@ -23,32 +23,56 @@ import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sql.DefaultSqlEndpoint;
-import org.apache.camel.component.sql.SqlProcessingStrategy;
+import org.apache.camel.component.sql.SqlNamedProcessingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
-public class ElsqlSqlProcessingStrategy implements SqlProcessingStrategy {
+public class ElsqlSqlProcessingStrategy implements SqlNamedProcessingStrategy {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ElsqlSqlProcessingStrategy.class);
-    private final String elSqlName;
     private final ElSql elSql;
 
-    public ElsqlSqlProcessingStrategy(String elSqlName, ElSql elSql) {
-        this.elSqlName = elSqlName;
+    public ElsqlSqlProcessingStrategy(ElSql elSql) {
         this.elSql = elSql;
     }
 
     @Override
-    public int commit(final DefaultSqlEndpoint endpoint, final Exchange 
exchange, final Object data, final JdbcTemplate jdbcTemplate, final String 
query) throws Exception {
+    public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange 
exchange, Object data, NamedParameterJdbcTemplate jdbcTemplate,
+                      SqlParameterSource parameterSource, String query) throws 
Exception {
+
         final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data);
-        final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
-        LOG.debug("ElSql @{} using sql: {}", elSqlName, sql);
+        final String sql = elSql.getSql(query, new SpringSqlParams(param));
+        LOG.debug("commit @{} using sql: {}", query, sql);
+
+        return jdbcTemplate.execute(sql, param, new 
PreparedStatementCallback<Integer>() {
+            @Override
+            public Integer doInPreparedStatement(PreparedStatement ps) throws 
SQLException, DataAccessException {
+                ps.execute();
+
+                int updateCount = ps.getUpdateCount();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Update count {}", updateCount);
+                }
+                return updateCount;
+            }
+        });
+    }
+
+    @Override
+    public int commitBatchComplete(DefaultSqlEndpoint endpoint, 
NamedParameterJdbcTemplate namedJdbcTemplate,
+                            SqlParameterSource parameterSource, String query) 
throws Exception {
+
+        final SqlParameterSource param = new EmptySqlParameterSource();
+        final String sql = elSql.getSql(query, new SpringSqlParams(param));
+        LOG.debug("commitBatchComplete @{} using sql: {}", query, sql);
 
-        return jdbcTemplate.execute(sql, new 
PreparedStatementCallback<Integer>() {
+        return namedJdbcTemplate.execute(sql, param, new 
PreparedStatementCallback<Integer>() {
             @Override
             public Integer doInPreparedStatement(PreparedStatement ps) throws 
SQLException, DataAccessException {
                 ps.execute();
@@ -63,7 +87,12 @@ public class ElsqlSqlProcessingStrategy implements 
SqlProcessingStrategy {
     }
 
     @Override
-    public int commitBatchComplete(final DefaultSqlEndpoint endpoint, final 
JdbcTemplate jdbcTemplate, final String query) throws Exception {
-        return 0;
+    public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange 
exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws 
Exception {
+        throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public int commitBatchComplete(DefaultSqlEndpoint defaultSqlEndpoint, 
JdbcTemplate jdbcTemplate, String query) throws Exception {
+        throw new UnsupportedOperationException("Should not be called");
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
 
b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
new file mode 100644
index 0000000..a381bf5
--- /dev/null
+++ 
b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elsql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class ElSqlConsumerDeleteTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+    private JdbcTemplate jdbcTemplate;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+                
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        jdbcTemplate = new JdbcTemplate(db);
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+        assertEquals(3, exchanges.size());
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", 
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", 
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", 
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // some servers may be a bit slow for this
+        for (int i = 0; i < 5; i++) {
+            // give it a little time to delete
+            Thread.sleep(1000);
+            int rows = jdbcTemplate.queryForObject("select count(*) from 
projects", Integer.class);
+            if (rows == 0) {
+                break;
+            }
+        }
+        assertEquals("Should have deleted all 3 rows", new Integer(0), 
jdbcTemplate.queryForObject("select count(*) from projects", Integer.class));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().getComponent("elsql", 
ElsqlComponent.class).setDataSource(db);
+
+                
from("elsql:allProjects:elsql/projects.elsql?consumer.onConsume=deleteProject")
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/resources/elsql/projects.elsql
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/resources/elsql/projects.elsql 
b/components/camel-elsql/src/test/resources/elsql/projects.elsql
index de60eef..1d957ef 100644
--- a/components/camel-elsql/src/test/resources/elsql/projects.elsql
+++ b/components/camel-elsql/src/test/resources/elsql/projects.elsql
@@ -7,3 +7,7 @@
   SELECT *
   FROM projects
   ORDER BY id
+@NAME(deleteProject)
+  DELETE
+  FROM projects
+  WHERE id = :id

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/resources/log4j.properties 
b/components/camel-elsql/src/test/resources/log4j.properties
index d5af410..82e5ef4 100755
--- a/components/camel-elsql/src/test/resources/log4j.properties
+++ b/components/camel-elsql/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@
 #
 # The logging properties used for testing
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
 
 #log4j.logger.org.apache.camel.component.sql=DEBUG
 #log4j.logger.org.apache.camel.component.sql=TRACE

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 1187881..0f52280 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -33,6 +33,8 @@ import org.apache.camel.util.ObjectHelper;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
 
@@ -40,6 +42,8 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
 
     private final String query;
     private final JdbcTemplate jdbcTemplate;
+    private final NamedParameterJdbcTemplate namedJdbcTemplate;
+    private final SqlParameterSource parameterSource;
     private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
     private final SqlProcessingStrategy sqlProcessingStrategy;
 
@@ -59,11 +63,23 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
         }
     }
 
-    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, 
JdbcTemplate jdbcTemplate, String query,
-                       SqlPrepareStatementStrategy 
sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
+    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, 
JdbcTemplate jdbcTemplate, String query, SqlPrepareStatementStrategy 
sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
         super(endpoint, processor);
         this.jdbcTemplate = jdbcTemplate;
+        this.namedJdbcTemplate = null;
+        this.query = query;
+        this.parameterSource = null;
+        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+        this.sqlProcessingStrategy = sqlProcessingStrategy;
+    }
+
+    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, 
NamedParameterJdbcTemplate namedJdbcTemplate, String query, SqlParameterSource 
parameterSource,
+                       SqlPrepareStatementStrategy 
sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
+        super(endpoint, processor);
+        this.jdbcTemplate = null;
+        this.namedJdbcTemplate = namedJdbcTemplate;
         this.query = query;
+        this.parameterSource = parameterSource;
         this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
         this.sqlProcessingStrategy = sqlProcessingStrategy;
     }
@@ -80,8 +96,7 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
         pendingExchanges = 0;
 
         final String preparedQuery = 
sqlPrepareStatementStrategy.prepareQuery(query, 
getEndpoint().isAllowNamedParameters());
-
-        Integer messagePolled = jdbcTemplate.execute(preparedQuery, new 
PreparedStatementCallback<Integer>() {
+        final PreparedStatementCallback<Integer> callback = new 
PreparedStatementCallback<Integer>() {
             @Override
             public Integer doInPreparedStatement(PreparedStatement 
preparedStatement) throws SQLException, DataAccessException {
                 Queue<DataHolder> answer = new LinkedList<DataHolder>();
@@ -114,7 +129,14 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
                     throw ObjectHelper.wrapRuntimeCamelException(e);
                 }
             }
-        });
+        };
+
+        Integer messagePolled;
+        if (namedJdbcTemplate != null) {
+            messagePolled = namedJdbcTemplate.execute(preparedQuery, 
parameterSource, callback);
+        } else {
+            messagePolled = jdbcTemplate.execute(preparedQuery, callback);
+        }
 
         return messagePolled;
     }
@@ -189,7 +211,13 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
             try {
                 // we can only run on consume if there was data
                 if (data != null && sql != null) {
-                    int updateCount = 
sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
+                    int updateCount;
+                    if (namedJdbcTemplate != null && sqlProcessingStrategy 
instanceof SqlNamedProcessingStrategy) {
+                        SqlNamedProcessingStrategy namedProcessingStrategy = 
(SqlNamedProcessingStrategy) sqlProcessingStrategy;
+                        updateCount = 
namedProcessingStrategy.commit(getEndpoint(), exchange, data, 
namedJdbcTemplate, parameterSource, sql);
+                    } else {
+                        updateCount = 
sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
+                    }
                     if (expectedUpdateCount > -1 && updateCount != 
expectedUpdateCount) {
                         String msg = "Expected update count " + 
expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
                         throw new SQLException(msg);
@@ -206,7 +234,13 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
 
         try {
             if (onConsumeBatchComplete != null) {
-                int updateCount = 
sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, 
onConsumeBatchComplete);
+                int updateCount;
+                if (namedJdbcTemplate != null && sqlProcessingStrategy 
instanceof SqlNamedProcessingStrategy) {
+                    SqlNamedProcessingStrategy namedProcessingStrategy = 
(SqlNamedProcessingStrategy) sqlProcessingStrategy;
+                    updateCount = 
namedProcessingStrategy.commitBatchComplete(getEndpoint(), namedJdbcTemplate, 
parameterSource, onConsumeBatchComplete);
+                } else {
+                    updateCount = 
sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, 
onConsumeBatchComplete);
+                }
                 log.debug("onConsumeBatchComplete update count {}", 
updateCount);
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
new file mode 100644
index 0000000..cae9389
--- /dev/null
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+
+/**
+ * Extended processing strategy for dealing with SQL when consuming, which 
uses a {@link NamedParameterJdbcTemplate}
+ * instead of plain {@link org.springframework.jdbc.core.JdbcTemplate}.
+ */
+public interface SqlNamedProcessingStrategy extends SqlProcessingStrategy {
+
+    /**
+     * Commit callback if there is a query to be run after processing.
+     *
+     * @param endpoint          the endpoint
+     * @param exchange          The exchange after it has been processed
+     * @param data              The original data delivered to the route
+     * @param namedJdbcTemplate The named-parameter JDBC template
+     * @param parameterSource   Parameter source for the named JDBC template
+     * @param query             The SQL query to execute
+     * @return the update count if the query returned an update count
+     * @throws Exception can be thrown in case of error
+     */
+    int commit(DefaultSqlEndpoint endpoint, Exchange exchange, Object data,
+               NamedParameterJdbcTemplate namedJdbcTemplate, 
SqlParameterSource parameterSource, String query) throws Exception;
+
+    /**
+     * Commit callback when the batch is complete. This allows you to do one 
extra query after all rows have been processed in the batch.
+     *
+     * @param endpoint          the endpoint
+     * @param namedJdbcTemplate The named-parameter JDBC template
+     * @param parameterSource   Parameter source for the named JDBC template
+     * @param query             The SQL query to execute
+     * @return the update count if the query returned an update count
+     * @throws Exception can be thrown in case of error
+     */
+    int commitBatchComplete(DefaultSqlEndpoint endpoint, 
NamedParameterJdbcTemplate namedJdbcTemplate,
+                            SqlParameterSource parameterSource, String query) 
throws Exception;
+
+}

Reply via email to