CAMEL-9162: camel-elsql component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0b71c42f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0b71c42f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0b71c42f Branch: refs/heads/master Commit: 0b71c42f2c2e1704fc61ca2bcd7385ef0ff84677 Parents: f6cefc7 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Oct 5 08:21:20 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Oct 5 10:54:54 2015 +0200 ---------------------------------------------------------------------- .../camel/component/elsql/ElsqlConsumer.java | 4 +- .../camel/component/elsql/ElsqlEndpoint.java | 6 +- .../camel/component/elsql/ElsqlProducer.java | 3 +- .../camel/component/sql/DefaultSqlEndpoint.java | 437 +++++++++++++++++++ .../apache/camel/component/sql/SqlConsumer.java | 2 +- .../apache/camel/component/sql/SqlEndpoint.java | 428 +----------------- 6 files changed, 456 insertions(+), 424 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java index 0f1c6d7..9459241 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java @@ -17,15 +17,15 @@ package org.apache.camel.component.elsql; import org.apache.camel.Processor; +import org.apache.camel.component.sql.DefaultSqlEndpoint; import org.apache.camel.component.sql.SqlConsumer; -import org.apache.camel.component.sql.SqlEndpoint; import org.apache.camel.component.sql.SqlPrepareStatementStrategy; import org.apache.camel.component.sql.SqlProcessingStrategy; import org.springframework.jdbc.core.JdbcTemplate; public class ElsqlConsumer extends SqlConsumer { - public ElsqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, + public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { super(endpoint, processor, jdbcTemplate, query, sqlPrepareStatementStrategy, sqlProcessingStrategy); } http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java index 352dc37..a07b93e 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java @@ -24,7 +24,7 @@ import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.sql.SqlEndpoint; +import org.apache.camel.component.sql.DefaultSqlEndpoint; import org.apache.camel.component.sql.SqlPrepareStatementStrategy; import org.apache.camel.component.sql.SqlProcessingStrategy; import org.apache.camel.spi.Metadata; @@ -37,7 +37,7 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @UriEndpoint(scheme = "elsql", title = "SQL", syntax = "elsql:elsqlName:resourceUri", consumerClass = ElsqlConsumer.class, label = "database,sql") -public class ElsqlEndpoint extends SqlEndpoint { +public class ElsqlEndpoint extends DefaultSqlEndpoint { private volatile ElSql elSql; private NamedParameterJdbcTemplate namedJdbcTemplate; @@ -51,7 +51,7 @@ public class ElsqlEndpoint extends SqlEndpoint { private ElSqlConfig elSqlConfig; public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, String elsqlName, String resourceUri) { - super(uri, component, null, null); + super(uri, component, null); this.elsqlName = elsqlName; this.resourceUri = resourceUri; this.namedJdbcTemplate = namedJdbcTemplate; http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java index fa3f5df..4353d9a 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java @@ -25,7 +25,6 @@ import com.opengamma.elsql.ElSql; import com.opengamma.elsql.SpringSqlParams; import org.apache.camel.Exchange; import org.apache.camel.component.sql.SqlConstants; -import org.apache.camel.component.sql.SqlEndpoint; import org.apache.camel.component.sql.SqlOutputType; import org.apache.camel.impl.DefaultProducer; import org.springframework.dao.DataAccessException; @@ -41,7 +40,7 @@ public class ElsqlProducer extends DefaultProducer { private final String elSqlName; private final NamedParameterJdbcTemplate jdbcTemplate; - public ElsqlProducer(SqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate) { + public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate) { super(endpoint); this.elSql = elSql; this.elSqlName = elSqlName; http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java new file mode 100644 index 0000000..c67ecc1 --- /dev/null +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java @@ -0,0 +1,437 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import java.sql.ResultSet; +import java.sql.SQLDataException; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; + +import org.apache.camel.Component; +import org.apache.camel.impl.DefaultPollingEndpoint; +import org.apache.camel.spi.UriParam; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.jdbc.core.ColumnMapRowMapper; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.jdbc.core.RowMapperResultSetExtractor; + +/** + * Base class for SQL endpoints. + */ +public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint { + private JdbcTemplate jdbcTemplate; + + @UriParam(description = "Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database.") + @Deprecated + private String dataSourceRef; + @UriParam(description = "Sets the DataSource to use to communicate with the database.") + private DataSource dataSource; + @UriParam(label = "producer", description = "Enables or disables batch mode") + private boolean batch; + @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll") + private int maxMessagesPerPoll; + @UriParam(label = "consumer,advanced", + description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.") + private SqlProcessingStrategy processingStrategy; + @UriParam(label = "advanced", + description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.") + private SqlPrepareStatementStrategy prepareStatementStrategy; + @UriParam(label = "consumer", + description = "After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter.") + private String onConsume; + @UriParam(label = "consumer", + description = "After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter.") + private String onConsumeFailed; + @UriParam(label = "consumer", + description = "After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.") + private String onConsumeBatchComplete; + @UriParam(label = "consumer", defaultValue = "true", + description = "Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true.") + private boolean useIterator = true; + @UriParam(label = "consumer", + description = "Sets whether empty resultset should be allowed to be sent to the next hop. Defaults to false. So the empty resultset will be filtered out.") + private boolean routeEmptyResultSet; + @UriParam(label = "consumer", defaultValue = "-1", description = "Sets an expected update count to validate when using onConsume.") + private int expectedUpdateCount = -1; + @UriParam(label = "consumer", description = "Sets whether to break batch if onConsume failed.") + private boolean breakBatchOnConsumeFail; + @UriParam(defaultValue = "true", description = "Whether to allow using named parameters in the queries.") + private boolean allowNamedParameters = true; + @UriParam(label = "producer,advanced", + description = "If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, " + + "also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1" + + " or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.") + private boolean alwaysPopulateStatement; + @UriParam(defaultValue = ",", + description = "The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders." + + "Notice if you use named parameters, then a Map type is used instead. The default value is ,") + private char separator = ','; + @UriParam(defaultValue = "SelectList", description = "Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way:" + + "a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object." + + "b) If the query has more than one column, then it will return a Map of that result." + + "c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names." + + "It will assume your class has a default constructor to create instance with." + + "d) If the query resulted in more than one rows, it throws an non-unique result exception.") + private SqlOutputType outputType = SqlOutputType.SelectList; + @UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.") + private String outputClass; + @UriParam(label = "producer,advanced", description = "If set greater than zero, then Camel will use this count value of parameters to replace instead of" + + " querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.") + private int parametersCount; + @UriParam(label = "producer", description = "If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing") + private boolean noop; + @UriParam(description = "Store the query result in a header instead of the message body. By default, outputHeader == null and the query result is stored" + + " in the message body, any existing content in the message body is discarded. If outputHeader is set, the value is used as the name of the header" + + " to store the query result and the original message body is preserved.") + private String outputHeader; + @UriParam(label = "producer", description = "Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used.") + private boolean useMessageBodyForSql; + + public DefaultSqlEndpoint() { + } + + public DefaultSqlEndpoint(String uri, Component component, JdbcTemplate jdbcTemplate) { + super(uri, component); + this.jdbcTemplate = jdbcTemplate; + } + + public boolean isSingleton() { + return true; + } + + public JdbcTemplate getJdbcTemplate() { + return jdbcTemplate; + } + + public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + public boolean isBatch() { + return batch; + } + + /** + * Enables or disables batch mode + */ + public void setBatch(boolean batch) { + this.batch = batch; + } + + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + /** + * Sets the maximum number of messages to poll + */ + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public SqlProcessingStrategy getProcessingStrategy() { + return processingStrategy; + } + + /** + * Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch. + */ + public void setProcessingStrategy(SqlProcessingStrategy processingStrategy) { + this.processingStrategy = processingStrategy; + } + + public SqlPrepareStatementStrategy getPrepareStatementStrategy() { + return prepareStatementStrategy; + } + + /** + * Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement. + */ + public void setPrepareStatementStrategy(SqlPrepareStatementStrategy prepareStatementStrategy) { + this.prepareStatementStrategy = prepareStatementStrategy; + } + + public String getOnConsume() { + return onConsume; + } + + /** + * After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter. + */ + public void setOnConsume(String onConsume) { + this.onConsume = onConsume; + } + + public String getOnConsumeFailed() { + return onConsumeFailed; + } + + /** + * After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter. + */ + public void setOnConsumeFailed(String onConsumeFailed) { + this.onConsumeFailed = onConsumeFailed; + } + + public String getOnConsumeBatchComplete() { + return onConsumeBatchComplete; + } + + /** + * After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters. + */ + public void setOnConsumeBatchComplete(String onConsumeBatchComplete) { + this.onConsumeBatchComplete = onConsumeBatchComplete; + } + + public boolean isAllowNamedParameters() { + return allowNamedParameters; + } + + /** + * Whether to allow using named parameters in the queries. + */ + public void setAllowNamedParameters(boolean allowNamedParameters) { + this.allowNamedParameters = allowNamedParameters; + } + + public boolean isAlwaysPopulateStatement() { + return alwaysPopulateStatement; + } + + /** + * If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, + * also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there + * is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters. + */ + public void setAlwaysPopulateStatement(boolean alwaysPopulateStatement) { + this.alwaysPopulateStatement = alwaysPopulateStatement; + } + + public char getSeparator() { + return separator; + } + + /** + * The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders. + * Notice if you use named parameters, then a Map type is used instead. + * <p/> + * The default value is , + */ + public void setSeparator(char separator) { + this.separator = separator; + } + + public SqlOutputType getOutputType() { + return outputType; + } + + /** + * Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way: + * a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object. + * b) If the query has more than one column, then it will return a Map of that result. + * c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names. + * It will assume your class has a default constructor to create instance with. + * d) If the query resulted in more than one rows, it throws an non-unique result exception. + */ + public void setOutputType(SqlOutputType outputType) { + this.outputType = outputType; + } + + public String getOutputClass() { + return outputClass; + } + + /** + * Specify the full package and class name to use as conversion when outputType=SelectOne. + */ + public void setOutputClass(String outputClass) { + this.outputClass = outputClass; + } + + public int getParametersCount() { + return parametersCount; + } + + /** + * If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API. + * This is useful if the JDBC vendor could not return correct parameters count, then user may override instead. + */ + public void setParametersCount(int parametersCount) { + this.parametersCount = parametersCount; + } + + public boolean isNoop() { + return noop; + } + + /** + * If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing + */ + public void setNoop(boolean noop) { + this.noop = noop; + } + + public String getOutputHeader() { + return outputHeader; + } + + /** + * Store the query result in a header instead of the message body. + * By default, outputHeader == null and the query result is stored in the message body, + * any existing content in the message body is discarded. + * If outputHeader is set, the value is used as the name of the header to store the + * query result and the original message body is preserved. + */ + public void setOutputHeader(String outputHeader) { + this.outputHeader = outputHeader; + } + + public boolean isUseMessageBodyForSql() { + return useMessageBodyForSql; + } + + /** + * Whether to use the message body as the SQL and then headers for parameters. + * <p/> + * If this option is enabled then the SQL in the uri is not used. + */ + public void setUseMessageBodyForSql(boolean useMessageBodyForSql) { + this.useMessageBodyForSql = useMessageBodyForSql; + } + + public String getDataSourceRef() { + return dataSourceRef; + } + + /** + * Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database. + */ + public void setDataSourceRef(String dataSourceRef) { + this.dataSourceRef = dataSourceRef; + } + + public DataSource getDataSource() { + return dataSource; + } + + /** + * Sets the DataSource to use to communicate with the database. + */ + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + + public boolean isUseIterator() { + return useIterator; + } + + /** + * Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true. + */ + public void setUseIterator(boolean useIterator) { + this.useIterator = useIterator; + } + + public boolean isRouteEmptyResultSet() { + return routeEmptyResultSet; + } + + /** + * Sets whether empty resultset should be allowed to be sent to the next hop. + * Defaults to false. So the empty resultset will be filtered out. + */ + public void setRouteEmptyResultSet(boolean routeEmptyResultSet) { + this.routeEmptyResultSet = routeEmptyResultSet; + } + + public int getExpectedUpdateCount() { + return expectedUpdateCount; + } + + /** + * Sets an expected update count to validate when using onConsume. + */ + public void setExpectedUpdateCount(int expectedUpdateCount) { + this.expectedUpdateCount = expectedUpdateCount; + } + + public boolean isBreakBatchOnConsumeFail() { + return breakBatchOnConsumeFail; + } + + /** + * Sets whether to break batch if onConsume failed. + */ + public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) { + this.breakBatchOnConsumeFail = breakBatchOnConsumeFail; + } + + @SuppressWarnings("unchecked") + public List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException { + if (allowMapToClass && outputClass != null) { + Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass); + RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz); + RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper); + List<?> data = mapper.extractData(rs); + return data; + } else { + ColumnMapRowMapper rowMapper = new ColumnMapRowMapper(); + RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper); + List<Map<String, Object>> data = mapper.extractData(rs); + return data; + } + } + + @SuppressWarnings("unchecked") + public Object queryForObject(ResultSet rs) throws SQLException { + Object result = null; + if (outputClass == null) { + RowMapper rowMapper = new ColumnMapRowMapper(); + RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper); + List<Map<String, Object>> data = mapper.extractData(rs); + if (data.size() > 1) { + throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() + " count instead."); + } else if (data.size() == 1) { + // Set content depend on number of column from query result + Map<String, Object> row = data.get(0); + if (row.size() == 1) { + result = row.values().iterator().next(); + } else { + result = row; + } + } + } else { + Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass); + RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz); + RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper); + List<?> data = mapper.extractData(rs); + if (data.size() > 1) { + throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() + " count instead."); + } else if (data.size() == 1) { + result = data.get(0); + } + } + + // If data.size is zero, let result be null. + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index 29c3e07..3774dc2 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -59,7 +59,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { } } - public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, + public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { super(endpoint, processor); this.jdbcTemplate = jdbcTemplate; http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java index 761c125..fdf049c 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java @@ -16,29 +16,15 @@ */ package org.apache.camel.component.sql; -import java.sql.ResultSet; -import java.sql.SQLDataException; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; - -import javax.sql.DataSource; - import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultPollingEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.util.UnsafeUriCharactersEncoder; -import org.springframework.jdbc.core.BeanPropertyRowMapper; -import org.springframework.jdbc.core.ColumnMapRowMapper; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.jdbc.core.RowMapperResultSetExtractor; /** * SQL Endpoint. Endpoint URI should contain valid SQL statement, but instead of @@ -46,91 +32,23 @@ import org.springframework.jdbc.core.RowMapperResultSetExtractor; * This is because in camel question mark has other meaning. */ @UriEndpoint(scheme = "sql", title = "SQL", syntax = "sql:query", consumerClass = SqlConsumer.class, label = "database,sql") -public class SqlEndpoint extends DefaultPollingEndpoint { - private JdbcTemplate jdbcTemplate; +public class SqlEndpoint extends DefaultSqlEndpoint { @UriPath(description = "Sets the SQL query to perform") @Metadata(required = "true") - private String query; - @UriParam(description = "Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database.") - @Deprecated - private String dataSourceRef; - @UriParam(description = "Sets the DataSource to use to communicate with the database.") - private DataSource dataSource; - @UriParam(label = "producer", description = "Enables or disables batch mode") - private boolean batch; - @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll") - private int maxMessagesPerPoll; - @UriParam(label = "consumer,advanced", - description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.") - private SqlProcessingStrategy processingStrategy; - @UriParam(label = "advanced", - description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.") - private SqlPrepareStatementStrategy prepareStatementStrategy; - @UriParam(label = "consumer", - description = "After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter.") - private String onConsume; - @UriParam(label = "consumer", - description = "After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter.") - private String onConsumeFailed; - @UriParam(label = "consumer", - description = "After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.") - private String onConsumeBatchComplete; - @UriParam(label = "consumer", defaultValue = "true", - description = "Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true.") - private boolean useIterator = true; - @UriParam(label = "consumer", - description = "Sets whether empty resultset should be allowed to be sent to the next hop. Defaults to false. So the empty resultset will be filtered out.") - private boolean routeEmptyResultSet; - @UriParam(label = "consumer", defaultValue = "-1", description = "Sets an expected update count to validate when using onConsume.") - private int expectedUpdateCount = -1; - @UriParam(label = "consumer", description = "Sets whether to break batch if onConsume failed.") - private boolean breakBatchOnConsumeFail; - @UriParam(defaultValue = "true", description = "Whether to allow using named parameters in the queries.") - private boolean allowNamedParameters = true; - @UriParam(label = "producer,advanced", - description = "If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, " - + "also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1" - + " or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.") - private boolean alwaysPopulateStatement; - @UriParam(defaultValue = ",", - description = "The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders." - + "Notice if you use named parameters, then a Map type is used instead. The default value is ,") - private char separator = ','; - @UriParam(defaultValue = "SelectList", description = "Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way:" - + "a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object." - + "b) If the query has more than one column, then it will return a Map of that result." - + "c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names." - + "It will assume your class has a default constructor to create instance with." - + "d) If the query resulted in more than one rows, it throws an non-unique result exception.") - private SqlOutputType outputType = SqlOutputType.SelectList; - @UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.") - private String outputClass; - @UriParam(label = "producer,advanced", description = "If set greater than zero, then Camel will use this count value of parameters to replace instead of" - + " querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.") - private int parametersCount; - @UriParam(label = "producer", description = "If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing") - private boolean noop; - @UriParam(description = "Store the query result in a header instead of the message body. By default, outputHeader == null and the query result is stored" - + " in the message body, any existing content in the message body is discarded. If outputHeader is set, the value is used as the name of the header" - + " to store the query result and the original message body is preserved.") - private String outputHeader; - @UriParam(label = "producer", description = "Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used.") - private boolean useMessageBodyForSql; public SqlEndpoint() { } public SqlEndpoint(String uri, Component component, JdbcTemplate jdbcTemplate, String query) { - super(uri, component); - this.jdbcTemplate = jdbcTemplate; + super(uri, component, jdbcTemplate); this.query = query; } public Consumer createConsumer(Processor processor) throws Exception { - SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator); - SqlProcessingStrategy proStrategy = processingStrategy != null ? processingStrategy : new DefaultSqlProcessingStrategy(prepareStrategy); - SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query, prepareStrategy, proStrategy); + SqlPrepareStatementStrategy prepareStrategy = getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new DefaultSqlPrepareStatementStrategy(getSeparator()); + SqlProcessingStrategy proStrategy = getProcessingStrategy() != null ? getProcessingStrategy() : new DefaultSqlProcessingStrategy(prepareStrategy); + SqlConsumer consumer = new SqlConsumer(this, processor, getJdbcTemplate(), query, prepareStrategy, proStrategy); consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); consumer.setOnConsume(getOnConsume()); consumer.setOnConsumeFailed(getOnConsumeFailed()); @@ -144,22 +62,16 @@ public class SqlEndpoint extends DefaultPollingEndpoint { } public Producer createProducer() throws Exception { - SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator); - SqlProducer result = new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement, useMessageBodyForSql); - result.setParametersCount(parametersCount); + SqlPrepareStatementStrategy prepareStrategy = getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new DefaultSqlPrepareStatementStrategy(getSeparator()); + SqlProducer result = new SqlProducer(this, query, getJdbcTemplate(), prepareStrategy, isBatch(), isAlwaysPopulateStatement(), isUseMessageBodyForSql()); + result.setParametersCount(getParametersCount()); return result; } - public boolean isSingleton() { - return true; - } - - public JdbcTemplate getJdbcTemplate() { - return jdbcTemplate; - } - - public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; + @Override + protected String createEndpointUri() { + // Make sure it's properly encoded + return "sql:" + UnsafeUriCharactersEncoder.encode(query); } public String getQuery() { @@ -173,320 +85,4 @@ public class SqlEndpoint extends DefaultPollingEndpoint { this.query = query; } - public boolean isBatch() { - return batch; - } - - /** - * Enables or disables batch mode - */ - public void setBatch(boolean batch) { - this.batch = batch; - } - - public int getMaxMessagesPerPoll() { - return maxMessagesPerPoll; - } - - /** - * Sets the maximum number of messages to poll - */ - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - - public SqlProcessingStrategy getProcessingStrategy() { - return processingStrategy; - } - - /** - * Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch. - */ - public void setProcessingStrategy(SqlProcessingStrategy processingStrategy) { - this.processingStrategy = processingStrategy; - } - - public SqlPrepareStatementStrategy getPrepareStatementStrategy() { - return prepareStatementStrategy; - } - - /** - * Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement. - */ - public void setPrepareStatementStrategy(SqlPrepareStatementStrategy prepareStatementStrategy) { - this.prepareStatementStrategy = prepareStatementStrategy; - } - - public String getOnConsume() { - return onConsume; - } - - /** - * After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter. - */ - public void setOnConsume(String onConsume) { - this.onConsume = onConsume; - } - - public String getOnConsumeFailed() { - return onConsumeFailed; - } - - /** - * After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter. - */ - public void setOnConsumeFailed(String onConsumeFailed) { - this.onConsumeFailed = onConsumeFailed; - } - - public String getOnConsumeBatchComplete() { - return onConsumeBatchComplete; - } - - /** - * After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters. - */ - public void setOnConsumeBatchComplete(String onConsumeBatchComplete) { - this.onConsumeBatchComplete = onConsumeBatchComplete; - } - - public boolean isAllowNamedParameters() { - return allowNamedParameters; - } - - /** - * Whether to allow using named parameters in the queries. - */ - public void setAllowNamedParameters(boolean allowNamedParameters) { - this.allowNamedParameters = allowNamedParameters; - } - - public boolean isAlwaysPopulateStatement() { - return alwaysPopulateStatement; - } - - /** - * If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, - * also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there - * is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters. - */ - public void setAlwaysPopulateStatement(boolean alwaysPopulateStatement) { - this.alwaysPopulateStatement = alwaysPopulateStatement; - } - - public char getSeparator() { - return separator; - } - - /** - * The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders. - * Notice if you use named parameters, then a Map type is used instead. - * <p/> - * The default value is , - */ - public void setSeparator(char separator) { - this.separator = separator; - } - - public SqlOutputType getOutputType() { - return outputType; - } - - /** - * Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way: - * a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object. - * b) If the query has more than one column, then it will return a Map of that result. - * c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names. - * It will assume your class has a default constructor to create instance with. - * d) If the query resulted in more than one rows, it throws an non-unique result exception. - */ - public void setOutputType(SqlOutputType outputType) { - this.outputType = outputType; - } - - public String getOutputClass() { - return outputClass; - } - - /** - * Specify the full package and class name to use as conversion when outputType=SelectOne. - */ - public void setOutputClass(String outputClass) { - this.outputClass = outputClass; - } - - public int getParametersCount() { - return parametersCount; - } - - /** - * If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API. - * This is useful if the JDBC vendor could not return correct parameters count, then user may override instead. - */ - public void setParametersCount(int parametersCount) { - this.parametersCount = parametersCount; - } - - public boolean isNoop() { - return noop; - } - - /** - * If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing - */ - public void setNoop(boolean noop) { - this.noop = noop; - } - - public String getOutputHeader() { - return outputHeader; - } - - /** - * Store the query result in a header instead of the message body. - * By default, outputHeader == null and the query result is stored in the message body, - * any existing content in the message body is discarded. - * If outputHeader is set, the value is used as the name of the header to store the - * query result and the original message body is preserved. - */ - public void setOutputHeader(String outputHeader) { - this.outputHeader = outputHeader; - } - - public boolean isUseMessageBodyForSql() { - return useMessageBodyForSql; - } - - /** - * Whether to use the message body as the SQL and then headers for parameters. - * <p/> - * If this option is enabled then the SQL in the uri is not used. - */ - public void setUseMessageBodyForSql(boolean useMessageBodyForSql) { - this.useMessageBodyForSql = useMessageBodyForSql; - } - - public String getDataSourceRef() { - return dataSourceRef; - } - - /** - * Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database. - */ - public void setDataSourceRef(String dataSourceRef) { - this.dataSourceRef = dataSourceRef; - } - - public DataSource getDataSource() { - return dataSource; - } - - /** - * Sets the DataSource to use to communicate with the database. - */ - public void setDataSource(DataSource dataSource) { - this.dataSource = dataSource; - } - - public boolean isUseIterator() { - return useIterator; - } - - /** - * Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true. - */ - public void setUseIterator(boolean useIterator) { - this.useIterator = useIterator; - } - - public boolean isRouteEmptyResultSet() { - return routeEmptyResultSet; - } - - /** - * Sets whether empty resultset should be allowed to be sent to the next hop. - * Defaults to false. So the empty resultset will be filtered out. - */ - public void setRouteEmptyResultSet(boolean routeEmptyResultSet) { - this.routeEmptyResultSet = routeEmptyResultSet; - } - - public int getExpectedUpdateCount() { - return expectedUpdateCount; - } - - /** - * Sets an expected update count to validate when using onConsume. - */ - public void setExpectedUpdateCount(int expectedUpdateCount) { - this.expectedUpdateCount = expectedUpdateCount; - } - - public boolean isBreakBatchOnConsumeFail() { - return breakBatchOnConsumeFail; - } - - /** - * Sets whether to break batch if onConsume failed. - */ - public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) { - this.breakBatchOnConsumeFail = breakBatchOnConsumeFail; - } - - @Override - protected String createEndpointUri() { - // Make sure it's properly encoded - return "sql:" + UnsafeUriCharactersEncoder.encode(query); - } - - @SuppressWarnings("unchecked") - public List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException { - if (allowMapToClass && outputClass != null) { - Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass); - RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz); - RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper); - List<?> data = mapper.extractData(rs); - return data; - } else { - ColumnMapRowMapper rowMapper = new ColumnMapRowMapper(); - RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper); - List<Map<String, Object>> data = mapper.extractData(rs); - return data; - } - } - - @SuppressWarnings("unchecked") - public Object queryForObject(ResultSet rs) throws SQLException { - Object result = null; - if (outputClass == null) { - RowMapper rowMapper = new ColumnMapRowMapper(); - RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper); - List<Map<String, Object>> data = mapper.extractData(rs); - if (data.size() > 1) { - throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() + " count instead."); - } else if (data.size() == 1) { - // Set content depend on number of column from query result - Map<String, Object> row = data.get(0); - if (row.size() == 1) { - result = row.values().iterator().next(); - } else { - result = row; - } - } - } else { - Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass); - RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz); - RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper); - List<?> data = mapper.extractData(rs); - if (data.size() > 1) { - throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() + " count instead."); - } else if (data.size() == 1) { - result = data.get(0); - } - } - - // If data.size is zero, let result be null. - return result; - } - }