Author: cmueller Date: Wed Feb 9 22:21:18 2011 New Revision: 1069147 URL: http://svn.apache.org/viewvc?rev=1069147&view=rev Log: CAMEL-3648: Provide a JdbcMessageIdRepository for the idempotent consumer EIP
Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java camel/trunk/components/camel-sql/src/test/resources/org/ camel/trunk/components/camel-sql/src/test/resources/org/apache/ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml Modified: camel/trunk/components/camel-sql/pom.xml Modified: camel/trunk/components/camel-sql/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/pom.xml?rev=1069147&r1=1069146&r2=1069147&view=diff ============================================================================== --- camel/trunk/components/camel-sql/pom.xml (original) +++ camel/trunk/components/camel-sql/pom.xml Wed Feb 9 22:21:18 2011 @@ -28,33 +28,40 @@ <packaging>bundle</packaging> <name>Camel :: SQL</name> <description>Camel SQL support</description> - - <properties> - <camel.osgi.export.pkg>org.apache.camel.component.sql.*</camel.osgi.export.pkg> - </properties> + + <properties> + <camel.osgi.export.pkg> + org.apache.camel.component.sql.*;${camel.osgi.version}, + org.apache.camel.processor.idempotent.jdbc.*;${camel.osgi.version} + </camel.osgi.export.pkg> + <camel.osgi.import.pkg> + !org.apache.camel.component.sql.*, + !org.apache.camel.processor.idempotent.jdbc.*, + * + </camel.osgi.import.pkg> + </properties> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> </dependency> + <!-- to allow Spring annotations (jmx) to be tested --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> </dependency> + <!-- test dependencies --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <scope>test</scope> </dependency> - <!-- to allow Spring annotations (jmx) to be tested --> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - <optional>true</optional> - <scope>test</scope> - </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1069147&view=auto ============================================================================== --- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (added) +++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Wed Feb 9 22:21:18 2011 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.jdbc; + +import javax.sql.DataSource; + +import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.spi.IdempotentRepository; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedOperation; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * @version $Revision$ + */ +@ManagedResource("JdbcMessageIdRepository") +public class JdbcMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> { + + protected static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; + protected static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId) VALUES (?, ?)"; + protected static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; + + private final JdbcTemplate jdbcTemplate; + private final String processorName; + private final TransactionTemplate transactionTemplate; + + public JdbcMessageIdRepository(DataSource dataSource, String processorName) { + this(dataSource, createTransactionTemplate(dataSource), processorName); + } + + public JdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) { + this.jdbcTemplate = new JdbcTemplate(dataSource); + this.jdbcTemplate.afterPropertiesSet(); + this.processorName = processorName; + this.transactionTemplate = transactionTemplate; + } + + public static JdbcMessageIdRepository jpaMessageIdRepository(DataSource dataSource, String processorName) { + return new JdbcMessageIdRepository(dataSource, processorName); + } + + private static TransactionTemplate createTransactionTemplate(DataSource dataSource) { + TransactionTemplate transactionTemplate = new TransactionTemplate(); + transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource)); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + return transactionTemplate; + } + + @ManagedOperation(description = "Adds the key to the store") + @SuppressWarnings({ "unchecked", "rawtypes" }) + public boolean add(final String messageId) { + // Run this in single transaction. + Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus status) { + int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId); + if (count == 0) { + jdbcTemplate.update(INSERT_STRING, processorName, messageId); + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + } + }); + return rc.booleanValue(); + } + + @ManagedOperation(description = "Does the store contain the given key") + @SuppressWarnings({ "unchecked", "rawtypes" }) + public boolean contains(final String messageId) { + // Run this in single transaction. + Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus status) { + int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId); + if (count == 0) { + return Boolean.FALSE; + } else { + return Boolean.TRUE; + } + } + }); + return rc.booleanValue(); + } + + @ManagedOperation(description = "Remove the key from the store") + @SuppressWarnings({ "unchecked", "rawtypes" }) + public boolean remove(final String messageId) { + Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus status) { + int updateCount = jdbcTemplate.update(DELETE_STRING, processorName, messageId); + if (updateCount == 0) { + return Boolean.FALSE; + } else { + return Boolean.TRUE; + } + } + }); + return rc.booleanValue(); + } + + public boolean confirm(String s) { + // noop + return true; + } + + @ManagedAttribute(description = "The processor name") + public String getProcessorName() { + return processorName; + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } +} Added: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java?rev=1069147&view=auto ============================================================================== --- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java (added) +++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java Wed Feb 9 22:21:18 2011 @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.jdbc; + +import java.util.List; + +import javax.sql.DataSource; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.test.junit4.CamelSpringTestSupport; +import org.junit.Before; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; + + +public class JdbcMessageIdRepositoryTest extends CamelSpringTestSupport { + + protected static final String SELECT_ALL_STRING = "SELECT messageId FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?"; + protected static final String DELETE_ALL_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?"; + protected static final String PROCESSOR_NAME = "myProcessorName"; + + protected JdbcTemplate jdbcTemplate; + protected DataSource dataSource; + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @EndpointInject(uri = "mock:error") + protected MockEndpoint errorEndpoint; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + dataSource = context.getRegistry().lookup("dataSource", DataSource.class); + jdbcTemplate = new JdbcTemplate(dataSource); + jdbcTemplate.afterPropertiesSet(); + + setupRepository(); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/processor/idempotent/jdbc/spring.xml"); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected void setupRepository() { + TransactionTemplate transactionTemplate = new TransactionTemplate(); + transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource)); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + + transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus status) { + try { + jdbcTemplate.execute("CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(20), messageId VARCHAR(10))"); + } catch (DataAccessException e) { + // noop if table already exists + } + jdbcTemplate.update(DELETE_ALL_STRING, PROCESSOR_NAME); + return Boolean.TRUE; + } + }); + } + + @Test + public void testDuplicateMessagesAreFilteredOut() throws Exception { + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + errorEndpoint.expectedMessageCount(0); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + + // all 3 messages should be in jdbc repo + List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME); + + assertEquals(3, receivedMessageIds.size()); + assertTrue(receivedMessageIds.contains("1")); + assertTrue(receivedMessageIds.contains("2")); + assertTrue(receivedMessageIds.contains("3")); + } + + @Test + public void testFailedExchangesNotAdded() throws Exception { + RouteBuilder interceptor = new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("mock:result") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + String id = exchange.getIn().getHeader("messageId", String.class); + if (id.equals("2")) { + throw new IllegalArgumentException("Damn I cannot handle id 2"); + } + } + }); + } + }; + RouteDefinition routeDefinition = context.getRouteDefinition("JdbcMessageIdRepositoryTest"); + routeDefinition.adviceWith(context, interceptor); + + // we send in 2 messages with id 2 that fails + errorEndpoint.expectedMessageCount(2); + resultEndpoint.expectedBodiesReceived("one", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + + // only message 1 and 3 should be in jdbc repo + List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME); + + assertEquals(2, receivedMessageIds.size()); + assertTrue("Should contain message 1", receivedMessageIds.contains("1")); + assertTrue("Should contain message 3", receivedMessageIds.contains("3")); + } + + protected void sendMessage(final Object messageId, final Object body) { + template.send("direct:start", new Processor() { + public void process(Exchange exchange) { + Message in = exchange.getIn(); + in.setBody(body); + in.setHeader("messageId", messageId); + } + }); + } +} \ No newline at end of file Added: camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml?rev=1069147&view=auto ============================================================================== --- camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml (added) +++ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml Wed Feb 9 22:21:18 2011 @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:camel="http://camel.apache.org/schema/spring" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource"> + <property name="driverClassName" value="org.hsqldb.jdbcDriver"/> + <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/> + <property name="username" value="sa"/> + <property name="password" value=""/> + </bean> + + <bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> + <constructor-arg ref="dataSource" /> + <constructor-arg value="myProcessorName" /> + </bean> + + <camel:camelContext> + <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error"> + <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" /> + </camel:errorHandler> + + <camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel"> + <camel:from uri="direct:start" /> + <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository"> + <camel:header>messageId</camel:header> + <camel:to uri="mock:result" /> + </camel:idempotentConsumer> + </camel:route> + </camel:camelContext> +</beans> \ No newline at end of file