Author: sully6768 Date: Tue Oct 30 19:46:40 2012 New Revision: 1403839 URL: http://svn.apache.org/viewvc?rev=1403839&view=rev Log: Added batch transaction & batch transaction timeout manager code
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1403839&r1=1403838&r2=1403839&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Tue Oct 30 19:46:40 2012 @@ -26,6 +26,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.component.sjms.jms.ConnectionFactoryResource; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.KeyFormatStrategy; +import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory; import org.apache.camel.impl.DefaultComponent; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; @@ -143,6 +144,7 @@ public class SjmsComponent extends Defau @Override protected void doStop() throws Exception { + TimedTaskManagerFactory.getInstance().cancelTasks(); if (getConnectionResource() != null) { if (getConnectionResource() instanceof ConnectionFactoryResource) { ((ConnectionFactoryResource)getConnectionResource()).drainPool(); Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1403839&r1=1403838&r2=1403839&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java Tue Oct 30 19:46:40 2012 @@ -35,8 +35,10 @@ import org.apache.camel.component.sjms.j import org.apache.camel.component.sjms.jms.SessionPool; import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy; import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy; +import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization; import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.spi.Synchronization; /** * The SjmsConsumer is the base class for the SJMS MessageListener pool. @@ -243,17 +245,24 @@ public class SjmsConsumer extends Defaul } else { commitStrategy = new DefaultTransactionCommitStrategy(); } + + Synchronization synchronization = null; + if (commitStrategy instanceof BatchTransactionCommitStrategy) { + synchronization = new SessionBatchTransactionSynchronization(session, commitStrategy, getTransactionBatchTimeout()); + } else { + synchronization = new SessionTransactionSynchronization(session, commitStrategy); + } AbstractMessageHandler messageHandler = null; if (getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) { if (isTransacted()) { - messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, new SessionTransactionSynchronization(session, commitStrategy)); + messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, synchronization); } else { messageHandler = new InOnlyMessageHandler(getEndpoint(), executor); } } else { if (isTransacted()) { - messageHandler = new InOutMessageHandler(getEndpoint(), executor, new SessionTransactionSynchronization(session, commitStrategy)); + messageHandler = new InOutMessageHandler(getEndpoint(), executor, synchronization); } else { messageHandler = new InOutMessageHandler(getEndpoint(), executor); } @@ -364,4 +373,13 @@ public class SjmsConsumer extends Defaul public int getTransactionBatchCount() { return getSjmsEndpoint().getTransactionBatchCount(); } + + /** + * Returns the timeout value for batch transactions. + * + * @return long + */ + public long getTransactionBatchTimeout() { + return getSjmsEndpoint().getTransactionBatchTimeout(); + } } Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1403839&r1=1403838&r2=1403839&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java Tue Oct 30 19:46:40 2012 @@ -55,6 +55,7 @@ public class SjmsEndpoint extends Defaul private long responseTimeOut = 5000; private String messageSelector; private int transactionBatchCount = -1; + private long transactionBatchTimeout = 5000; private TransactionCommitStrategy transactionCommitStrategy; public SjmsEndpoint() { @@ -389,6 +390,24 @@ public class SjmsEndpoint extends Defaul } /** + * Returns the timeout value for batch transactions. + * + * @return long + */ + public long getTransactionBatchTimeout() { + return transactionBatchTimeout; + } + + /** + * Sets timeout value for batch transactions. + * + * @param transactionBatchTimeout + */ + public void setTransactionBatchTimeout(long transactionBatchTimeout) { + this.transactionBatchTimeout = transactionBatchTimeout; + } + + /** * Gets the commit strategy. * * @return the transactionCommitStrategy Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java?rev=1403839&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java (added) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java Tue Oct 30 19:46:40 2012 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.taskmanager; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * TODO Add Class documentation for TimedTaskManager + * + * @author sully6768 + */ +public class TimedTaskManager { + + private final Timer timer = new Timer(); + private ReadWriteLock lock = new ReentrantReadWriteLock(); + + TimedTaskManager() { + } + + public void addTask(TimerTask task, long delay) { + try { + lock.writeLock().lock(); + timer.schedule(task, delay); + } finally { + lock.writeLock().unlock(); + } + } + + public void cancelTasks() { + try { + lock.writeLock().lock(); + timer.cancel(); + } finally { + lock.writeLock().unlock(); + } + } +} Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java?rev=1403839&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java (added) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java Tue Oct 30 19:46:40 2012 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.taskmanager; + +/** + * TODO Add Class documentation for TimedTaskManagerFactory + * + * @author sully6768 + */ +public final class TimedTaskManagerFactory { + + private static class TimedTaskManagerHolder { + private final static TimedTaskManager INSTANCE = new TimedTaskManager(); + } + + public static TimedTaskManager getInstance() { + return TimedTaskManagerHolder.INSTANCE; + } +} Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java?rev=1403839&r1=1403838&r2=1403839&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java Tue Oct 30 19:46:40 2012 @@ -56,5 +56,9 @@ public class BatchTransactionCommitStrat current.set(0); return true; } + + public void reset() { + current.set(0); + } } Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java?rev=1403839&view=auto ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java (added) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java Tue Oct 30 19:46:40 2012 @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.tx; + +import java.util.TimerTask; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.jms.Session; + +import org.apache.camel.Exchange; +import org.apache.camel.component.sjms.TransactionCommitStrategy; +import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory; +import org.apache.camel.spi.Synchronization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SessionTransactionSynchronization is called at the completion of each + * {@link org.apache.camel.Exhcnage}. + */ +public class SessionBatchTransactionSynchronization implements Synchronization { + private Logger log = LoggerFactory.getLogger(getClass()); + private Session session; + private final TransactionCommitStrategy commitStrategy; + private long batchTransactionTimeout = 5000; + private TimeoutTask currentTask; + private ReadWriteLock lock = new ReentrantReadWriteLock(); + + public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) { + this(session, commitStrategy, 5000); + } + + public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) { + this.session = session; + if (commitStrategy == null) { + this.commitStrategy = new DefaultTransactionCommitStrategy(); + } else { + this.commitStrategy = commitStrategy; + } + if (batchTransactionTimeout > 0) { + this.batchTransactionTimeout = batchTransactionTimeout; + createTask(); + } + } + + /** + * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange) + * @param exchange + */ + @Override + public void onFailure(Exchange exchange) { + try { + lock.readLock().lock(); + if (commitStrategy.rollback(exchange)) { + log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId()); + if (session != null && session.getTransacted()) { + session.rollback(); + } + } + } catch (Exception e) { + log.warn("Failed to rollback the session: {}", e.getMessage()); + } finally { + lock.readLock().unlock(); + } + } + + /** + * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange) + * @param exchange + */ + @Override + public void onComplete(Exchange exchange) { + try { + lock.readLock().lock(); + if (commitStrategy.commit(exchange)) { + log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId()); + if (session != null && session.getTransacted()) { + session.commit(); + } + } + } catch (Exception e) { + log.warn("Failed to commit the session: {}", e.getMessage()); + exchange.setException(e); + } finally { + lock.readLock().unlock(); + } + resetTask(); + } + + private void createTask() { + try { + lock.writeLock().lock(); + currentTask = new TimeoutTask(); + } finally { + lock.writeLock().unlock(); + } + } + + private void resetTask() { + try { + lock.writeLock().lock(); + currentTask.cancel(); + currentTask = new TimeoutTask(); + } finally { + lock.writeLock().unlock(); + } + TimedTaskManagerFactory.getInstance().addTask(currentTask, batchTransactionTimeout); + } + + public class TimeoutTask extends TimerTask { + + /** + * Default constructor + * + * @param str + */ + TimeoutTask() { + } + + /** + * When the timer executes, either commits or rolls back the session + * transaction. + */ + public void run() { + log.info("Batch Transaction Timer expired:"); + try { + lock.writeLock().lock(); + log.debug("Committing the current transactions"); + try { + if (session != null && session.getTransacted()) { + session.commit(); + } + ((BatchTransactionCommitStrategy)commitStrategy).reset(); + } catch (Exception e) { + log.warn("Failed to commit the session during timeout: {}", e.getMessage()); + } + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public boolean cancel() { + if (log.isTraceEnabled()) { + log.trace("Cancelling the TimeoutTask"); + } + return super.cancel(); + } + } +} Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java?rev=1403839&r1=1403838&r2=1403839&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java (original) +++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java Tue Oct 30 19:46:40 2012 @@ -148,6 +148,33 @@ public class SjmsEndpointTest extends Ca assertTrue(qe.getDestinationName().equals("test")); } + @Test + public void testTransactedBatchCount() throws Exception { + Endpoint endpoint = context.getEndpoint("sjms:queue:test?transacted=true&transactionBatchCount=10"); + assertNotNull(endpoint); + assertTrue(endpoint instanceof SjmsEndpoint); + SjmsEndpoint qe = (SjmsEndpoint)endpoint; + assertTrue(qe.getDestinationName().equals("test")); + } + + @Test + public void testTransactedBatchTimeoutDefault() throws Exception { + Endpoint endpoint = context.getEndpoint("sjms:queue:test?transacted=true"); + assertNotNull(endpoint); + assertTrue(endpoint instanceof SjmsEndpoint); + SjmsEndpoint qe = (SjmsEndpoint)endpoint; + assertTrue(qe.getDestinationName().equals("test")); + } + + @Test + public void testTransactedBatchTimeoutModified() throws Exception { + Endpoint endpoint = context.getEndpoint("sjms:queue:test?transacted=true&transactionBatchTimeout=3000"); + assertNotNull(endpoint); + assertTrue(endpoint instanceof SjmsEndpoint); + SjmsEndpoint qe = (SjmsEndpoint)endpoint; + assertTrue(qe.getDestinationName().equals("test")); + } + protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext();