Repository: camel Updated Branches: refs/heads/master 55b0c10db -> b93083465
CAMEL-3907: Add option useMessageBodyForSql to have more dynamic queries. Thanks to Zemian Deng for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b9308346 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b9308346 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b9308346 Branch: refs/heads/master Commit: b93083465de2b4e5326aeba18780068aff56f5bb Parents: 55b0c10 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 12 10:30:17 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 12 10:30:17 2015 +0200 ---------------------------------------------------------------------- .../camel/component/sql/SqlConstants.java | 5 + .../apache/camel/component/sql/SqlEndpoint.java | 17 +- .../apache/camel/component/sql/SqlProducer.java | 41 +++-- .../SqlProducerUseMessageBodyForSqlTest.java | 171 +++++++++++++++++++ 4 files changed, 222 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java index 42330c4..2dd7cdc 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java @@ -54,6 +54,11 @@ public final class SqlConstants { */ public static final String SQL_GENERATED_KEYS_DATA = "CamelSqlGeneratedKeyRows"; + /** + * The SQL parameters when using the option useMessageBodyForSql + */ + public static final String SQL_PARAMETERS = "CamelSqlParameters"; + private SqlConstants() { // Utility class } 
http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java index 507b071..1086d55 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java @@ -86,6 +86,8 @@ public class SqlEndpoint extends DefaultPollingEndpoint { private boolean noop; @UriParam private String outputHeader; + @UriParam(label = "producer") + private boolean useMessageBodyForSql; public SqlEndpoint() { } @@ -110,7 +112,7 @@ public class SqlEndpoint extends DefaultPollingEndpoint { public Producer createProducer() throws Exception { SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator); - SqlProducer result = new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement); + SqlProducer result = new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement, useMessageBodyForSql); result.setParametersCount(parametersCount); return result; } @@ -318,6 +320,19 @@ public class SqlEndpoint extends DefaultPollingEndpoint { this.outputHeader = outputHeader; } + public boolean isUseMessageBodyForSql() { + return useMessageBodyForSql; + } + + /** + * Whether to use the message body as the SQL and then headers for parameters. + * <p/> + * If this option is enabled then the SQL in the uri is not used. 
+ */ + public void setUseMessageBodyForSql(boolean useMessageBodyForSql) { + this.useMessageBodyForSql = useMessageBodyForSql; + } + public String getDataSourceRef() { return dataSourceRef; } http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index c5eda07..9f2e9d2 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -27,7 +27,9 @@ import java.util.List; import java.util.Map; import org.apache.camel.Exchange; +import org.apache.camel.NoSuchHeaderException; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ExchangeHelper; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; import org.springframework.jdbc.core.PreparedStatementCreator; @@ -35,21 +37,23 @@ import org.springframework.jdbc.core.PreparedStatementCreator; import static org.springframework.jdbc.support.JdbcUtils.closeResultSet; public class SqlProducer extends DefaultProducer { - private String query; - private JdbcTemplate jdbcTemplate; - private boolean batch; - private boolean alwaysPopulateStatement; - private SqlPrepareStatementStrategy sqlPrepareStatementStrategy; + private final String query; + private final JdbcTemplate jdbcTemplate; + private final boolean batch; + private final boolean alwaysPopulateStatement; + private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy; + private final boolean useMessageBodyForSql; private int parametersCount; public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, 
SqlPrepareStatementStrategy sqlPrepareStatementStrategy, - boolean batch, boolean alwaysPopulateStatement) { + boolean batch, boolean alwaysPopulateStatement, boolean useMessageBodyForSql) { super(endpoint); this.jdbcTemplate = jdbcTemplate; this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; this.query = query; this.batch = batch; this.alwaysPopulateStatement = alwaysPopulateStatement; + this.useMessageBodyForSql = useMessageBodyForSql; } @Override @@ -58,9 +62,13 @@ public class SqlProducer extends DefaultProducer { } public void process(final Exchange exchange) throws Exception { - String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class); - - final String sql = queryHeader != null ? queryHeader : query; + final String sql; + if (useMessageBodyForSql) { + sql = exchange.getIn().getBody(String.class); + } else { + String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class); + sql = queryHeader != null ? queryHeader : query; + } final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters()); // CAMEL-7313 - check whether to return generated keys @@ -99,7 +107,12 @@ public class SqlProducer extends DefaultProducer { if (alwaysPopulateStatement || expected > 0) { // transfer incoming message body data to prepared statement parameters, if necessary if (batch) { - Iterator<?> iterator = exchange.getIn().getBody(Iterator.class); + Iterator<?> iterator; + if (useMessageBodyForSql) { + iterator = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class); + } else { + iterator = exchange.getIn().getBody(Iterator.class); + } while (iterator != null && iterator.hasNext()) { Object value = iterator.next(); Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value); @@ -107,7 +120,13 @@ public class SqlProducer extends DefaultProducer { ps.addBatch(); } } else { - Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody()); + Object value; + if (useMessageBodyForSql) { + value = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS); + } else { + value = exchange.getIn().getBody(); + } + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value); sqlPrepareStatementStrategy.populateStatement(ps, i, expected); } } http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java new file mode 100644 index 0000000..ea8b7d8 --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.sql; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +public class SqlProducerUseMessageBodyForSqlTest extends CamelTestSupport { + + private EmbeddedDatabase db; + + @Before + public void setUp() throws Exception { + db = new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build(); + + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + db.shutdown(); + } + + @Test + public void testUseMessageBodyForSqlAndHeaderParams() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("direct:start") + .setBody(constant("select * from projects where license = :?lic order by id")) + .to("sql://query?useMessageBodyForSql=true") + .to("mock:result"); + } + }); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", null, "lic", "ASF"); + + List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(2, received.size()); + Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("Camel", row.get("PROJECT")); + + row = assertIsInstanceOf(Map.class, received.get(1)); + assertEquals("AMQ", row.get("PROJECT")); + } + + @Test + @SuppressWarnings("unchecked") 
+ public void testUseMessageBodyForSqlAndCamelSqlParameters() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("direct:start") + .setBody(constant("select * from projects where license = :?lic order by id")) + .to("sql://query?useMessageBodyForSql=true") + .to("mock:result"); + } + }); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + Map<String, Object> row = new HashMap<String, Object>(); + row.put("lic", "ASF"); + template.sendBodyAndHeader("direct:start", null, SqlConstants.SQL_PARAMETERS, row); + + List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(2, received.size()); + row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("Camel", row.get("PROJECT")); + + row = assertIsInstanceOf(Map.class, received.get(1)); + assertEquals("AMQ", row.get("PROJECT")); + } + + @Test + @SuppressWarnings("unchecked") + public void testUseMessageBodyForSqlAndCamelSqlParametersBatch() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("direct:start") + .setBody(constant("insert into projects(id, project, license) values(:?id,:?project,:?lic)")) + .to("sql://query?useMessageBodyForSql=true&batch=true") + .to("mock:result"); + } + }); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); + Map<String, Object> row = new HashMap<String, Object>(); + row.put("id", 200); + row.put("project", "MyProject1"); + row.put("lic", "OPEN1"); + rows.add(row); + row = new HashMap<String, Object>(); + row.put("id", 201); + row.put("project", "MyProject2"); + row.put("lic", "OPEN1"); + rows.add(row); + 
template.sendBodyAndHeader("direct:start", null, SqlConstants.SQL_PARAMETERS, rows); + + String origSql = assertIsInstanceOf(String.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals("insert into projects(id, project, license) values(:?id,:?project,:?lic)", origSql); + + assertEquals(null, mock.getReceivedExchanges().get(0).getOut().getBody()); + + // Clear and then use route2 to verify result of above insert select + context.removeRoute(context.getRoutes().get(0).getId()); + context.addRoutes(new RouteBuilder() { + public void configure() { + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("direct:start2") + .setBody(constant("select * from projects where license = :?lic order by id")) + .to("sql://query2?useMessageBodyForSql=true") + .to("mock:result2"); + } + }); + + mock = getMockEndpoint("mock:result2"); + mock.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start2", null, "lic", "OPEN1"); + + List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(2, received.size()); + row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("MyProject1", row.get("PROJECT")); + + row = assertIsInstanceOf(Map.class, received.get(1)); + assertEquals("MyProject2", row.get("PROJECT")); + } +}