This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fbc59c  CAMEL-16206: camel-sql: Preserve message body (#5086)
7fbc59c is described below

commit 7fbc59c4abde655cda26c54be6605e4c469a7c6a
Author: Jeremy Ross <jeremy.g.r...@gmail.com>
AuthorDate: Mon Feb 15 15:52:01 2021 -0600

    CAMEL-16206: camel-sql: Preserve message body (#5086)
    
    CAMEL-16206: camel-sql: Preserve message body
    
    when CamelSqlRetrieveGeneratedKeys == true, or when there's nothing else
    useful to put in the body.
---
 .../apache/camel/component/sql/SqlProducer.java    | 226 ++++++++-------------
 .../camel/component/sql/SqlGeneratedKeysTest.java  |   5 +-
 2 files changed, 90 insertions(+), 141 deletions(-)

diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index 2a5b36f..0890cce 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -24,7 +24,6 @@ import java.sql.Statement;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
@@ -105,75 +104,54 @@ public class SqlProducer extends DefaultProducer {
         final Boolean shouldRetrieveGeneratedKeys
                 = 
exchange.getIn().getHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, false, 
Boolean.class);
 
-        PreparedStatementCreator statementCreator = new 
PreparedStatementCreator() {
-            @Override
-            public PreparedStatement createPreparedStatement(Connection con) 
throws SQLException {
-                if (!shouldRetrieveGeneratedKeys) {
-                    return con.prepareStatement(preparedQuery);
+        PreparedStatementCreator statementCreator = con -> {
+            if (!shouldRetrieveGeneratedKeys) {
+                return con.prepareStatement(preparedQuery);
+            } else {
+                Object expectedGeneratedColumns = 
exchange.getIn().getHeader(SqlConstants.SQL_GENERATED_COLUMNS);
+                if (expectedGeneratedColumns == null) {
+                    return con.prepareStatement(preparedQuery, 
Statement.RETURN_GENERATED_KEYS);
+                } else if (expectedGeneratedColumns instanceof String[]) {
+                    return con.prepareStatement(preparedQuery, (String[]) 
expectedGeneratedColumns);
+                } else if (expectedGeneratedColumns instanceof int[]) {
+                    return con.prepareStatement(preparedQuery, (int[]) 
expectedGeneratedColumns);
                 } else {
-                    Object expectedGeneratedColumns = 
exchange.getIn().getHeader(SqlConstants.SQL_GENERATED_COLUMNS);
-                    if (expectedGeneratedColumns == null) {
-                        return con.prepareStatement(preparedQuery, 
Statement.RETURN_GENERATED_KEYS);
-                    } else if (expectedGeneratedColumns instanceof String[]) {
-                        return con.prepareStatement(preparedQuery, (String[]) 
expectedGeneratedColumns);
-                    } else if (expectedGeneratedColumns instanceof int[]) {
-                        return con.prepareStatement(preparedQuery, (int[]) 
expectedGeneratedColumns);
-                    } else {
-                        throw new IllegalArgumentException(
-                                "Header specifying expected returning columns 
isn't an instance of String[] or int[] but "
-                                                           + 
expectedGeneratedColumns.getClass());
-                    }
+                    throw new IllegalArgumentException(
+                            "Header specifying expected returning columns 
isn't an instance of String[] or int[] but "
+                                                       + 
expectedGeneratedColumns.getClass());
                 }
             }
         };
 
-        // special for processing stream list (batch not supported)
-        SqlOutputType outputType = getEndpoint().getOutputType();
-        if (outputType == SqlOutputType.StreamList) {
-            processStreamList(exchange, statementCreator, sql, preparedQuery);
-            return;
+        Object data;
+        if (getEndpoint().getOutputType() == SqlOutputType.StreamList) {
+            data = processStreamList(exchange, statementCreator, sql, 
preparedQuery);
+        } else {
+            data = processInternal(exchange, statementCreator, sql, 
preparedQuery, shouldRetrieveGeneratedKeys);
         }
+        exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+        if (getEndpoint().isNoop() || getEndpoint().getOutputHeader() != null 
|| data == null) {
+            exchange.getOut().setBody(exchange.getIn().getBody());
+        }
+        if (getEndpoint().getOutputHeader() != null) {
+            exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
+        } else if (data != null && !getEndpoint().isNoop()) {
+            exchange.getOut().setBody(data);
+        }
+    }
 
+    private Object processInternal(
+            Exchange exchange, PreparedStatementCreator statementCreator,
+            String sql, String preparedQuery, Boolean 
shouldRetrieveGeneratedKeys) {
         LOG.trace("jdbcTemplate.execute: {}", preparedQuery);
-        jdbcTemplate.execute(statementCreator, new 
PreparedStatementCallback<Map<?, ?>>() {
-            public Map<?, ?> doInPreparedStatement(PreparedStatement ps) 
throws SQLException {
+        return jdbcTemplate.execute(statementCreator, new 
PreparedStatementCallback<Object>() {
+            public Object doInPreparedStatement(PreparedStatement ps) throws 
SQLException {
+                Object data = null;
                 ResultSet rs = null;
                 try {
-                    int expected = parametersCount > 0 ? parametersCount : 
ps.getParameterMetaData().getParameterCount();
-
-                    // only populate if really needed
-                    if (alwaysPopulateStatement || expected > 0) {
-                        // transfer incoming message body data to prepared 
statement parameters, if necessary
-                        if (batch) {
-                            Iterator<?> iterator;
-                            if (useMessageBodyForSql) {
-                                iterator = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
-                            } else {
-                                iterator = 
exchange.getIn().getBody(Iterator.class);
-                            }
-                            while (iterator != null && iterator.hasNext()) {
-                                Object value = iterator.next();
-                                Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
-                                        exchange, value);
-                                
sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
-                                ps.addBatch();
-                            }
-                        } else {
-                            Object value;
-                            if (useMessageBodyForSql) {
-                                value = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
-                            } else {
-                                value = exchange.getIn().getBody();
-                            }
-                            Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
-                                    exchange, value);
-                            sqlPrepareStatementStrategy.populateStatement(ps, 
i, expected);
-                        }
-                    }
-
+                    populateStatement(ps, exchange, sql, preparedQuery);
                     boolean isResultSet = false;
 
-                    // execute the prepared statement and populate the 
outgoing message
                     if (batch) {
                         int[] updateCounts = ps.executeBatch();
                         int total = 0;
@@ -184,59 +162,31 @@ public class SqlProducer extends DefaultProducer {
                     } else {
                         isResultSet = ps.execute();
                         if (isResultSet) {
-                            // preserve headers first, so we can override the 
SQL_ROW_COUNT header
-                            
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
 
                             rs = ps.getResultSet();
                             SqlOutputType outputType = 
getEndpoint().getOutputType();
                             LOG.trace("Got result list from query: {}, 
outputType={}", rs, outputType);
+
+                            int rowCount = 0;
                             if (outputType == SqlOutputType.SelectList) {
-                                List<?> data = getEndpoint().queryForList(rs, 
true);
-                                // for noop=true we still want to enrich with 
the row count header
-                                if (getEndpoint().isNoop()) {
-                                    
exchange.getOut().setBody(exchange.getIn().getBody());
-                                } else if (getEndpoint().getOutputHeader() != 
null) {
-                                    
exchange.getOut().setBody(exchange.getIn().getBody());
-                                    
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
-                                } else {
-                                    exchange.getOut().setBody(data);
-                                }
-                                
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
+                                data = getEndpoint().queryForList(rs, true);
+                                rowCount = ((List<?>) data).size();
                             } else if (outputType == SqlOutputType.SelectOne) {
-                                Object data = getEndpoint().queryForObject(rs);
+                                data = getEndpoint().queryForObject(rs);
                                 if (data != null) {
-                                    // for noop=true we still want to enrich 
with the row count header
-                                    if (getEndpoint().isNoop()) {
-                                        
exchange.getOut().setBody(exchange.getIn().getBody());
-                                    } else if (getEndpoint().getOutputHeader() 
!= null) {
-                                        
exchange.getOut().setBody(exchange.getIn().getBody());
-                                        
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
-                                    } else {
-                                        exchange.getOut().setBody(data);
-                                    }
-                                    
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
-                                } else {
-                                    if (getEndpoint().isNoop()) {
-                                        
exchange.getOut().setBody(exchange.getIn().getBody());
-                                    } else if (getEndpoint().getOutputHeader() 
!= null) {
-                                        
exchange.getOut().setBody(exchange.getIn().getBody());
-                                    }
-                                    
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 0);
+                                    rowCount = 1;
                                 }
                             } else {
                                 throw new IllegalArgumentException("Invalid 
outputType=" + outputType);
                             }
+                            
exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, rowCount);
                         } else {
-                            
exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+                            
exchange.getOut().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+                            
exchange.getOut().setBody(exchange.getIn().getBody());
                         }
                     }
 
                     if (shouldRetrieveGeneratedKeys) {
-                        // if no OUT message yet then create one and propagate 
headers
-                        if (!exchange.hasOut()) {
-                            
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-                        }
-
                         if (isResultSet) {
                             // we won't return generated keys for SELECT 
statements
                             
exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, 
Collections.EMPTY_LIST);
@@ -248,8 +198,7 @@ public class SqlProducer extends DefaultProducer {
                         }
                     }
 
-                    // data is set on exchange so return null
-                    return null;
+                    return data;
                 } finally {
                     closeResultSet(rs);
                 }
@@ -257,7 +206,7 @@ public class SqlProducer extends DefaultProducer {
         });
     }
 
-    protected void processStreamList(
+    protected Object processStreamList(
             Exchange exchange, PreparedStatementCreator statementCreator, 
String sql, String preparedQuery)
             throws Exception {
         LOG.trace("processStreamList: {}", preparedQuery);
@@ -271,58 +220,20 @@ public class SqlProducer extends DefaultProducer {
         try {
             con = jdbcTemplate.getDataSource().getConnection();
             ps = statementCreator.createPreparedStatement(con);
+            ResultSetIterator iterator = null;
 
-            int expected = parametersCount > 0 ? parametersCount : 
ps.getParameterMetaData().getParameterCount();
-
-            // only populate if really needed
-            if (alwaysPopulateStatement || expected > 0) {
-                // transfer incoming message body data to prepared statement 
parameters, if necessary
-                if (batch) {
-                    Iterator<?> iterator;
-                    if (useMessageBodyForSql) {
-                        iterator = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
-                    } else {
-                        iterator = exchange.getIn().getBody(Iterator.class);
-                    }
-                    while (iterator != null && iterator.hasNext()) {
-                        Object value = iterator.next();
-                        Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
-                                exchange, value);
-                        sqlPrepareStatementStrategy.populateStatement(ps, i, 
expected);
-                        ps.addBatch();
-                    }
-                } else {
-                    Object value;
-                    if (useMessageBodyForSql) {
-                        value = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
-                    } else {
-                        value = exchange.getIn().getBody();
-                    }
-                    Iterator<?> i
-                            = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, 
expected, exchange, value);
-                    sqlPrepareStatementStrategy.populateStatement(ps, i, 
expected);
-                }
-            }
+            populateStatement(ps, exchange, sql, preparedQuery);
 
             boolean isResultSet = ps.execute();
             if (isResultSet) {
                 rs = ps.getResultSet();
-                ResultSetIterator iterator = 
getEndpoint().queryForStreamList(con, ps, rs);
-                //pass through all headers
-                
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+                iterator = getEndpoint().queryForStreamList(con, ps, rs);
 
-                if (getEndpoint().isNoop()) {
-                    exchange.getOut().setBody(exchange.getIn().getBody());
-                } else if (getEndpoint().getOutputHeader() != null) {
-                    exchange.getOut().setBody(exchange.getIn().getBody());
-                    
exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator);
-                } else {
-                    exchange.getOut().setBody(iterator);
-                }
                 // we do not know the row count so we cannot set a ROW_COUNT 
header
                 // defer closing the iterator when the exchange is complete
                 exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
             }
+            return iterator;
         } catch (Exception e) {
             // in case of exception then close all this before rethrow
             closeConnection(con);
@@ -332,6 +243,41 @@ public class SqlProducer extends DefaultProducer {
         }
     }
 
+    private void populateStatement(PreparedStatement ps, Exchange exchange, 
String sql, String preparedQuery)
+            throws SQLException {
+        int expected = parametersCount > 0 ? parametersCount : 
ps.getParameterMetaData().getParameterCount();
+
+        // only populate if really needed
+        if (alwaysPopulateStatement || expected > 0) {
+            // transfer incoming message body data to prepared statement 
parameters, if necessary
+            if (batch) {
+                Iterator<?> iterator;
+                if (useMessageBodyForSql) {
+                    iterator = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
+                } else {
+                    iterator = exchange.getIn().getBody(Iterator.class);
+                }
+                while (iterator != null && iterator.hasNext()) {
+                    Object value = iterator.next();
+                    Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
+                            exchange, value);
+                    sqlPrepareStatementStrategy.populateStatement(ps, i, 
expected);
+                    ps.addBatch();
+                }
+            } else {
+                Object value;
+                if (useMessageBodyForSql) {
+                    value = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
+                } else {
+                    value = exchange.getIn().getBody();
+                }
+                Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected,
+                        exchange, value);
+                sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+            }
+        }
+    }
+
     public void setParametersCount(int parametersCount) {
         this.parametersCount = parametersCount;
     }
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
index 5e96fd5..1c645a4 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
@@ -33,6 +33,7 @@ import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class SqlGeneratedKeysTest extends CamelTestSupport {
@@ -79,8 +80,9 @@ public class SqlGeneratedKeysTest extends CamelTestSupport {
         // first we create our exchange using the endpoint
         Endpoint endpoint = context.getEndpoint("direct:insert");
 
+        Object body = new Object[] { "project x", "ASF", "new project" };
         Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setBody(new Object[] { "project x", "ASF", "new 
project" });
+        exchange.getIn().setBody(body);
         exchange.getIn().setHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, 
true);
 
         // now we send the exchange to the endpoint, and receives the response 
from Camel
@@ -90,6 +92,7 @@ public class SqlGeneratedKeysTest extends CamelTestSupport {
         assertNotNull(out);
         assertNotNull(out.getMessage());
         
assertNotNull(out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA));
+        assertSame(body, out.getMessage().getBody());
 
         List<Map<String, Object>> generatedKeys = 
out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, List.class);
         assertNotNull(generatedKeys, "out body could not be converted to a 
List - was: "

Reply via email to