Repository: camel Updated Branches: refs/heads/master 4683a8726 -> bc234678d
CAMEL-10459 - batch option implementation simple implementation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d96e0d36 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d96e0d36 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d96e0d36 Branch: refs/heads/master Commit: d96e0d364ed6d3c9976961960d4f93b8c1fce67f Parents: 4683a87 Author: onders86 <ondersez...@gmail.com> Authored: Fri Jan 13 17:39:40 2017 +0300 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jan 13 18:05:11 2017 +0100 ---------------------------------------------------------------------- .../camel/component/elsql/ElsqlEndpoint.java | 38 ++--- .../camel/component/elsql/ElsqlProducer.java | 141 ++++++++++++------- .../component/elsql/ElSqlProducerBatchTest.java | 86 +++++++++++ .../src/test/resources/elsql/projects.elsql | 3 + 4 files changed, 196 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java index 62afbd0..f395f40 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java @@ -29,6 +29,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.sql.DefaultSqlEndpoint; +import org.apache.camel.component.sql.DefaultSqlPrepareStatementStrategy; import org.apache.camel.component.sql.SqlPrepareStatementStrategy; import org.apache.camel.component.sql.SqlProcessingStrategy; import org.apache.camel.spi.Metadata; @@ -53,11 +54,11 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { private static final Logger LOG = LoggerFactory.getLogger(ElsqlEndpoint.class); private ElSql elSql; - private NamedParameterJdbcTemplate namedJdbcTemplate; + private final NamedParameterJdbcTemplate namedJdbcTemplate; @UriPath @Metadata(required = "true") - private String elsqlName; + private final String elsqlName; @UriPath private String resourceUri; @UriParam @@ -67,8 +68,8 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { @UriParam(label = "advanced") private ElSqlConfig elSqlConfig; - public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, DataSource dataSource, - String elsqlName, String resourceUri) { + public ElsqlEndpoint(final String uri, final Component component, final NamedParameterJdbcTemplate namedJdbcTemplate, final DataSource dataSource, + final String elsqlName, final String resourceUri) { super(uri, component, null); this.elsqlName = elsqlName; this.resourceUri = resourceUri; @@ -77,15 +78,15 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { } @Override - public Consumer createConsumer(Processor processor) throws Exception { - SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elSql); - SqlPrepareStatementStrategy preStategy = new ElsqlSqlPrepareStatementStrategy(); + public Consumer createConsumer(final Processor processor) throws Exception { + final SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elSql); + final SqlPrepareStatementStrategy preStategy = new ElsqlSqlPrepareStatementStrategy(); final SqlParameterSource param = new EmptySqlParameterSource(); final String sql = elSql.getSql(elsqlName, new SpringSqlParams(param)); LOG.debug("ElsqlConsumer @{} using sql: {}", elsqlName, sql); - ElsqlConsumer consumer = new ElsqlConsumer(this, processor, namedJdbcTemplate, sql, param, preStategy, proStrategy); + final ElsqlConsumer consumer = new ElsqlConsumer(this, processor, namedJdbcTemplate, sql, param, preStategy, proStrategy); consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); consumer.setOnConsume(getOnConsume()); consumer.setOnConsumeFailed(getOnConsumeFailed()); @@ -100,7 +101,8 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { @Override public Producer createProducer() throws Exception { - ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, namedJdbcTemplate, dataSource); + final SqlPrepareStatementStrategy prepareStrategy = getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new DefaultSqlPrepareStatementStrategy(getSeparator()); + final ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, namedJdbcTemplate, dataSource, prepareStrategy, isBatch()); return result; } @@ -118,13 +120,13 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { // there can be multiple resources // so we have all this lovely code to turn that into an URL[] - List<URL> list = new ArrayList<URL>(); - Iterable it = ObjectHelper.createIterable(resourceUri); - for (Object path : it) { - URL url = ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext().getClassResolver(), path.toString()); + final List<URL> list = new ArrayList<URL>(); + final Iterable it = ObjectHelper.createIterable(resourceUri); + for (final Object path : it) { + final URL url = ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext().getClassResolver(), path.toString()); list.add(url); } - URL[] urls = list.toArray(new URL[list.size()]); + final URL[] urls = list.toArray(new URL[list.size()]); elSql = ElSql.parse(elSqlConfig, urls); } @@ -142,7 +144,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { /** * To use a vendor specific {@link com.opengamma.elsql.ElSqlConfig} */ - public void setDatabaseVendor(ElSqlDatabaseVendor databaseVendor) { + public void setDatabaseVendor(final ElSqlDatabaseVendor databaseVendor) { this.databaseVendor = databaseVendor; } @@ -153,7 +155,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { /** * To use a specific configured ElSqlConfig. It may be better to use the <tt>databaseVendor</tt> option instead. */ - public void setElSqlConfig(ElSqlConfig elSqlConfig) { + public void setElSqlConfig(final ElSqlConfig elSqlConfig) { this.elSqlConfig = elSqlConfig; } @@ -166,7 +168,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { * The resources are loaded on the classpath by default, you can prefix with <tt>file:</tt> to load from file system. * Notice you can set this option on the component and then you do not have to configure this on the endpoint. */ - public void setResourceUri(String resourceUri) { + public void setResourceUri(final String resourceUri) { this.resourceUri = resourceUri; } @@ -177,7 +179,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { /** * Sets the DataSource to use to communicate with the database. */ - public void setDataSource(DataSource dataSource) { + public void setDataSource(final DataSource dataSource) { this.dataSource = dataSource; } } http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java index 2667566..4575844 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java @@ -20,6 +20,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Iterator; import java.util.List; import javax.sql.DataSource; @@ -30,6 +31,7 @@ import org.apache.camel.component.sql.ResultSetIterator; import org.apache.camel.component.sql.ResultSetIteratorCompletion; import org.apache.camel.component.sql.SqlConstants; import org.apache.camel.component.sql.SqlOutputType; +import org.apache.camel.component.sql.SqlPrepareStatementStrategy; import org.apache.camel.impl.DefaultProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,13 +56,18 @@ public class ElsqlProducer extends DefaultProducer { private final String elSqlName; private final NamedParameterJdbcTemplate jdbcTemplate; private final DataSource dataSource; + private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy; + private final boolean batch; - public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate, DataSource dataSource) { + public ElsqlProducer(final ElsqlEndpoint endpoint, final ElSql elSql, final String elSqlName, final NamedParameterJdbcTemplate jdbcTemplate, + final DataSource dataSource, final SqlPrepareStatementStrategy sqlPrepareStatementStrategy, final boolean batch) { super(endpoint); this.elSql = elSql; this.elSqlName = elSqlName; this.jdbcTemplate = jdbcTemplate; this.dataSource = dataSource; + this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; + this.batch = batch; } @Override @@ -70,14 +77,14 @@ public class ElsqlProducer extends DefaultProducer { @Override public void process(final Exchange exchange) throws Exception { - Object data = exchange.getIn().getBody(); + final Object data = exchange.getIn().getBody(); final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data); final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param)); LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql); // special for processing stream list (batch not supported) - SqlOutputType outputType = getEndpoint().getOutputType(); + final SqlOutputType outputType = getEndpoint().getOutputType(); if (outputType == SqlOutputType.StreamList) { processStreamList(exchange, sql, param); return; @@ -86,33 +93,45 @@ public class ElsqlProducer extends DefaultProducer { log.trace("jdbcTemplate.execute: {}", sql); jdbcTemplate.execute(sql, param, new PreparedStatementCallback<Object>() { @Override - public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { + public Object doInPreparedStatement(final PreparedStatement ps) throws SQLException, DataAccessException { ResultSet rs = null; try { - boolean isResultSet = ps.execute(); - if (isResultSet) { - rs = ps.getResultSet(); - - // preserve headers first, so we can override the SQL_ROW_COUNT header - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); - - SqlOutputType outputType = getEndpoint().getOutputType(); - log.trace("Got result list from query: {}, outputType={}", rs, outputType); - if (outputType == SqlOutputType.SelectList) { - List<?> data = getEndpoint().queryForList(rs, true); - // for noop=true we still want to enrich with the row count header - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); - exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); - } else { - exchange.getOut().setBody(data); - } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size()); - } else if (outputType == SqlOutputType.SelectOne) { - Object data = getEndpoint().queryForObject(rs); - if (data != null) { + boolean isResultSet = false; + + final int expected = ps.getParameterMetaData().getParameterCount(); + + if (expected > 0 && batch) { + final String sqlForDefaultPreparedStamentStrategy = sql.replaceAll(":", ":\\?"); + final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sqlForDefaultPreparedStamentStrategy, getEndpoint().isAllowNamedParameters(), exchange); + final Iterator<?> iterator = exchange.getIn().getBody(Iterator.class); + while (iterator != null && iterator.hasNext()) { + final Object value = iterator.next(); + final Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sqlForDefaultPreparedStamentStrategy, preparedQuery, expected, exchange, value); + sqlPrepareStatementStrategy.populateStatement(ps, i, expected); + ps.addBatch(); + } + } + + // execute the prepared statement and populate the outgoing message + if (batch) { + final int[] updateCounts = ps.executeBatch(); + int total = 0; + for (final int count : updateCounts) { + total += count; + } + exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total); + } else { + isResultSet = ps.execute(); + if (isResultSet) { + rs = ps.getResultSet(); + + // preserve headers first, so we can override the SQL_ROW_COUNT header + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + + final SqlOutputType outputType = getEndpoint().getOutputType(); + log.trace("Got result list from query: {}, outputType={}", rs, outputType); + if (outputType == SqlOutputType.SelectList) { + final List<?> data = getEndpoint().queryForList(rs, true); // for noop=true we still want to enrich with the row count header if (getEndpoint().isNoop()) { exchange.getOut().setBody(exchange.getIn().getBody()); @@ -122,46 +141,60 @@ public class ElsqlProducer extends DefaultProducer { } else { exchange.getOut().setBody(data); } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1); - } else { - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); + exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size()); + } else if (outputType == SqlOutputType.SelectOne) { + final Object data = getEndpoint().queryForObject(rs); + if (data != null) { + // for noop=true we still want to enrich with the row count header + if (getEndpoint().isNoop()) { + exchange.getOut().setBody(exchange.getIn().getBody()); + } else if (getEndpoint().getOutputHeader() != null) { + exchange.getOut().setBody(exchange.getIn().getBody()); + exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); + } else { + exchange.getOut().setBody(data); + } + exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1); + } else { + if (getEndpoint().isNoop()) { + exchange.getOut().setBody(exchange.getIn().getBody()); + } else if (getEndpoint().getOutputHeader() != null) { + exchange.getOut().setBody(exchange.getIn().getBody()); + } + exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0); } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0); + } else { + throw new IllegalArgumentException("Invalid outputType=" + outputType); } } else { - throw new IllegalArgumentException("Invalid outputType=" + outputType); + // if we are here, there isResultSet is false. This can happen only if we are doing an update operation or there is no result. + // we can simply add the updateCount in this case. + exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); } - } else { - // if we are here, there isResultSet is false. This can happen only if we are doing an update operation or there is no result. - // we can simply add the updateCount in this case. - exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); } - } finally { - closeResultSet(rs); - } + } finally { + closeResultSet(rs); + } return null; } }); } - protected void processStreamList(Exchange exchange, String sql, SqlParameterSource param) throws Exception { + protected void processStreamList(final Exchange exchange, final String sql, final SqlParameterSource param) throws Exception { // spring JDBC to parse the SQL and build the prepared statement creator // this is what NamedJdbcTemplate does internally - ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql); - String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, param); - Object[] params = NamedParameterUtils.buildValueArray(parsedSql, param, null); - List<SqlParameter> declaredParameters = NamedParameterUtils.buildSqlParameterList(parsedSql, param); - PreparedStatementCreatorFactory pscf = new PreparedStatementCreatorFactory(sqlToUse, declaredParameters); - PreparedStatementCreator statementCreator = pscf.newPreparedStatementCreator(params); + final ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql); + final String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, param); + final Object[] params = NamedParameterUtils.buildValueArray(parsedSql, param, null); + final List<SqlParameter> declaredParameters = NamedParameterUtils.buildSqlParameterList(parsedSql, param); + final PreparedStatementCreatorFactory pscf = new PreparedStatementCreatorFactory(sqlToUse, declaredParameters); + final PreparedStatementCreator statementCreator = pscf.newPreparedStatementCreator(params); processStreamList(exchange, statementCreator, sqlToUse); } - protected void processStreamList(Exchange exchange, PreparedStatementCreator statementCreator, String preparedQuery) throws Exception { + protected void processStreamList(final Exchange exchange, final PreparedStatementCreator statementCreator, final String preparedQuery) throws Exception { log.trace("processStreamList: {}", preparedQuery); // do not use the jdbcTemplate as it will auto-close connection/ps/rs when exiting the execute method @@ -174,10 +207,10 @@ public class ElsqlProducer extends DefaultProducer { con = dataSource.getConnection(); ps = statementCreator.createPreparedStatement(con); - boolean isResultSet = ps.execute(); + final boolean isResultSet = ps.execute(); if (isResultSet) { rs = ps.getResultSet(); - ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs); + final ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs); if (getEndpoint().isNoop()) { exchange.getOut().setBody(exchange.getIn().getBody()); } else if (getEndpoint().getOutputHeader() != null) { @@ -190,7 +223,7 @@ public class ElsqlProducer extends DefaultProducer { // defer closing the iterator when the exchange is complete exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator)); } - } catch (Exception e) { + } catch (final Exception e) { // in case of exception then close all this before rethrow closeConnection(con); closeStatement(ps); http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java new file mode 100644 index 0000000..4717217 --- /dev/null +++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerBatchTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elsql; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.SqlConstants; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Test; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +public class ElSqlProducerBatchTest extends CamelTestSupport { + + private EmbeddedDatabase db; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + + // this is the database we create with some initial data for our unit test + db = new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build(); + + jndi.bind("dataSource", db); + + return jndi; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + db.shutdown(); + } + + @Test + public void testBatchMode() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.message(0).header(SqlConstants.SQL_UPDATE_COUNT).isEqualTo(1); + + Map<String, Object> batchParams = new HashMap<>(); + batchParams.put("id", "4"); + batchParams.put("license", "GNU"); + batchParams.put("project", "Batch"); + + template.sendBody("direct:batch", batchParams); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + from("direct:batch") + .to("elsql:insertProject:elsql/projects.elsql?dataSource=#dataSource&batch=true") + .to("mock:result"); + + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d96e0d36/components/camel-elsql/src/test/resources/elsql/projects.elsql ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/test/resources/elsql/projects.elsql b/components/camel-elsql/src/test/resources/elsql/projects.elsql index 42fc6e9..995e272 100644 --- a/components/camel-elsql/src/test/resources/elsql/projects.elsql +++ b/components/camel-elsql/src/test/resources/elsql/projects.elsql @@ -20,3 +20,6 @@ UPDATE projects SET license = :lic WHERE id = :id +@NAME(insertProject) + INSERT INTO projects (id, project, license) + VALUES (:id, :project, :license) \ No newline at end of file