CAMEL-6418: Added separator option. Fixed and improved when using String as body with # placeholders to honor qutes and use new separator option.
Conflicts: components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6f10a76b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6f10a76b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6f10a76b Branch: refs/heads/camel-2.11.x Commit: 6f10a76bcd28939dddf96a231d35a492f68ade79 Parents: 51501fd Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jun 3 15:48:00 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jun 3 16:01:09 2013 +0200 ---------------------------------------------------------------------- .../sql/DefaultSqlPrepareStatementStrategy.java | 24 ++++- .../sql/DefaultSqlProcessingStrategy.java | 13 ++- .../apache/camel/component/sql/SqlConsumer.java | 13 ++- .../apache/camel/component/sql/SqlEndpoint.java | 20 +++- .../apache/camel/component/sql/SqlProducer.java | 15 ++- .../component/sql/SqlProducerSeparatorTest.java | 79 +++++++++++++++ 6 files changed, 144 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java index 0540c1d..5928f64 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java @@ -18,13 +18,16 @@ package org.apache.camel.component.sql; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.camel.Exchange; import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.util.StringQuoteHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +37,15 @@ import org.slf4j.LoggerFactory; public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementStrategy { private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlPrepareStatementStrategy.class); + private final char separator; + + public DefaultSqlPrepareStatementStrategy() { + this(','); + } + + public DefaultSqlPrepareStatementStrategy(char separator) { + this.separator = separator; + } @Override public String prepareQuery(String query, boolean allowNamedParameters) throws SQLException { @@ -111,8 +123,16 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt } else { - // just use a regular iterator - return exchange.getContext().getTypeConverter().convertTo(Iterator.class, value); + // is the body a String + if (value instanceof String) { + // if the body is a String then honor quotes etc. + String[] tokens = StringQuoteHelper.splitSafeQuote((String)value, separator, true); + List<String> list = Arrays.asList(tokens); + return list.iterator(); + } else { + // just use a regular iterator + return exchange.getContext().getTypeConverter().convertTo(Iterator.class, value); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java index 787c3d6..d641c0f 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java @@ -32,19 +32,24 @@ import org.springframework.jdbc.core.PreparedStatementCallback; public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy { private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class); + private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy; + + public DefaultSqlProcessingStrategy(SqlPrepareStatementStrategy sqlPrepareStatementStrategy) { + this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; + } @Override public int commit(final SqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception { - final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters()); + final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters()); return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() { public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException { int expected = ps.getParameterMetaData().getParameterCount(); - Iterator<?> iterator = endpoint.getPrepareStatementStrategy().createPopulateIterator(query, preparedQuery, expected, exchange, data); + Iterator<?> iterator = sqlPrepareStatementStrategy.createPopulateIterator(query, preparedQuery, expected, exchange, data); if (iterator != null) { - endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, expected); + sqlPrepareStatementStrategy.populateStatement(ps, iterator, expected); LOG.trace("Execute query {}", query); ps.execute(); @@ -62,7 +67,7 @@ public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy { @Override public int commitBatchComplete(final SqlEndpoint endpoint, final JdbcTemplate jdbcTemplate, final String query) throws Exception { - final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters()); + final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters()); return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() { public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException { http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index 89643be..5335f76 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -44,6 +44,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { private final String query; private final JdbcTemplate jdbcTemplate; + private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy; + private final SqlProcessingStrategy sqlProcessingStrategy; private String onConsume; private String onConsumeFailed; @@ -61,10 +63,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { } } - public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query) { + public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, + SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { super(endpoint, processor); this.jdbcTemplate = jdbcTemplate; this.query = query; + this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; + this.sqlProcessingStrategy = sqlProcessingStrategy; } @Override @@ -78,7 +83,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { shutdownRunningTask = null; pendingExchanges = 0; - final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(query, getEndpoint().isAllowNamedParameters()); + final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, getEndpoint().isAllowNamedParameters()); Integer messagePolled = jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() { @Override @@ -171,7 +176,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { try { // we can only run on consume if there was data if (data != null && sql != null) { - int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, sql); + int updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql); if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) { String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql; throw new SQLException(msg); @@ -188,7 +193,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { try { if (onConsumeBatchComplete != null) { - int updateCount = getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete); + int updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete); log.debug("onConsumeBatchComplete update count {}", updateCount); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java index 06249c8..70c70e4 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java @@ -34,13 +34,14 @@ public class SqlEndpoint extends DefaultPollingEndpoint { private String query; private boolean batch; private int maxMessagesPerPoll; - private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy(); - private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy(); + private SqlProcessingStrategy processingStrategy; + private SqlPrepareStatementStrategy prepareStatementStrategy; private String onConsume; private String onConsumeFailed; private String onConsumeBatchComplete; private boolean allowNamedParameters = true; private boolean alwaysPopulateStatement; + private char separator = ','; public SqlEndpoint() { } @@ -52,7 +53,9 @@ public class SqlEndpoint extends DefaultPollingEndpoint { } public Consumer createConsumer(Processor processor) throws Exception { - SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query); + SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator); + SqlProcessingStrategy proStrategy = processingStrategy != null ? processingStrategy : new DefaultSqlProcessingStrategy(prepareStrategy); + SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query, prepareStrategy, proStrategy); consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); consumer.setOnConsume(getOnConsume()); consumer.setOnConsumeFailed(getOnConsumeFailed()); @@ -62,7 +65,8 @@ public class SqlEndpoint extends DefaultPollingEndpoint { } public Producer createProducer() throws Exception { - return new SqlProducer(this, query, jdbcTemplate, batch, alwaysPopulateStatement); + SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator); + return new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement); } public boolean isSingleton() { @@ -157,6 +161,14 @@ public class SqlEndpoint extends DefaultPollingEndpoint { this.alwaysPopulateStatement = alwaysPopulateStatement; } + public char getSeparator() { + return separator; + } + + public void setSeparator(char separator) { + this.separator = separator; + } + @Override protected String createEndpointUri() { // Make sure it's properly encoded http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index 1fa6864..97ff150 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -34,10 +34,13 @@ public class SqlProducer extends DefaultProducer { private JdbcTemplate jdbcTemplate; private boolean batch; private boolean alwaysPopulateStatement; + private SqlPrepareStatementStrategy sqlPrepareStatementStrategy; - public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, boolean batch, boolean alwaysPopulateStatement) { + public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, + boolean batch, boolean alwaysPopulateStatement) { super(endpoint); this.jdbcTemplate = jdbcTemplate; + this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; this.query = query; this.batch = batch; this.alwaysPopulateStatement = alwaysPopulateStatement; @@ -52,7 +55,7 @@ public class SqlProducer extends DefaultProducer { String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class); final String sql = queryHeader != null ? queryHeader : query; - final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(sql, getEndpoint().isAllowNamedParameters()); + final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters()); jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Map<?, ?>>() { public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException { @@ -65,13 +68,13 @@ public class SqlProducer extends DefaultProducer { Iterator<?> iterator = exchange.getIn().getBody(Iterator.class); while (iterator != null && iterator.hasNext()) { Object value = iterator.next(); - Iterator<?> i = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expected, exchange, value); - getEndpoint().getPrepareStatementStrategy().populateStatement(ps, i, expected); + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value); + sqlPrepareStatementStrategy.populateStatement(ps, i, expected); ps.addBatch(); } } else { - Iterator<?> i = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody()); - getEndpoint().getPrepareStatementStrategy().populateStatement(ps, i, expected); + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody()); + sqlPrepareStatementStrategy.populateStatement(ps, i, expected); } } http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java new file mode 100755 index 0000000..3182cd1 --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +/** + * @version + */ +public class SqlProducerSeparatorTest extends CamelTestSupport { + + private EmbeddedDatabase db; + private JdbcTemplate jdbcTemplate; + + @Before + public void setUp() throws Exception { + db = new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build(); + + jdbcTemplate = new JdbcTemplate(db); + + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + db.shutdown(); + } + + @Test + public void testSeparator() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + template.sendBody("direct:start", "4;'Food, Inc';'LGPL'"); + + mock.assertIsSatisfied(); + + assertEquals(4, jdbcTemplate.queryForInt("select count(*) from projects")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("direct:start") + .to("sql:insert into projects (id, project, license) values (#, #, #)?separator=;") + .to("mock:result"); + } + }; + } +}