SQL Component
Page edited by Christian Mueller
The sql: component allows you to work with databases using JDBC queries. The difference between this component and the JDBC component is that with SQL the query is a property of the endpoint, and the message payload supplies the parameters passed to the query.

This component uses spring-jdbc behind the scenes for the actual SQL handling.

Maven users will need to add the following dependency to their pom.xml for this component:

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-sql</artifactId>
        <version>x.x.x</version>
        <!-- use the same version as your Camel core version -->
    </dependency>
URI format
The SQL component uses the following endpoint URI notation:

    sql:select * from table where id=# order by name[?options]

From Camel 2.11 onwards you can use named parameters with the :#name style, as shown:

    sql:select * from table where id=:#myId order by name[?options]

When using named parameters, Camel looks up the names in the following order of precedence: first from the message body, if it is a java.util.Map; then from the message headers. If a named parameter cannot be resolved, an exception is thrown.

Notice that the standard ? symbol that denotes the parameters of an SQL query is replaced by the # symbol, because the ? symbol is used to specify options for the endpoint. The replacement symbol can be configured on a per-endpoint basis.

You can append query options to the URI in the following format: ?option=value&option=value&...

Options
Treatment of the message body
The SQL component tries to convert the message body to an object of java.util.Iterator type and then uses this iterator to fill the query parameters (where each query parameter is represented by a # symbol, or the configured placeholder, in the endpoint URI). If the message body is not an array or collection, the conversion results in an iterator that iterates over only one object, which is the body itself.

For example, if the message body is an instance of java.util.List, the first item in the list is substituted into the first occurrence of # in the SQL query, the second item in the list is substituted into the second occurrence of #, and so on.

If batch is set to true, the interpretation of the inbound message body changes slightly: instead of an iterator of parameters, the component expects an iterator that contains the parameter iterators; the size of the outer iterator determines the batch size.

Result of the query
For select operations, the result is an instance of List<Map<String, Object>> type, as returned by the JdbcTemplate.queryForList() method. For update operations, the result is the number of updated rows, returned as an Integer.

Header values
When performing update operations, the SQL Component stores the update count in the following message headers:
Configuration
You can now set a reference to a DataSource in the URI directly:

    sql:select * from table where id=# order by name?dataSourceRef=myDS

Sample
In the sample below we execute a query and retrieve the result as a List of rows, where each row is a Map<String, Object> and the key is the column name.

First, we set up a table to use for our sample. As this is based on a unit test, we do it in Java:

    // this is the database we create with some initial data for our unit test
    db = new EmbeddedDatabaseBuilder()
        .setType(EmbeddedDatabaseType.DERBY)
        .addScript("sql/createAndPopulateDatabase.sql")
        .build();

The SQL script createAndPopulateDatabase.sql that we execute looks like this:

    create table projects (id integer primary key, project varchar(10), license varchar(5));
    insert into projects values (1, 'Camel', 'ASF');
    insert into projects values (2, 'AMQ', 'ASF');
    insert into projects values (3, 'Linux', 'XXX');

Then we configure our route and our sql component. Notice that we use a direct endpoint in front of the sql endpoint. This allows us to send an exchange to the direct endpoint with the URI direct:simple, which is much easier for the client to use than the long sql: URI. Note that the DataSource is looked up in the registry, so we can use standard Spring XML to configure our DataSource.

    from("direct:simple")
        .to("sql:select * from projects where license = # order by id?dataSourceRef=jdbc/myDataSource")
        .to("mock:result");

And then we fire the message into the direct endpoint, which routes it to our sql component that queries the database.

    MockEndpoint mock = getMockEndpoint("mock:result");
    mock.expectedMessageCount(1);

    // send the query to direct that will route it to the sql where we will execute the query
    // and bind the parameters with the data from the body. The body only contains one value
    // in this case (XXX) but if we should use multiple values then the body will be iterated
    // so we could supply a List<String> instead containing each binding value.
    template.sendBody("direct:simple", "XXX");

    mock.assertIsSatisfied();

    // the result is a List
    List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody());

    // and each row in the list is a Map
    Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));

    // and we should be able to get the project from the map, which should be Linux
    assertEquals("Linux", row.get("PROJECT"));

We could configure the DataSource in Spring XML as follows:
<jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/>
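The binding rules from "Treatment of the message body" can be sketched in plain Java. This is only an illustration under stated assumptions: the class PlaceholderBindingSketch and its methods are hypothetical names, the placeholder replacement is deliberately naive, and none of this is the camel-sql implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not the camel-sql source code.
final class PlaceholderBindingSketch {

    // Swap the endpoint placeholder (# by default) for the JDBC '?' marker.
    // Naive: it would also rewrite a placeholder inside a quoted SQL literal.
    static String toJdbcSql(String query, String placeholder) {
        return query.replace(placeholder, "?");
    }

    // Collections and object arrays are iterated element by element;
    // any other body yields an iterator over the body itself.
    static Iterator<?> toIterator(Object body) {
        if (body instanceof Iterable) {
            return ((Iterable<?>) body).iterator();
        }
        if (body instanceof Object[]) {
            return Arrays.asList((Object[]) body).iterator();
        }
        return Collections.singletonList(body).iterator();
    }

    // Parameters for one statement, in placeholder order.
    static List<Object> bind(Object body) {
        List<Object> params = new ArrayList<>();
        Iterator<?> it = toIterator(body);
        while (it.hasNext()) {
            params.add(it.next());
        }
        return params;
    }

    // Batch mode: the outer iterator holds one parameter set per statement,
    // so the size of the result is the batch size.
    static List<List<Object>> bindBatch(Object body) {
        List<List<Object>> batch = new ArrayList<>();
        Iterator<?> outer = toIterator(body);
        while (outer.hasNext()) {
            batch.add(bind(outer.next()));
        }
        return batch;
    }

    public static void main(String[] args) {
        String sql = toJdbcSql("select * from projects where license = # order by id", "#");
        System.out.println(sql);
        System.out.println(bind("XXX"));
        System.out.println(bindBatch(Arrays.asList(
                Arrays.asList(4, "Tomcat"), Arrays.asList(5, "Derby"))));
    }
}
```

In the sample above the body "XXX" is a single value, so it ends up as the only parameter; a List body would supply one parameter per placeholder.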
Using named parameters
Available as of Camel 2.11

In the route below, we want to get all the projects from the projects table. Notice that the SQL query has 2 named parameters, :#lic and :#min.

    from("direct:projects")
        .setHeader("lic", constant("ASF"))
        .setHeader("min", constant(123))
        .to("sql:select * from projects where license = :#lic and id > :#min order by id");

However, if the message body is a java.util.Map, the named parameters are taken from the body.

    from("direct:projects")
        .to("sql:select * from projects where license = :#lic and id > :#min order by id");

Using the JDBC based idempotent repository
Available as of Camel 2.7

In this section we will use the JDBC based idempotent repository.
First, we have to create the database table that will be used by the idempotent repository. For Camel 2.7, we use the following schema:

    CREATE TABLE CAMEL_MESSAGEPROCESSED (
        processorName VARCHAR(255),
        messageId VARCHAR(100)
    )

In Camel 2.8, we added the createdAt column:

    CREATE TABLE CAMEL_MESSAGEPROCESSED (
        processorName VARCHAR(255),
        messageId VARCHAR(100),
        createdAt TIMESTAMP
    )

We recommend adding a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs from database to database, we do not show it here.

Second, we need to set up a javax.sql.DataSource in the Spring XML file:

    <jdbc:embedded-database id="dataSource" type="DERBY" />
And finally, we can create our JDBC idempotent repository in the Spring XML file as well:

    <bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
        <constructor-arg ref="dataSource" />
        <constructor-arg value="myProcessorName" />
    </bean>

    <camel:camelContext>
        <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
            <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
        </camel:errorHandler>

        <camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
            <camel:from uri="direct:start" />
            <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
                <camel:header>messageId</camel:header>
                <camel:to uri="mock:result" />
            </camel:idempotentConsumer>
        </camel:route>
    </camel:camelContext>

Customize the JdbcMessageIdRepository
Starting with Camel 2.9.1 you have a few options to tune the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository for your needs:
A customized org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository could look like:

    <bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
        <constructor-arg ref="dataSource" />
        <constructor-arg value="myProcessorName" />
        <property name="tableExistsString" value="SELECT 1 FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0" />
        <property name="createString" value="CREATE TABLE CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)" />
        <property name="queryString" value="SELECT COUNT(*) FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" />
        <property name="insertString" value="INSERT INTO CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)" />
        <property name="deleteString" value="DELETE FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" />
    </bean>

Using the JDBC based aggregation repository
Available as of Camel 2.6
JdbcAggregationRepository is an AggregationRepository that persists the aggregated messages on the fly. This ensures that you will not lose messages, whereas the default aggregator uses an in-memory-only AggregationRepository. It has the following options:
What is preserved when persisting
JdbcAggregationRepository will only preserve Serializable compatible data types. If a data type is not Serializable, it is dropped and a WARN is logged. It only persists the Message body and the Message headers. The Exchange properties are not persisted.

From Camel 2.11 onwards you can store the message body and selected headers as String in separate columns.

Recovery
The JdbcAggregationRepository will by default recover any failed Exchange. It does this with a background task that scans for failed Exchanges in the persistent store. You can use the checkInterval option to set how often this task runs. Recovery is transactional, which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange that is found to need recovery is restored from the persistent store, resubmitted, and sent out again.

The following headers are set when an Exchange is being recovered/redelivered:
An Exchange is marked as complete only when it has been successfully processed, which happens when the confirm method is invoked on the AggregationRepository. This means that if the same Exchange fails again, it will be retried until it succeeds.

You can use the maximumRedeliveries option to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so Camel knows where to send the Exchange when maximumRedeliveries has been reached.

You can see some examples in the unit tests of camel-sql.

Database
To be operational, each aggregator uses two tables: the aggregation table and the completed table. By convention, the completed table has the same name as the aggregation table, suffixed with "_COMPLETED". The name must be configured in the Spring bean with the repositoryName property. In the following example, aggregation is used.

The table structure of both tables is identical: in both cases a String value is used as the key (id), and a Blob contains the exchange serialized as a byte array.

Here is the SQL used to create the tables; just replace "aggregation" with your aggregator repository name.

    CREATE TABLE aggregation (
        id varchar(255) NOT NULL,
        exchange blob NOT NULL,
        constraint aggregation_pk PRIMARY KEY (id)
    );
    CREATE TABLE aggregation_completed (
        id varchar(255) NOT NULL,
        exchange blob NOT NULL,
        constraint aggregation_completed_pk PRIMARY KEY (id)
    );

Storing body and headers as text
Available as of Camel 2.11

You can configure the JdbcAggregationRepository to store the message body and selected headers as String in separate columns.
For example, to store the body and the two headers companyName and accountName, use the following SQL:

    CREATE TABLE aggregationRepo3 (
        id varchar(255) NOT NULL,
        exchange blob NOT NULL,
        body varchar(1000),
        companyName varchar(1000),
        accountName varchar(1000),
        constraint aggregationRepo3_pk PRIMARY KEY (id)
    );
    CREATE TABLE aggregationRepo3_completed (
        id varchar(255) NOT NULL,
        exchange blob NOT NULL,
        body varchar(1000),
        companyName varchar(1000),
        accountName varchar(1000),
        constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
    );

And then configure the repository to enable this behavior as shown below:

    <bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
        <property name="repositoryName" value="aggregationRepo3"/>
        <property name="transactionManager" ref="txManager3"/>
        <property name="dataSource" ref="dataSource3"/>
        <!-- configure to store the message body and following headers as text in the repo -->
        <property name="storeBodyAsText" value="true"/>
        <property name="headersToStoreAsText">
            <list>
                <value>companyName</value>
                <value>accountName</value>
            </list>
        </property>
    </bean>

Codec (Serialization)
Since they can contain any type of payload, Exchanges are not serializable by design. An Exchange is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.

The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and uses it with the ContextClassLoader rather than the currentThread one. The benefit is being able to load classes exposed by other bundles. This allows the exchange body and headers to reference objects of custom types.

Transaction
A Spring PlatformTransactionManager is required to orchestrate transactions.
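The rule from "What is preserved when persisting" and the byte-array conversion from "Codec (Serialization)" can be approximated with plain java.io serialization. The sketch below is a hypothetical illustration, not the JdbcCodec implementation: the class SerializableFilterSketch and its methods are invented names, it assumes the body itself is Serializable, and it does not reproduce the class-loader handling of ClassLoadingAwareObjectInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the camel-sql JdbcCodec.
final class SerializableFilterSketch {

    // Keep only header values that can be serialized; report the rest.
    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            if (value == null || value instanceof Serializable) {
                kept.put(entry.getKey(), value);
            } else {
                System.err.println("WARN: dropping non-serializable header " + entry.getKey());
            }
        }
        return kept;
    }

    // Write body and filtered headers into one byte array (the BLOB payload).
    // Assumes the body is Serializable.
    static byte[] toBytes(Object body, Map<String, Object> headers) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(body);
            out.writeObject(keepSerializable(headers));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read back { body, headers } in the order they were written.
    static Object[] fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            Object body = in.readObject();
            Object headers = in.readObject();
            return new Object[] {body, headers};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("companyName", "Acme");
        headers.put("handle", new Object()); // java.lang.Object is not Serializable
        Object[] restored = fromBytes(toBytes("order-1", headers));
        System.out.println(restored[0] + " " + restored[1]);
    }
}
```

In the usage above the handle header does not survive the round trip, while companyName and the body do.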
Service (Start/Stop)
The start method verifies the connection to the database and the presence of the required tables. If anything is wrong, it fails during startup.

Aggregator configuration
Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of tables created in the database) and a data source. If the default lobHandler is not suited to your database system, a different one can be injected with the lobHandler property.

Here is the declaration for Oracle:

    <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
        <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
    </bean>

    <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>

    <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
        <property name="transactionManager" ref="transactionManager"/>
        <property name="repositoryName" value="aggregation"/>
        <property name="dataSource" ref="dataSource"/>
        <!-- Only with Oracle, else use default -->
        <property name="lobHandler" ref="lobHandler"/>
    </bean>

See Also