Repository: camel
Updated Branches:
  refs/heads/master 4683a8726 -> bc234678d


CAMEL-10459 - batch option implementation

Simple implementation of the batch option for the ElSQL producer.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d96e0d36
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d96e0d36
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d96e0d36

Branch: refs/heads/master
Commit: d96e0d364ed6d3c9976961960d4f93b8c1fce67f
Parents: 4683a87
Author: onders86 <ondersez...@gmail.com>
Authored: Fri Jan 13 17:39:40 2017 +0300
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jan 13 18:05:11 2017 +0100

----------------------------------------------------------------------
 .../camel/component/elsql/ElsqlEndpoint.java    |  38 ++---
 .../camel/component/elsql/ElsqlProducer.java    | 141 ++++++++++++-------
 .../component/elsql/ElSqlProducerBatchTest.java |  86 +++++++++++
 .../src/test/resources/elsql/projects.elsql     |   3 +
 4 files changed, 196 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index 62afbd0..f395f40 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.sql.DefaultSqlEndpoint;
+import org.apache.camel.component.sql.DefaultSqlPrepareStatementStrategy;
 import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 import org.apache.camel.component.sql.SqlProcessingStrategy;
 import org.apache.camel.spi.Metadata;
@@ -53,11 +54,11 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(ElsqlEndpoint.class);
 
     private ElSql elSql;
-    private NamedParameterJdbcTemplate namedJdbcTemplate;
+    private final NamedParameterJdbcTemplate namedJdbcTemplate;
 
     @UriPath
     @Metadata(required = "true")
-    private String elsqlName;
+    private final String elsqlName;
     @UriPath
     private String resourceUri;
     @UriParam
@@ -67,8 +68,8 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     @UriParam(label = "advanced")
     private ElSqlConfig elSqlConfig;
 
-    public ElsqlEndpoint(String uri, Component component, 
NamedParameterJdbcTemplate namedJdbcTemplate, DataSource dataSource,
-                         String elsqlName, String resourceUri) {
+    public ElsqlEndpoint(final String uri, final Component component, final 
NamedParameterJdbcTemplate namedJdbcTemplate, final DataSource dataSource,
+                         final String elsqlName, final String resourceUri) {
         super(uri, component, null);
         this.elsqlName = elsqlName;
         this.resourceUri = resourceUri;
@@ -77,15 +78,15 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     }
 
     @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        SqlProcessingStrategy proStrategy = new 
ElsqlSqlProcessingStrategy(elSql);
-        SqlPrepareStatementStrategy preStategy = new 
ElsqlSqlPrepareStatementStrategy();
+    public Consumer createConsumer(final Processor processor) throws Exception 
{
+        final SqlProcessingStrategy proStrategy = new 
ElsqlSqlProcessingStrategy(elSql);
+        final SqlPrepareStatementStrategy preStategy = new 
ElsqlSqlPrepareStatementStrategy();
 
         final SqlParameterSource param = new EmptySqlParameterSource();
         final String sql = elSql.getSql(elsqlName, new SpringSqlParams(param));
         LOG.debug("ElsqlConsumer @{} using sql: {}", elsqlName, sql);
 
-        ElsqlConsumer consumer = new ElsqlConsumer(this, processor, 
namedJdbcTemplate, sql, param, preStategy, proStrategy);
+        final ElsqlConsumer consumer = new ElsqlConsumer(this, processor, 
namedJdbcTemplate, sql, param, preStategy, proStrategy);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
         consumer.setOnConsumeFailed(getOnConsumeFailed());
@@ -100,7 +101,8 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, 
namedJdbcTemplate, dataSource);
+        final SqlPrepareStatementStrategy prepareStrategy = 
getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new 
DefaultSqlPrepareStatementStrategy(getSeparator());
+        final ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, 
namedJdbcTemplate, dataSource, prepareStrategy, isBatch());
         return result;
     }
 
@@ -118,13 +120,13 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
         // there can be multiple resources
         // so we have all this lovely code to turn that into an URL[]
-        List<URL> list = new ArrayList<URL>();
-        Iterable it = ObjectHelper.createIterable(resourceUri);
-        for (Object path : it) {
-            URL url = 
ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext().getClassResolver(),
 path.toString());
+        final List<URL> list = new ArrayList<URL>();
+        final Iterable it = ObjectHelper.createIterable(resourceUri);
+        for (final Object path : it) {
+            final URL url = 
ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext().getClassResolver(),
 path.toString());
             list.add(url);
         }
-        URL[] urls = list.toArray(new URL[list.size()]);
+        final URL[] urls = list.toArray(new URL[list.size()]);
         elSql = ElSql.parse(elSqlConfig, urls);
     }
 
@@ -142,7 +144,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     /**
      * To use a vendor specific {@link com.opengamma.elsql.ElSqlConfig}
      */
-    public void setDatabaseVendor(ElSqlDatabaseVendor databaseVendor) {
+    public void setDatabaseVendor(final ElSqlDatabaseVendor databaseVendor) {
         this.databaseVendor = databaseVendor;
     }
 
@@ -153,7 +155,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     /**
      * To use a specific configured ElSqlConfig. It may be better to use the 
<tt>databaseVendor</tt> option instead.
      */
-    public void setElSqlConfig(ElSqlConfig elSqlConfig) {
+    public void setElSqlConfig(final ElSqlConfig elSqlConfig) {
         this.elSqlConfig = elSqlConfig;
     }
 
@@ -166,7 +168,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
      * The resources are loaded on the classpath by default, you can prefix 
with <tt>file:</tt> to load from file system.
      * Notice you can set this option on the component and then you do not 
have to configure this on the endpoint.
      */
-    public void setResourceUri(String resourceUri) {
+    public void setResourceUri(final String resourceUri) {
         this.resourceUri = resourceUri;
     }
 
@@ -177,7 +179,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     /**
      * Sets the DataSource to use to communicate with the database.
      */
-    public void setDataSource(DataSource dataSource) {
+    public void setDataSource(final DataSource dataSource) {
         this.dataSource = dataSource;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index 2667566..4575844 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -20,6 +20,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Iterator;
 import java.util.List;
 import javax.sql.DataSource;
 
@@ -30,6 +31,7 @@ import org.apache.camel.component.sql.ResultSetIterator;
 import org.apache.camel.component.sql.ResultSetIteratorCompletion;
 import org.apache.camel.component.sql.SqlConstants;
 import org.apache.camel.component.sql.SqlOutputType;
+import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 import org.apache.camel.impl.DefaultProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,13 +56,18 @@ public class ElsqlProducer extends DefaultProducer {
     private final String elSqlName;
     private final NamedParameterJdbcTemplate jdbcTemplate;
     private final DataSource dataSource;
+    private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+    private final boolean batch;
 
-    public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String 
elSqlName, NamedParameterJdbcTemplate jdbcTemplate, DataSource dataSource) {
+    public ElsqlProducer(final ElsqlEndpoint endpoint, final ElSql elSql, 
final String elSqlName, final NamedParameterJdbcTemplate jdbcTemplate, 
+                         final DataSource dataSource, final 
SqlPrepareStatementStrategy sqlPrepareStatementStrategy, final boolean batch) {
         super(endpoint);
         this.elSql = elSql;
         this.elSqlName = elSqlName;
         this.jdbcTemplate = jdbcTemplate;
         this.dataSource = dataSource;
+        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+        this.batch = batch;
     }
 
     @Override
@@ -70,14 +77,14 @@ public class ElsqlProducer extends DefaultProducer {
 
     @Override
     public void process(final Exchange exchange) throws Exception {
-        Object data = exchange.getIn().getBody();
+        final Object data = exchange.getIn().getBody();
 
         final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data);
         final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
         LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql);
 
         // special for processing stream list (batch not supported)
-        SqlOutputType outputType = getEndpoint().getOutputType();
+        final SqlOutputType outputType = getEndpoint().getOutputType();
         if (outputType == SqlOutputType.StreamList) {
             processStreamList(exchange, sql, param);
             return;
@@ -86,33 +93,45 @@ public class ElsqlProducer extends DefaultProducer {
         log.trace("jdbcTemplate.execute: {}", sql);
         jdbcTemplate.execute(sql, param, new 
PreparedStatementCallback<Object>() {
             @Override
-            public Object doInPreparedStatement(PreparedStatement ps) throws 
SQLException, DataAccessException {
+            public Object doInPreparedStatement(final PreparedStatement ps) 
throws SQLException, DataAccessException {
                 ResultSet rs = null;
                 try {
-                    boolean isResultSet = ps.execute();
-                    if (isResultSet) {
-                        rs = ps.getResultSet();
-
-                        // preserve headers first, so we can override the 
SQL_ROW_COUNT header
-                        
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-
-                        SqlOutputType outputType = 
getEndpoint().getOutputType();
-                        log.trace("Got result list from query: {}, 
outputType={}", rs, outputType);
-                        if (outputType == SqlOutputType.SelectList) {
-                            List<?> data = getEndpoint().queryForList(rs, 
true);
-                            // for noop=true we still want to enrich with the 
row count header
-                            if (getEndpoint().isNoop()) {
-                                
exchange.getOut().setBody(exchange.getIn().getBody());
-                            } else if (getEndpoint().getOutputHeader() != 
null) {
-                                
exchange.getOut().setBody(exchange.getIn().getBody());
-                                
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
-                            } else {
-                                exchange.getOut().setBody(data);
-                            }
-                            
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
-                        } else if (outputType == SqlOutputType.SelectOne) {
-                            Object data = getEndpoint().queryForObject(rs);
-                            if (data != null) {
+                    boolean isResultSet = false;
+                     
+                    final int expected = 
ps.getParameterMetaData().getParameterCount();
+                     
+                    if (expected > 0 && batch) {
+                        final String sqlForDefaultPreparedStamentStrategy =  
sql.replaceAll(":", ":\\?");
+                        final String preparedQuery = 
sqlPrepareStatementStrategy.prepareQuery(sqlForDefaultPreparedStamentStrategy, 
getEndpoint().isAllowNamedParameters(), exchange);
+                        final Iterator<?> iterator = 
exchange.getIn().getBody(Iterator.class);
+                        while (iterator != null && iterator.hasNext()) {
+                            final Object value = iterator.next();
+                            final Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sqlForDefaultPreparedStamentStrategy,
 preparedQuery, expected, exchange, value);
+                            sqlPrepareStatementStrategy.populateStatement(ps, 
i, expected);
+                            ps.addBatch();
+                        }
+                    }
+
+                    // execute the prepared statement and populate the 
outgoing message
+                    if (batch) {
+                        final int[] updateCounts = ps.executeBatch();
+                        int total = 0;
+                        for (final int count : updateCounts) {
+                            total += count;
+                        }
+                        
exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
+                    } else {
+                        isResultSet = ps.execute();
+                        if (isResultSet) {
+                            rs = ps.getResultSet();
+
+                            // preserve headers first, so we can override the 
SQL_ROW_COUNT header
+                            
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+
+                            final SqlOutputType outputType = 
getEndpoint().getOutputType();
+                            log.trace("Got result list from query: {}, 
outputType={}", rs, outputType);
+                            if (outputType == SqlOutputType.SelectList) {
+                                final List<?> data = 
getEndpoint().queryForList(rs, true);
                                 // for noop=true we still want to enrich with 
the row count header
                                 if (getEndpoint().isNoop()) {
                                     
exchange.getOut().setBody(exchange.getIn().getBody());
@@ -122,46 +141,60 @@ public class ElsqlProducer extends DefaultProducer {
                                 } else {
                                     exchange.getOut().setBody(data);
                                 }
-                                
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
-                            } else {
-                                if (getEndpoint().isNoop()) {
-                                    
exchange.getOut().setBody(exchange.getIn().getBody());
-                                } else if (getEndpoint().getOutputHeader() != 
null) {
-                                    
exchange.getOut().setBody(exchange.getIn().getBody());
+                                
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
+                            } else if (outputType == SqlOutputType.SelectOne) {
+                                final Object data = 
getEndpoint().queryForObject(rs);
+                                if (data != null) {
+                                    // for noop=true we still want to enrich 
with the row count header
+                                    if (getEndpoint().isNoop()) {
+                                        
exchange.getOut().setBody(exchange.getIn().getBody());
+                                    } else if (getEndpoint().getOutputHeader() 
!= null) {
+                                        
exchange.getOut().setBody(exchange.getIn().getBody());
+                                        
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
+                                    } else {
+                                        exchange.getOut().setBody(data);
+                                    }
+                                    
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
+                                } else {
+                                    if (getEndpoint().isNoop()) {
+                                        
exchange.getOut().setBody(exchange.getIn().getBody());
+                                    } else if (getEndpoint().getOutputHeader() 
!= null) {
+                                        
exchange.getOut().setBody(exchange.getIn().getBody());
+                                    }
+                                    
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0);
                                 }
-                                
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0);
+                            } else {
+                                throw new IllegalArgumentException("Invalid 
outputType=" + outputType);
                             }
                         } else {
-                            throw new IllegalArgumentException("Invalid 
outputType=" + outputType);
+                             // if we are here, there isResultSet is false. 
This can happen only if we are doing an update operation or there is no result.
+                             // we can simply add the updateCount in this case.
+                            
exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
                         }
-                    } else {
-                        // if we are here, there isResultSet is false. This 
can happen only if we are doing an update operation or there is no result.
-                        // we can simply add the updateCount in this case.
-                        
exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
                     }
-                } finally {
-                    closeResultSet(rs);
-                }
+                    } finally {
+                        closeResultSet(rs);
+                    }
 
                 return null;
             }
         });
     }
 
-    protected void processStreamList(Exchange exchange, String sql, 
SqlParameterSource param) throws Exception {
+    protected void processStreamList(final Exchange exchange, final String 
sql, final SqlParameterSource param) throws Exception {
         // spring JDBC to parse the SQL and build the prepared statement 
creator
         // this is what NamedJdbcTemplate does internally
-        ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql);
-        String sqlToUse = 
NamedParameterUtils.substituteNamedParameters(parsedSql, param);
-        Object[] params = NamedParameterUtils.buildValueArray(parsedSql, 
param, null);
-        List<SqlParameter> declaredParameters = 
NamedParameterUtils.buildSqlParameterList(parsedSql, param);
-        PreparedStatementCreatorFactory pscf = new 
PreparedStatementCreatorFactory(sqlToUse, declaredParameters);
-        PreparedStatementCreator statementCreator = 
pscf.newPreparedStatementCreator(params);
+        final ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql);
+        final String sqlToUse = 
NamedParameterUtils.substituteNamedParameters(parsedSql, param);
+        final Object[] params = NamedParameterUtils.buildValueArray(parsedSql, 
param, null);
+        final List<SqlParameter> declaredParameters = 
NamedParameterUtils.buildSqlParameterList(parsedSql, param);
+        final PreparedStatementCreatorFactory pscf = new 
PreparedStatementCreatorFactory(sqlToUse, declaredParameters);
+        final PreparedStatementCreator statementCreator = 
pscf.newPreparedStatementCreator(params);
 
         processStreamList(exchange, statementCreator, sqlToUse);
     }
 
-    protected void processStreamList(Exchange exchange, 
PreparedStatementCreator statementCreator, String preparedQuery) throws 
Exception {
+    protected void processStreamList(final Exchange exchange, final 
PreparedStatementCreator statementCreator, final String preparedQuery) throws 
Exception {
         log.trace("processStreamList: {}", preparedQuery);
 
         // do not use the jdbcTemplate as it will auto-close connection/ps/rs 
when exiting the execute method
@@ -174,10 +207,10 @@ public class ElsqlProducer extends DefaultProducer {
             con = dataSource.getConnection();
             ps = statementCreator.createPreparedStatement(con);
 
-            boolean isResultSet = ps.execute();
+            final boolean isResultSet = ps.execute();
             if (isResultSet) {
                 rs = ps.getResultSet();
-                ResultSetIterator iterator = 
getEndpoint().queryForStreamList(con, ps, rs);
+                final ResultSetIterator iterator = 
getEndpoint().queryForStreamList(con, ps, rs);
                 if (getEndpoint().isNoop()) {
                     exchange.getOut().setBody(exchange.getIn().getBody());
                 } else if (getEndpoint().getOutputHeader() != null) {
@@ -190,7 +223,7 @@ public class ElsqlProducer extends DefaultProducer {
                 // defer closing the iterator when the exchange is complete
                 exchange.addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             // in case of exception then close all this before rethrow
             closeConnection(con);
             closeStatement(ps);

http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java
 
b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java
new file mode 100644
index 0000000..4717217
--- /dev/null
+++ 
b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elsql;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.SqlConstants;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+public class ElSqlProducerBatchTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        // this is the database we create with some initial data for our unit 
test
+        db = new EmbeddedDatabaseBuilder()
+                
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        jndi.bind("dataSource", db);
+
+        return jndi;
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testBatchMode() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).header(SqlConstants.SQL_UPDATE_COUNT).isEqualTo(1);
+        
+        Map<String, Object> batchParams = new HashMap<>();       
+        batchParams.put("id", "4");
+        batchParams.put("license", "GNU");
+        batchParams.put("project", "Batch");
+        
+        template.sendBody("direct:batch", batchParams);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                
+                from("direct:batch")
+                        
.to("elsql:insertProject:elsql/projects.elsql?dataSource=#dataSource&batch=true")
+                        .to("mock:result");
+
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/test/resources/elsql/projects.elsql
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/resources/elsql/projects.elsql 
b/components/camel-elsql/src/test/resources/elsql/projects.elsql
index 42fc6e9..995e272 100644
--- a/components/camel-elsql/src/test/resources/elsql/projects.elsql
+++ b/components/camel-elsql/src/test/resources/elsql/projects.elsql
@@ -20,3 +20,6 @@
   UPDATE projects
   SET license = :lic
   WHERE id = :id
+@NAME(insertProject)
+  INSERT INTO projects (id, project, license)
+  VALUES (:id, :project, :license)
\ No newline at end of file

Reply via email to