Updated Branches: refs/heads/master bd34771fa -> ddf7ba9b9
CAMEL-6157: Added support for named parameters to camel-jdbc, like we have in camel-sql. Thanks to Devendra Khanolkar for contribution. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ddf7ba9b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ddf7ba9b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ddf7ba9b Branch: refs/heads/master Commit: ddf7ba9b9140eaaab3c784732bfd6eeb1f0114d2 Parents: bd34771 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jun 2 17:22:49 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jun 2 17:22:49 2013 +0200 ---------------------------------------------------------------------- .../jdbc/DefaultJdbcPrepareStatementStrategy.java | 167 +++++++++++++++ .../apache/camel/component/jdbc/JdbcComponent.java | 2 +- .../apache/camel/component/jdbc/JdbcConstants.java | 7 +- .../apache/camel/component/jdbc/JdbcEndpoint.java | 36 +++- .../jdbc/JdbcPrepareStatementStrategy.java | 62 ++++++ .../apache/camel/component/jdbc/JdbcProducer.java | 51 ++++- .../component/jdbc/JdbcParameterizedQueryTest.java | 108 ++++++++++ 7 files changed, 422 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/DefaultJdbcPrepareStatementStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/DefaultJdbcPrepareStatementStrategy.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/DefaultJdbcPrepareStatementStrategy.java new file mode 100644 index 0000000..04aad60 --- /dev/null +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/DefaultJdbcPrepareStatementStrategy.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jdbc; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeExchangeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default {@link JdbcPrepareStatementStrategy} which is a copy from the camel-sql component having + * this functionality first. + */ +public class DefaultJdbcPrepareStatementStrategy implements JdbcPrepareStatementStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultJdbcPrepareStatementStrategy.class); + + @Override + public String prepareQuery(String query, boolean allowNamedParameters) throws SQLException { + String answer; + if (allowNamedParameters && hasNamedParameters(query)) { + // replace all :?word with just ? + answer = query.replaceAll("\\:\\?\\w+", "\\?"); + } else { + answer = query; + } + + LOG.trace("Prepared query: {}", answer); + return answer; + } + + @Override + public Iterator<?> createPopulateIterator(final String query, final String preparedQuery, final int expectedParams, + final Exchange exchange, final Object value) throws SQLException { + Map<?, ?> map = null; + if (exchange.getIn().hasHeaders()) { + if (exchange.getIn().getHeader(JdbcConstants.JDBC_PARAMETERS) != null) { + // header JDBC_PARAMETERS takes precedence over regular headers + map = exchange.getIn().getHeader(JdbcConstants.JDBC_PARAMETERS, Map.class); + } else { + map = exchange.getIn().getHeaders(); + } + } + + final Map<?, ?> headerMap = map; + + if (hasNamedParameters(query)) { + // create an iterator that returns the value in the named order + try { + + return new Iterator<Object>() { + private NamedQueryParser parser = new NamedQueryParser(query); + private Object next; + private boolean done; + + @Override + public boolean hasNext() { + if (done) { + return false; + } + if (next == null) { + next = next(); + } + return next != null; + } + + @Override + public Object next() { + if (next == null) { + String key = parser.next(); + if (key == null) { + done = true; + return null; + } + // the key is expected to exist, if not report so end user can see this + boolean contains = headerMap != null && headerMap.containsKey(key); + if (!contains) { + throw new RuntimeExchangeException("Cannot find key [" + key + "] in message body or headers to use when setting named parameter in query [" + query + "]", exchange); + } + next = headerMap.get(key); + } + Object answer = next; + next = null; + return answer; + } + + @Override + public void remove() { + // noop + } + }; + } catch (Exception e) { + throw new SQLException("Error iterating parameters for the query: " + query, e); + } + + } else { + // just use a regular iterator + return exchange.getContext().getTypeConverter().convertTo(Iterator.class, headerMap != null ? headerMap.values() : null); + } + } + + @Override + public void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException { + int argNumber = 1; + if (expectedParams > 0) { + // as the headers may have more values than the SQL needs we just break out when we reached the expected number + while (iterator != null && iterator.hasNext() && argNumber <= expectedParams) { + Object value = iterator.next(); + LOG.trace("Setting parameter #{} with value: {}", argNumber, value); + ps.setObject(argNumber, value); + argNumber++; + } + } + + if (argNumber - 1 != expectedParams) { + throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1)); + } + } + + protected boolean hasNamedParameters(String query) { + NamedQueryParser parser = new NamedQueryParser(query); + return parser.next() != null; + } + + private static final class NamedQueryParser { + + private static final Pattern PATTERN = Pattern.compile("\\:\\?(\\w+)"); + private final Matcher matcher; + + private NamedQueryParser(String query) { + this.matcher = PATTERN.matcher(query); + } + + public String next() { + if (!matcher.find()) { + return null; + } + + return matcher.group(1); + } + + public String replaceAll(String replacement) { + return matcher.replaceAll(replacement); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcComponent.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcComponent.java index 41b9eec..211362d 100755 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcComponent.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcComponent.java @@ -26,7 +26,7 @@ import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.IntrospectionSupport; /** - * @version + * @version */ public class JdbcComponent extends DefaultComponent { private DataSource ds; http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcConstants.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcConstants.java index 3a1589a..c5621fd 100644 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcConstants.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcConstants.java @@ -27,6 +27,8 @@ public final class JdbcConstants { public static final String JDBC_COLUMN_NAMES = "CamelJdbcColumnNames"; + public static final String JDBC_PARAMETERS = "CamelJdbcParameters"; + /** * Boolean input header. * Set its value to true to retrieve generated keys, default is false @@ -36,10 +38,11 @@ public final class JdbcConstants { /** * <tt>String[]</tt> or <tt>int[]</tt> input header - optional * Set it to specify the expected generated columns, see: + * * @see <a href="http://docs.oracle.com/javase/6/docs/api/java/sql/Statement.html#execute(java.lang.String, int[])"> - * java.sql.Statement.execute(java.lang.String, int[])</a> + * java.sql.Statement.execute(java.lang.String, int[])</a> * @see <a href="http://docs.oracle.com/javase/6/docs/api/java/sql/Statement.html#execute(java.lang.String, java.lang.String[])"> - * java.sql.Statement.execute(java.lang.String, java.lang.String[])</a> + * java.sql.Statement.execute(java.lang.String, java.lang.String[])</a> */ public static final String JDBC_GENERATED_COLUMNS = "CamelGeneratedColumns"; http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java index 5be15c1..4a55093 100755 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java @@ -26,7 +26,7 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; /** - * @version + * @version */ public class JdbcEndpoint extends DefaultEndpoint { private int readSize; @@ -35,6 +35,9 @@ public class JdbcEndpoint extends DefaultEndpoint { private DataSource dataSource; private Map<String, Object> parameters; private boolean useJDBC4ColumnNameAndLabelSemantics = true; + private JdbcPrepareStatementStrategy prepareStatementStrategy = new DefaultJdbcPrepareStatementStrategy(); + private boolean allowNamedParameters = true; + private boolean useHeadersAsParameters; public JdbcEndpoint() { } @@ -53,7 +56,7 @@ public class JdbcEndpoint extends DefaultEndpoint { } public Producer createProducer() throws Exception { - return new JdbcProducer(this, dataSource, readSize, parameters); + return new JdbcProducer(this, dataSource, readSize, parameters); } public int getReadSize() { @@ -96,7 +99,7 @@ public class JdbcEndpoint extends DefaultEndpoint { * Optional parameters to the {@link java.sql.Statement}. * <p/> * For example to set maxRows, fetchSize etc. - * + * * @param parameters parameters which will be set using reflection */ public void setParameters(Map<String, Object> parameters) { @@ -116,12 +119,37 @@ public class JdbcEndpoint extends DefaultEndpoint { * <p/> * This option is default <tt>true</tt>. * - * @param useJDBC4ColumnNameAndLabelSemantics <tt>true</tt> to use JDBC 4.0 semantics, <tt>false</tt> to use JDBC 3.0. + * @param useJDBC4ColumnNameAndLabelSemantics + * <tt>true</tt> to use JDBC 4.0 semantics, <tt>false</tt> to use JDBC 3.0. */ public void setUseJDBC4ColumnNameAndLabelSemantics(boolean useJDBC4ColumnNameAndLabelSemantics) { this.useJDBC4ColumnNameAndLabelSemantics = useJDBC4ColumnNameAndLabelSemantics; } + public JdbcPrepareStatementStrategy getPrepareStatementStrategy() { + return prepareStatementStrategy; + } + + public void setPrepareStatementStrategy(JdbcPrepareStatementStrategy prepareStatementStrategy) { + this.prepareStatementStrategy = prepareStatementStrategy; + } + + public boolean isAllowNamedParameters() { + return allowNamedParameters; + } + + public void setAllowNamedParameters(boolean allowNamedParameters) { + this.allowNamedParameters = allowNamedParameters; + } + + public boolean isUseHeadersAsParameters() { + return useHeadersAsParameters; + } + + public void setUseHeadersAsParameters(boolean useHeadersAsParameters) { + this.useHeadersAsParameters = useHeadersAsParameters; + } + @Override protected String createEndpointUri() { return "jdbc"; http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcPrepareStatementStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcPrepareStatementStrategy.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcPrepareStatementStrategy.java new file mode 100644 index 0000000..8615ecc --- /dev/null +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcPrepareStatementStrategy.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jdbc; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Iterator; + +import org.apache.camel.Exchange; + +/** + * Strategy for preparing statements when executing SQL queries. + */ +public interface JdbcPrepareStatementStrategy { + + /** + * Prepares the query to be executed. + * + * @param query the query which may contain named query parameters + * @param allowNamedParameters whether named parameters is allowed + * @return the query to actually use, which must be accepted by the JDBC driver. + */ + String prepareQuery(String query, boolean allowNamedParameters) throws SQLException; + + /** + * Creates the iterator to use when setting query parameters on the prepared statement. + * + * @param query the original query which may contain named parameters + * @param preparedQuery the query to actually use, which must be accepted by the JDBC driver. + * @param expectedParams number of expected parameters + * @param exchange the current exchange + * @param value the message body that contains the data for the query parameters + * @return the iterator + * @throws SQLException is thrown if error creating the iterator + */ + Iterator<?> createPopulateIterator(String query, String preparedQuery, int expectedParams, Exchange exchange, Object value) throws SQLException; + + /** + * Populates the query parameters on the prepared statement + * + * @param ps the prepared statement + * @param iterator the iterator to use for getting the parameter data + * @param expectedParams number of expected parameters + * @throws SQLException is thrown if error populating parameters + */ + void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException; + +} http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java old mode 100755 new mode 100644 index fff89fc..301ea32 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java @@ -17,12 +17,14 @@ package org.apache.camel.component.jdbc; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -109,8 +111,47 @@ public class JdbcProducer extends DefaultProducer { } private void createAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { + if (getEndpoint().isUseHeadersAsParameters()) { + doCreateAndExecuteSqlStatementWithHeaders(exchange, sql, conn); + } else { + doCreateAndExecuteSqlStatement(exchange, sql, conn); + } + } + + private void doCreateAndExecuteSqlStatementWithHeaders(Exchange exchange, String sql, Connection conn) throws Exception { + PreparedStatement ps = null; + ResultSet rs = null; + + try { + final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(sql, getEndpoint().isAllowNamedParameters()); + ps = conn.prepareStatement(preparedQuery); + int expectedCount = ps.getParameterMetaData().getParameterCount(); + + if (expectedCount > 0) { + Iterator<?> it = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expectedCount, exchange, exchange.getIn().getBody()); + getEndpoint().getPrepareStatementStrategy().populateStatement(ps, it, expectedCount); + } + + LOG.debug("Executing JDBC PreparedStatement: {}", sql); + + boolean stmtExecutionResult = ps.execute(); + if (stmtExecutionResult) { + rs = ps.getResultSet(); + setResultSet(exchange, rs); + } else { + int updateCount = ps.getUpdateCount(); + exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount); + } + } finally { + closeQuietly(rs); + closeQuietly(ps); + } + } + + private void doCreateAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { Statement stmt = null; ResultSet rs = null; + try { stmt = conn.createStatement(); @@ -118,7 +159,7 @@ public class JdbcProducer extends DefaultProducer { IntrospectionSupport.setProperties(stmt, parameters); } - LOG.debug("Executing JDBC statement: {}", sql); + LOG.debug("Executing JDBC Statement: {}", sql); Boolean shouldRetrieveGeneratedKeys = exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, Boolean.class); @@ -135,7 +176,7 @@ public class JdbcProducer extends DefaultProducer { } else { throw new IllegalArgumentException( "Header specifying expected returning columns isn't an instance of String[] or int[] but " - + expectedGeneratedColumns.getClass()); + + expectedGeneratedColumns.getClass()); } } else { stmtExecutionResult = stmt.execute(sql); @@ -204,7 +245,7 @@ public class JdbcProducer extends DefaultProducer { * - {@link JdbcConstants#JDBC_GENERATED_KEYS_ROW_COUNT} : the row count of generated keys * - {@link JdbcConstants#JDBC_GENERATED_KEYS_DATA} : the generated keys data * - * @param exchange The exchange where to store the generated keys + * @param exchange The exchange where to store the generated keys * @param generatedKeys The result set containing the generated keys */ protected void setGeneratedKeys(Exchange exchange, ResultSet generatedKeys) throws SQLException { @@ -223,7 +264,9 @@ public class JdbcProducer extends DefaultProducer { List<Map<String, Object>> data = extractResultSetData(rs); exchange.getOut().setHeader(JdbcConstants.JDBC_ROW_COUNT, data.size()); - exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, data.get(0).keySet()); + if (!data.isEmpty()) { + exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, data.get(0).keySet()); + } exchange.getOut().setBody(data); } http://git-wip-us.apache.org/repos/asf/camel/blob/ddf7ba9b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcParameterizedQueryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcParameterizedQueryTest.java b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcParameterizedQueryTest.java new file mode 100644 index 0000000..81263b9 --- /dev/null +++ b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcParameterizedQueryTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jdbc; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class JdbcParameterizedQueryTest extends AbstractJdbcTestSupport { + + @Test + public void testParameterizedQueryNoNames() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + // must be linked so we can dictate the order + Map<String, Object> jdbcParams = new LinkedHashMap<String, Object>(); + jdbcParams.put("id", "cust1"); + jdbcParams.put("name", "jstrachan"); + + template.sendBodyAndHeaders("direct:start", "select * from customer where id = ? and name = ? order by ID", jdbcParams); + + assertMockEndpointsSatisfied(); + + List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(1, received.size()); + Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("jstrachan", row.get("NAME")); + } + + @Test + public void testParameterizedQuery() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + Map<String, Object> jdbcParams = new HashMap<String, Object>(); + jdbcParams.put("id", "cust1"); + jdbcParams.put("name", "jstrachan"); + + template.sendBodyAndHeaders("direct:start", "select * from customer where id = :?id and name = :?name order by ID", jdbcParams); + + assertMockEndpointsSatisfied(); + + List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(1, received.size()); + Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("jstrachan", row.get("NAME")); + } + + @Test + public void testParameterizedQueryJdbcHeader() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + Map<String, Object> jdbcParams = new HashMap<String, Object>(); + jdbcParams.put("id", "cust1"); + jdbcParams.put("name", "jstrachan"); + + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("id", "cust2"); + // this header should take precedence so we will not get cust2 + headers.put(JdbcConstants.JDBC_PARAMETERS, jdbcParams); + + template.sendBodyAndHeaders("direct:start", "select * from customer where id = :?id and name = :?name order by ID", headers); + + assertMockEndpointsSatisfied(); + + List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(1, received.size()); + Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("jstrachan", row.get("NAME")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + getContext().setUseBreadcrumb(false); + + getContext().getComponent("jdbc", JdbcComponent.class).setDataSource(db); + + from("direct:start") + .to("jdbc:testdb?useHeadersAsParameters=true") + .to("mock:result"); + } + }; + } + +} \ No newline at end of file