This is an automated email from the ASF dual-hosted git repository. jeremyross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 7fbc59c CAMEL-16206: camel-sql: Preserve message body (#5086) 7fbc59c is described below commit 7fbc59c4abde655cda26c54be6605e4c469a7c6a Author: Jeremy Ross <jeremy.g.r...@gmail.com> AuthorDate: Mon Feb 15 15:52:01 2021 -0600 CAMEL-16206: camel-sql: Preserve message body (#5086) CAMEL-16206: camel-sql: Preserve message body when CamelSqlRetrieveGeneratedKeys == true, or when there's nothing else useful to put in the body. --- .../apache/camel/component/sql/SqlProducer.java | 226 ++++++++------------- .../camel/component/sql/SqlGeneratedKeysTest.java | 5 +- 2 files changed, 90 insertions(+), 141 deletions(-) diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index 2a5b36f..0890cce 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -24,7 +24,6 @@ import java.sql.Statement; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; @@ -105,75 +104,54 @@ public class SqlProducer extends DefaultProducer { final Boolean shouldRetrieveGeneratedKeys = exchange.getIn().getHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, false, Boolean.class); - PreparedStatementCreator statementCreator = new PreparedStatementCreator() { - @Override - public PreparedStatement createPreparedStatement(Connection con) throws SQLException { - if (!shouldRetrieveGeneratedKeys) { - return con.prepareStatement(preparedQuery); + PreparedStatementCreator statementCreator = con -> { + if (!shouldRetrieveGeneratedKeys) { + return con.prepareStatement(preparedQuery); + } else { + Object expectedGeneratedColumns = exchange.getIn().getHeader(SqlConstants.SQL_GENERATED_COLUMNS); + if (expectedGeneratedColumns == null) { + return con.prepareStatement(preparedQuery, Statement.RETURN_GENERATED_KEYS); + } else if (expectedGeneratedColumns instanceof String[]) { + return con.prepareStatement(preparedQuery, (String[]) expectedGeneratedColumns); + } else if (expectedGeneratedColumns instanceof int[]) { + return con.prepareStatement(preparedQuery, (int[]) expectedGeneratedColumns); } else { - Object expectedGeneratedColumns = exchange.getIn().getHeader(SqlConstants.SQL_GENERATED_COLUMNS); - if (expectedGeneratedColumns == null) { - return con.prepareStatement(preparedQuery, Statement.RETURN_GENERATED_KEYS); - } else if (expectedGeneratedColumns instanceof String[]) { - return con.prepareStatement(preparedQuery, (String[]) expectedGeneratedColumns); - } else if (expectedGeneratedColumns instanceof int[]) { - return con.prepareStatement(preparedQuery, (int[]) expectedGeneratedColumns); - } else { - throw new IllegalArgumentException( - "Header specifying expected returning columns isn't an instance of String[] or int[] but " - + expectedGeneratedColumns.getClass()); - } + throw new IllegalArgumentException( + "Header specifying expected returning columns isn't an instance of String[] or int[] but " + + expectedGeneratedColumns.getClass()); } } }; - // special for processing stream list (batch not supported) - SqlOutputType outputType = getEndpoint().getOutputType(); - if (outputType == SqlOutputType.StreamList) { - processStreamList(exchange, statementCreator, sql, preparedQuery); - return; + Object data; + if (getEndpoint().getOutputType() == SqlOutputType.StreamList) { + data = processStreamList(exchange, statementCreator, sql, preparedQuery); + } else { + data = processInternal(exchange, statementCreator, sql, preparedQuery, shouldRetrieveGeneratedKeys); } + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + if (getEndpoint().isNoop() || getEndpoint().getOutputHeader() != null || data == null) { + exchange.getOut().setBody(exchange.getIn().getBody()); + } + if (getEndpoint().getOutputHeader() != null) { + exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); + } else if (data != null && !getEndpoint().isNoop()) { + exchange.getOut().setBody(data); + } + } + private Object processInternal( + Exchange exchange, PreparedStatementCreator statementCreator, + String sql, String preparedQuery, Boolean shouldRetrieveGeneratedKeys) { LOG.trace("jdbcTemplate.execute: {}", preparedQuery); - jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Map<?, ?>>() { - public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException { + return jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Object>() { + public Object doInPreparedStatement(PreparedStatement ps) throws SQLException { + Object data = null; ResultSet rs = null; try { - int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount(); - - // only populate if really needed - if (alwaysPopulateStatement || expected > 0) { - // transfer incoming message body data to prepared statement parameters, if necessary - if (batch) { - Iterator<?> iterator; - if (useMessageBodyForSql) { - iterator = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class); - } else { - iterator = exchange.getIn().getBody(Iterator.class); - } - while (iterator != null && iterator.hasNext()) { - Object value = iterator.next(); - Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, - exchange, value); - sqlPrepareStatementStrategy.populateStatement(ps, i, expected); - ps.addBatch(); - } - } else { - Object value; - if (useMessageBodyForSql) { - value = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS); - } else { - value = exchange.getIn().getBody(); - } - Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, - exchange, value); - sqlPrepareStatementStrategy.populateStatement(ps, i, expected); - } - } - + populateStatement(ps, exchange, sql, preparedQuery); boolean isResultSet = false; - // execute the prepared statement and populate the outgoing message if (batch) { int[] updateCounts = ps.executeBatch(); int total = 0; @@ -184,59 +162,31 @@ public class SqlProducer extends DefaultProducer { } else { isResultSet = ps.execute(); if (isResultSet) { - // preserve headers first, so we can override the SQL_ROW_COUNT header - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); rs = ps.getResultSet(); SqlOutputType outputType = getEndpoint().getOutputType(); LOG.trace("Got result list from query: {}, outputType={}", rs, outputType); + + int rowCount = 0; if (outputType == SqlOutputType.SelectList) { - List<?> data = getEndpoint().queryForList(rs, true); - // for noop=true we still want to enrich with the row count header - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); - exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); - } else { - exchange.getOut().setBody(data); - } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size()); + data = getEndpoint().queryForList(rs, true); + rowCount = ((List<?>) data).size(); } else if (outputType == SqlOutputType.SelectOne) { - Object data = getEndpoint().queryForObject(rs); + data = getEndpoint().queryForObject(rs); if (data != null) { - // for noop=true we still want to enrich with the row count header - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); - exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); - } else { - exchange.getOut().setBody(data); - } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1); - } else { - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0); + rowCount = 1; } } else { throw new IllegalArgumentException("Invalid outputType=" + outputType); } + exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, rowCount); } else { - exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); + exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); + exchange.getOut().setBody(exchange.getIn().getBody()); } } if (shouldRetrieveGeneratedKeys) { - // if no OUT message yet then create one and propagate headers - if (!exchange.hasOut()) { - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); - } - if (isResultSet) { // we won't return generated keys for SELECT statements exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST); @@ -248,8 +198,7 @@ public class SqlProducer extends DefaultProducer { } } - // data is set on exchange so return null - return null; + return data; } finally { closeResultSet(rs); } @@ -257,7 +206,7 @@ public class SqlProducer extends DefaultProducer { }); } - protected void processStreamList( + protected Object processStreamList( Exchange exchange, PreparedStatementCreator statementCreator, String sql, String preparedQuery) throws Exception { LOG.trace("processStreamList: {}", preparedQuery); @@ -271,58 +220,20 @@ public class SqlProducer extends DefaultProducer { try { con = jdbcTemplate.getDataSource().getConnection(); ps = statementCreator.createPreparedStatement(con); + ResultSetIterator iterator = null; - int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount(); - - // only populate if really needed - if (alwaysPopulateStatement || expected > 0) { - // transfer incoming message body data to prepared statement parameters, if necessary - if (batch) { - Iterator<?> iterator; - if (useMessageBodyForSql) { - iterator = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class); - } else { - iterator = exchange.getIn().getBody(Iterator.class); - } - while (iterator != null && iterator.hasNext()) { - Object value = iterator.next(); - Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, - exchange, value); - sqlPrepareStatementStrategy.populateStatement(ps, i, expected); - ps.addBatch(); - } - } else { - Object value; - if (useMessageBodyForSql) { - value = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS); - } else { - value = exchange.getIn().getBody(); - } - Iterator<?> i - = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value); - sqlPrepareStatementStrategy.populateStatement(ps, i, expected); - } - } + populateStatement(ps, exchange, sql, preparedQuery); boolean isResultSet = ps.execute(); if (isResultSet) { rs = ps.getResultSet(); - ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs); - //pass through all headers - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + iterator = getEndpoint().queryForStreamList(con, ps, rs); - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); - exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator); - } else { - exchange.getOut().setBody(iterator); - } // we do not know the row count so we cannot set a ROW_COUNT header // defer closing the iterator when the exchange is complete exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); } + return iterator; } catch (Exception e) { // in case of exception then close all this before rethrow closeConnection(con); @@ -332,6 +243,41 @@ public class SqlProducer extends DefaultProducer { } } + private void populateStatement(PreparedStatement ps, Exchange exchange, String sql, String preparedQuery) + throws SQLException { + int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount(); + + // only populate if really needed + if (alwaysPopulateStatement || expected > 0) { + // transfer incoming message body data to prepared statement parameters, if necessary + if (batch) { + Iterator<?> iterator; + if (useMessageBodyForSql) { + iterator = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class); + } else { + iterator = exchange.getIn().getBody(Iterator.class); + } + while (iterator != null && iterator.hasNext()) { + Object value = iterator.next(); + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, + exchange, value); + sqlPrepareStatementStrategy.populateStatement(ps, i, expected); + ps.addBatch(); + } + } else { + Object value; + if (useMessageBodyForSql) { + value = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS); + } else { + value = exchange.getIn().getBody(); + } + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, + exchange, value); + sqlPrepareStatementStrategy.populateStatement(ps, i, expected); + } + } + } + public void setParametersCount(int parametersCount) { this.parametersCount = parametersCount; } diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java index 5e96fd5..1c645a4 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java @@ -33,6 +33,7 @@ import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; public class SqlGeneratedKeysTest extends CamelTestSupport { @@ -79,8 +80,9 @@ public class SqlGeneratedKeysTest extends CamelTestSupport { // first we create our exchange using the endpoint Endpoint endpoint = context.getEndpoint("direct:insert"); + Object body = new Object[] { "project x", "ASF", "new project" }; Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(new Object[] { "project x", "ASF", "new project" }); + exchange.getIn().setBody(body); exchange.getIn().setHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, true); // now we send the exchange to the endpoint, and receives the response from Camel @@ -90,6 +92,7 @@ public class SqlGeneratedKeysTest extends CamelTestSupport { assertNotNull(out); assertNotNull(out.getMessage()); assertNotNull(out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA)); + assertSame(body, out.getMessage().getBody()); List<Map<String, Object>> generatedKeys = out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, List.class); assertNotNull(generatedKeys, "out body could not be converted to a List - was: "