Repository: camel
Updated Branches:
  refs/heads/master 55b0c10db -> b93083465


CAMEL-3907: Add option useMessageForSql to have more dynamic queries. Thansk to 
Zemian Deng for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b9308346
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b9308346
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b9308346

Branch: refs/heads/master
Commit: b93083465de2b4e5326aeba18780068aff56f5bb
Parents: 55b0c10
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Jul 12 10:30:17 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Jul 12 10:30:17 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sql/SqlConstants.java       |   5 +
 .../apache/camel/component/sql/SqlEndpoint.java |  17 +-
 .../apache/camel/component/sql/SqlProducer.java |  41 +++--
 .../SqlProducerUseMessageBodyForSqlTest.java    | 171 +++++++++++++++++++
 4 files changed, 222 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java
index 42330c4..2dd7cdc 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConstants.java
@@ -54,6 +54,11 @@ public final class SqlConstants {
      */
     public static final String SQL_GENERATED_KEYS_DATA = 
"CamelSqlGeneratedKeyRows";
 
+    /**
+     * The SQL parameters when using the option useMessageBodyForSql
+     */
+    public static final String SQL_PARAMETERS = "CamelSqlParameters";
+
     private SqlConstants() {
         // Utility class
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index 507b071..1086d55 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -86,6 +86,8 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
     private boolean noop;
     @UriParam
     private String outputHeader;
+    @UriParam(label = "producer")
+    private boolean useMessageBodyForSql;
 
     public SqlEndpoint() {
     }
@@ -110,7 +112,7 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
 
     public Producer createProducer() throws Exception {
         SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy 
!= null ? prepareStatementStrategy : new 
DefaultSqlPrepareStatementStrategy(separator);
-        SqlProducer result = new SqlProducer(this, query, jdbcTemplate, 
prepareStrategy, batch, alwaysPopulateStatement);
+        SqlProducer result = new SqlProducer(this, query, jdbcTemplate, 
prepareStrategy, batch, alwaysPopulateStatement, useMessageBodyForSql);
         result.setParametersCount(parametersCount);
         return result;
     }
@@ -318,6 +320,19 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
         this.outputHeader = outputHeader;
     }
 
+    public boolean isUseMessageBodyForSql() {
+        return useMessageBodyForSql;
+    }
+
+    /**
+     * Whether to use the message body as the SQL and then headers for 
parameters.
+     * <p/>
+     * If this option is enabled then the SQL in the uri is not used.
+     */
+    public void setUseMessageBodyForSql(boolean useMessageBodyForSql) {
+        this.useMessageBodyForSql = useMessageBodyForSql;
+    }
+
     public String getDataSourceRef() {
         return dataSourceRef;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index c5eda07..9f2e9d2 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.NoSuchHeaderException;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 import org.springframework.jdbc.core.PreparedStatementCreator;
@@ -35,21 +37,23 @@ import 
org.springframework.jdbc.core.PreparedStatementCreator;
 import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
 
 public class SqlProducer extends DefaultProducer {
-    private String query;
-    private JdbcTemplate jdbcTemplate;
-    private boolean batch;
-    private boolean alwaysPopulateStatement;
-    private SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+    private final String query;
+    private final JdbcTemplate jdbcTemplate;
+    private final boolean batch;
+    private final boolean alwaysPopulateStatement;
+    private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+    private final boolean useMessageBodyForSql;
     private int parametersCount;
 
     public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate 
jdbcTemplate, SqlPrepareStatementStrategy sqlPrepareStatementStrategy,
-                       boolean batch, boolean alwaysPopulateStatement) {
+                       boolean batch, boolean alwaysPopulateStatement, boolean 
useMessageBodyForSql) {
         super(endpoint);
         this.jdbcTemplate = jdbcTemplate;
         this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
         this.query = query;
         this.batch = batch;
         this.alwaysPopulateStatement = alwaysPopulateStatement;
+        this.useMessageBodyForSql = useMessageBodyForSql;
     }
 
     @Override
@@ -58,9 +62,13 @@ public class SqlProducer extends DefaultProducer {
     }
 
     public void process(final Exchange exchange) throws Exception {
-        String queryHeader = 
exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
-
-        final String sql = queryHeader != null ? queryHeader : query;
+        final String sql;
+        if (useMessageBodyForSql) {
+            sql = exchange.getIn().getBody(String.class);
+        } else {
+            String queryHeader = 
exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
+            sql = queryHeader != null ? queryHeader : query;
+        }
         final String preparedQuery = 
sqlPrepareStatementStrategy.prepareQuery(sql, 
getEndpoint().isAllowNamedParameters());
 
         // CAMEL-7313 - check whether to return generated keys
@@ -99,7 +107,12 @@ public class SqlProducer extends DefaultProducer {
                     if (alwaysPopulateStatement || expected > 0) {
                         // transfer incoming message body data to prepared 
statement parameters, if necessary
                         if (batch) {
-                            Iterator<?> iterator = 
exchange.getIn().getBody(Iterator.class);
+                            Iterator<?> iterator;
+                            if (useMessageBodyForSql) {
+                                iterator = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
+                            } else {
+                                iterator = 
exchange.getIn().getBody(Iterator.class);
+                            }
                             while (iterator != null && iterator.hasNext()) {
                                 Object value = iterator.next();
                                 Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, 
expected, exchange, value);
@@ -107,7 +120,13 @@ public class SqlProducer extends DefaultProducer {
                                 ps.addBatch();
                             }
                         } else {
-                            Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, 
expected, exchange, exchange.getIn().getBody());
+                            Object value;
+                            if (useMessageBodyForSql) {
+                                value = 
exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
+                            } else {
+                                value = exchange.getIn().getBody();
+                            }
+                            Iterator<?> i = 
sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, 
expected, exchange, value);
                             sqlPrepareStatementStrategy.populateStatement(ps, 
i, expected);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b9308346/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
new file mode 100644
index 0000000..ea8b7d8
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+public class SqlProducerUseMessageBodyForSqlTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+                
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testUseMessageBodyForSqlAndHeaderParams() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
+
+                from("direct:start")
+                        .setBody(constant("select * from projects where 
license = :?lic order by id"))
+                        .to("sql://query?useMessageBodyForSql=true")
+                        .to("mock:result");
+            }
+        });
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", null, "lic", "ASF");
+
+        List<?> received = assertIsInstanceOf(List.class, 
mock.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals(2, received.size());
+        Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
+        assertEquals("Camel", row.get("PROJECT"));
+
+        row = assertIsInstanceOf(Map.class, received.get(1));
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testUseMessageBodyForSqlAndCamelSqlParameters() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
+
+                from("direct:start")
+                        .setBody(constant("select * from projects where 
license = :?lic order by id"))
+                        .to("sql://query?useMessageBodyForSql=true")
+                        .to("mock:result");
+            }
+        });
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        Map<String, Object> row = new HashMap<String, Object>();
+        row.put("lic", "ASF");
+        template.sendBodyAndHeader("direct:start", null, 
SqlConstants.SQL_PARAMETERS, row);
+
+        List<?> received = assertIsInstanceOf(List.class, 
mock.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals(2, received.size());
+        row = assertIsInstanceOf(Map.class, received.get(0));
+        assertEquals("Camel", row.get("PROJECT"));
+
+        row = assertIsInstanceOf(Map.class, received.get(1));
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testUseMessageBodyForSqlAndCamelSqlParametersBatch() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
+
+                from("direct:start")
+                        .setBody(constant("insert into projects(id, project, 
license) values(:?id,:?project,:?lic)"))
+                        .to("sql://query?useMessageBodyForSql=true&batch=true")
+                        .to("mock:result");
+            }
+        });
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
+        Map<String, Object> row = new HashMap<String, Object>();
+        row.put("id", 200);
+        row.put("project", "MyProject1");
+        row.put("lic", "OPEN1");
+        rows.add(row);
+        row = new HashMap<String, Object>();
+        row.put("id", 201);
+        row.put("project", "MyProject2");
+        row.put("lic", "OPEN1");
+        rows.add(row);
+        template.sendBodyAndHeader("direct:start", null, 
SqlConstants.SQL_PARAMETERS, rows);
+
+        String origSql = assertIsInstanceOf(String.class, 
mock.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals("insert into projects(id, project, license) 
values(:?id,:?project,:?lic)", origSql);
+
+        assertEquals(null, 
mock.getReceivedExchanges().get(0).getOut().getBody());
+
+        // Clear and then use route2 to verify result of above insert select
+        context.removeRoute(context.getRoutes().get(0).getId());
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
+
+                from("direct:start2")
+                        .setBody(constant("select * from projects where 
license = :?lic order by id"))
+                        .to("sql://query2?useMessageBodyForSql=true")
+                        .to("mock:result2");
+            }
+        });
+
+        mock = getMockEndpoint("mock:result2");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start2", null, "lic", "OPEN1");
+
+        List<?> received = assertIsInstanceOf(List.class, 
mock.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals(2, received.size());
+        row = assertIsInstanceOf(Map.class, received.get(0));
+        assertEquals("MyProject1", row.get("PROJECT"));
+
+        row = assertIsInstanceOf(Map.class, received.get(1));
+        assertEquals("MyProject2", row.get("PROJECT"));
+    }
+}

Reply via email to