Author: davsclaus Date: Fri Dec 23 06:11:18 2011 New Revision: 1222582 URL: http://svn.apache.org/viewvc?rev=1222582&view=rev Log: CAMEL-4822: Introduced AbstractJdbcMessageIdRepository to make it easier to implement custom JDBC repo, to control the SQL being used. Thanks to Philip Glebow for the patch.
Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java?rev=1222582&view=auto ============================================================================== --- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java (added) +++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java Fri Dec 23 06:11:18 2011 @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.jdbc; + +import javax.sql.DataSource; + +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jmx.export.annotation.ManagedOperation; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * Base class for JDBC-based idempotent repositories that allows the schema to be changed. + * <p/> + * Subclasses need only implement theses methods: + * <ul> + * <li>{@link #queryForInt(T key)}</li> + * <li>{@link #insert(T key)}</li> + * <li>{@link #delete(T key)}</li> + * </ul> + * <p/> + * These methods should perform the named database operation. + */ +@ManagedResource("JDBC IdempotentRepository") +public abstract class AbstractJdbcMessageIdRepository<T> extends ServiceSupport implements IdempotentRepository<T> { + + protected JdbcTemplate jdbcTemplate; + protected String processorName; + protected TransactionTemplate transactionTemplate; + protected DataSource dataSource; + + public AbstractJdbcMessageIdRepository() { + super(); + } + + public AbstractJdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) { + this.jdbcTemplate = jdbcTemplate; + this.transactionTemplate = transactionTemplate; + } + + public AbstractJdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) { + this.jdbcTemplate = new JdbcTemplate(dataSource); + this.jdbcTemplate.afterPropertiesSet(); + this.processorName = processorName; + this.transactionTemplate = transactionTemplate; + } + + public AbstractJdbcMessageIdRepository(DataSource dataSource, String processorName) { + this(dataSource, createTransactionTemplate(dataSource), processorName); + } + + /** + * Operation that returns the number of rows, if any, for the specified key + * + * @param key the key + * @return int number of rows + */ + protected abstract int queryForInt(final T key); + + /** + * Operation that inserts the key if it does not already exist + * + * @param key the key + * @return int number of rows inserted + */ + protected abstract int insert(final T key); + + /** + * Operations that deletes the key if it exists + * + * @param key the key + * @return int number of rows deleted + */ + protected abstract int delete(final T key); + + /** + * Creates the transaction template + */ + protected static TransactionTemplate createTransactionTemplate(DataSource dataSource) { + TransactionTemplate transactionTemplate = new TransactionTemplate(); + transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource)); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + return transactionTemplate; + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + + @ManagedOperation(description = "Adds the key to the store") + @Override + public boolean add(final T key) { + // Run this in single transaction. + Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { + public Boolean doInTransaction(TransactionStatus status) { + int count = queryForInt(key); + if (count == 0) { + insert(key); + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + } + }); + return rc.booleanValue(); + } + + @ManagedOperation(description = "Does the store contain the given key") + @Override + public boolean contains(final T key) { + // Run this in single transaction. + Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { + public Boolean doInTransaction(TransactionStatus status) { + int count = queryForInt(key); + if (count == 0) { + return Boolean.FALSE; + } else { + return Boolean.TRUE; + } + } + }); + return rc.booleanValue(); + + } + + @ManagedOperation(description = "Remove the key from the store") + @Override + public boolean remove(final T key) { + Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { + public Boolean doInTransaction(TransactionStatus status) { + int updateCount = delete(key); + if (updateCount == 0) { + return Boolean.FALSE; + } else { + return Boolean.TRUE; + } + } + }); + return rc.booleanValue(); + } + + @Override + public boolean confirm(final T key) { + return true; + } + + public JdbcTemplate getJdbcTemplate() { + return jdbcTemplate; + } + + public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + public String getProcessorName() { + return processorName; + } + + public void setProcessorName(String processorName) { + this.processorName = processorName; + } + + public TransactionTemplate getTransactionTemplate() { + return transactionTemplate; + } + + public void setTransactionTemplate(TransactionTemplate transactionTemplate) { + this.transactionTemplate = transactionTemplate; + } + + public DataSource getDataSource() { + return dataSource; + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + +} Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1222582&r1=1222581&r2=1222582&view=diff ============================================================================== --- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (original) +++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Fri Dec 23 06:11:18 2011 @@ -17,120 +17,49 @@ package org.apache.camel.processor.idempotent.jdbc; import java.sql.Timestamp; - import javax.sql.DataSource; -import org.apache.camel.api.management.ManagedAttribute; -import org.apache.camel.api.management.ManagedOperation; -import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.spi.IdempotentRepository; -import org.apache.camel.support.ServiceSupport; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; /** - * @version + * Default implementation of {@link AbstractJdbcMessageIdRepository} */ -@ManagedResource(description = "JDBC based message id repository") -public class JdbcMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> { - - protected static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; - protected static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)"; - protected static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; - - private final JdbcTemplate jdbcTemplate; - private final String processorName; - private final TransactionTemplate transactionTemplate; +public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository<String> { + + public static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; + public static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)"; + public static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; + + public JdbcMessageIdRepository() { + super(); + } public JdbcMessageIdRepository(DataSource dataSource, String processorName) { - this(dataSource, createTransactionTemplate(dataSource), processorName); + super(dataSource, processorName); } public JdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) { - this.jdbcTemplate = new JdbcTemplate(dataSource); - this.jdbcTemplate.afterPropertiesSet(); - this.processorName = processorName; - this.transactionTemplate = transactionTemplate; - } - - public static JdbcMessageIdRepository jpaMessageIdRepository(DataSource dataSource, String processorName) { - return new JdbcMessageIdRepository(dataSource, processorName); - } - - private static TransactionTemplate createTransactionTemplate(DataSource dataSource) { - TransactionTemplate transactionTemplate = new TransactionTemplate(); - transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource)); - transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); - return transactionTemplate; - } - - @ManagedOperation(description = "Adds the key to the store") - public boolean add(final String messageId) { - // Run this in single transaction. - Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { - public Boolean doInTransaction(TransactionStatus status) { - int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId); - if (count == 0) { - jdbcTemplate.update(INSERT_STRING, processorName, messageId, new Timestamp(System.currentTimeMillis())); - return Boolean.TRUE; - } else { - return Boolean.FALSE; - } - } - }); - return rc.booleanValue(); - } - - @ManagedOperation(description = "Does the store contain the given key") - public boolean contains(final String messageId) { - // Run this in single transaction. - Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { - public Boolean doInTransaction(TransactionStatus status) { - int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId); - if (count == 0) { - return Boolean.FALSE; - } else { - return Boolean.TRUE; - } - } - }); - return rc.booleanValue(); - } - - @ManagedOperation(description = "Remove the key from the store") - public boolean remove(final String messageId) { - Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() { - public Boolean doInTransaction(TransactionStatus status) { - int updateCount = jdbcTemplate.update(DELETE_STRING, processorName, messageId); - if (updateCount == 0) { - return Boolean.FALSE; - } else { - return Boolean.TRUE; - } - } - }); - return rc.booleanValue(); - } - - public boolean confirm(String s) { - // noop - return true; - } - - @ManagedAttribute(description = "The processor name") - public String getProcessorName() { - return processorName; + super(dataSource, transactionTemplate, processorName); + } + + public JdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) { + super(jdbcTemplate, transactionTemplate); } @Override - protected void doStart() throws Exception { + protected int queryForInt(String key) { + return jdbcTemplate.queryForInt(QUERY_STRING, processorName, key); } @Override - protected void doStop() throws Exception { + protected int insert(String key) { + return jdbcTemplate.update(INSERT_STRING, processorName, key, new Timestamp(System.currentTimeMillis())); } -} + + @Override + protected int delete(String key) { + return jdbcTemplate.update(DELETE_STRING, processorName, key); + } + +} \ No newline at end of file