Author: sully6768
Date: Tue Oct 30 19:46:40 2012
New Revision: 1403839

URL: http://svn.apache.org/viewvc?rev=1403839&view=rev
Log:
Added batch transaction & batch transaction timeout manager code

Added:
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
Modified:
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
    
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
    
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java

Modified: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1403839&r1=1403838&r2=1403839&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
 (original)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
 Tue Oct 30 19:46:40 2012
@@ -26,6 +26,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -143,6 +144,7 @@ public class SjmsComponent extends Defau
 
     @Override
     protected void doStop() throws Exception {
+        TimedTaskManagerFactory.getInstance().cancelTasks();
         if (getConnectionResource() != null) {
             if (getConnectionResource() instanceof ConnectionFactoryResource) {
                 
((ConnectionFactoryResource)getConnectionResource()).drainPool();

Modified: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1403839&r1=1403838&r2=1403839&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
 (original)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
 Tue Oct 30 19:46:40 2012
@@ -35,8 +35,10 @@ import org.apache.camel.component.sjms.j
 import org.apache.camel.component.sjms.jms.SessionPool;
 import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
+import 
org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.Synchronization;
 
 /**
  * The SjmsConsumer is the base class for the SJMS MessageListener pool.
@@ -243,17 +245,24 @@ public class SjmsConsumer extends Defaul
         } else {
             commitStrategy = new DefaultTransactionCommitStrategy();
         }
+        
+        Synchronization synchronization = null;
+        if (commitStrategy instanceof BatchTransactionCommitStrategy) {
+            synchronization = new 
SessionBatchTransactionSynchronization(session, commitStrategy, 
getTransactionBatchTimeout());
+        } else {
+            synchronization = new SessionTransactionSynchronization(session, 
commitStrategy);
+        }
 
         AbstractMessageHandler messageHandler = null;
         if 
(getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
             if (isTransacted()) {
-                messageHandler = new InOnlyMessageHandler(getEndpoint(), 
executor, new SessionTransactionSynchronization(session, commitStrategy));
+                messageHandler = new InOnlyMessageHandler(getEndpoint(), 
executor, synchronization);
             } else {
                 messageHandler = new InOnlyMessageHandler(getEndpoint(), 
executor);
             }
         } else {
             if (isTransacted()) {
-                messageHandler = new InOutMessageHandler(getEndpoint(), 
executor, new SessionTransactionSynchronization(session, commitStrategy));
+                messageHandler = new InOutMessageHandler(getEndpoint(), 
executor, synchronization);
             } else {
                 messageHandler = new InOutMessageHandler(getEndpoint(), 
executor);
             }
@@ -364,4 +373,13 @@ public class SjmsConsumer extends Defaul
     public int getTransactionBatchCount() {
         return getSjmsEndpoint().getTransactionBatchCount();
     }
+
+    /**
+     * Returns the timeout value for batch transactions.
+     * 
+     * @return long
+     */
+    public long getTransactionBatchTimeout() {
+        return getSjmsEndpoint().getTransactionBatchTimeout();
+    }
 }

Modified: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1403839&r1=1403838&r2=1403839&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 Tue Oct 30 19:46:40 2012
@@ -55,6 +55,7 @@ public class SjmsEndpoint extends Defaul
     private long responseTimeOut = 5000;
     private String messageSelector;
     private int transactionBatchCount = -1;
+    private long transactionBatchTimeout = 5000;
     private TransactionCommitStrategy transactionCommitStrategy;
 
     public SjmsEndpoint() {
@@ -389,6 +390,24 @@ public class SjmsEndpoint extends Defaul
     }
 
     /**
+     * Returns the timeout value for batch transactions.
+     * 
+     * @return long
+     */
+    public long getTransactionBatchTimeout() {
+        return transactionBatchTimeout;
+    }
+
+    /**
+     * Sets timeout value for batch transactions.
+     * 
+     * @param transactionBatchTimeout
+     */
+    public void setTransactionBatchTimeout(long transactionBatchTimeout) {
+        this.transactionBatchTimeout = transactionBatchTimeout;
+    }
+
+    /**
      * Gets the commit strategy.
      * 
      * @return the transactionCommitStrategy

Added: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java?rev=1403839&view=auto
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
 (added)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
 Tue Oct 30 19:46:40 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.taskmanager;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * TODO Add Class documentation for TimedTaskManager
+ *
+ * @author sully6768
+ */
+public class TimedTaskManager {
+
+    private final Timer timer = new Timer();
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+    
+    TimedTaskManager() {
+    }
+    
+    public void addTask(TimerTask task, long delay) {
+        try {
+            lock.writeLock().lock();
+            timer.schedule(task, delay);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+    
+    public void cancelTasks() {
+        try {
+            lock.writeLock().lock();
+            timer.cancel();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+}

Added: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java?rev=1403839&view=auto
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java
 (added)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java
 Tue Oct 30 19:46:40 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.taskmanager;
+
+/**
+ * TODO Add Class documentation for TimedTaskManagerFactory
+ * 
+ * @author sully6768
+ */
+public final class TimedTaskManagerFactory {
+
+    private static class TimedTaskManagerHolder {
+        private final static TimedTaskManager INSTANCE = new 
TimedTaskManager();
+    }
+
+    public static TimedTaskManager getInstance() {
+        return TimedTaskManagerHolder.INSTANCE;
+    }
+}

Modified: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java?rev=1403839&r1=1403838&r2=1403839&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
 (original)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
 Tue Oct 30 19:46:40 2012
@@ -56,5 +56,9 @@ public class BatchTransactionCommitStrat
         current.set(0);
         return true;
     }
+    
+    public void reset() {
+        current.set(0);
+    }
 
 }

Added: 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java?rev=1403839&view=auto
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
 (added)
+++ 
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
 Tue Oct 30 19:46:40 2012
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Session;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory;
+import org.apache.camel.spi.Synchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionTransactionSynchronization is called at the completion of each
+ * {@link org.apache.camel.Exhcnage}.
+ */
+public class SessionBatchTransactionSynchronization implements Synchronization 
{
+    private Logger log = LoggerFactory.getLogger(getClass());
+    private Session session;
+    private final TransactionCommitStrategy commitStrategy;
+    private long batchTransactionTimeout = 5000;
+    private TimeoutTask currentTask;
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public SessionBatchTransactionSynchronization(Session session, 
TransactionCommitStrategy commitStrategy) {
+        this(session, commitStrategy, 5000);
+    }
+
+    public SessionBatchTransactionSynchronization(Session session, 
TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) {
+        this.session = session;
+        if (commitStrategy == null) {
+            this.commitStrategy = new DefaultTransactionCommitStrategy();
+        } else {
+            this.commitStrategy = commitStrategy;
+        }
+        if (batchTransactionTimeout > 0) {
+            this.batchTransactionTimeout = batchTransactionTimeout;
+            createTask();
+        }
+    }
+
+    /**
+     * @see 
org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
+     * @param exchange
+     */
+    @Override
+    public void onFailure(Exchange exchange) {
+        try {
+            lock.readLock().lock();
+            if (commitStrategy.rollback(exchange)) {
+                log.debug("Processing failure of Exchange id:{}", 
exchange.getExchangeId());
+                if (session != null && session.getTransacted()) {
+                    session.rollback();
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to rollback the session: {}", e.getMessage());
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @see 
org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
+     * @param exchange
+     */
+    @Override
+    public void onComplete(Exchange exchange) {
+        try {
+            lock.readLock().lock();
+            if (commitStrategy.commit(exchange)) {
+                log.debug("Processing completion of Exchange id:{}", 
exchange.getExchangeId());
+                if (session != null && session.getTransacted()) {
+                    session.commit();
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to commit the session: {}", e.getMessage());
+            exchange.setException(e);
+        } finally {
+            lock.readLock().unlock();
+        }
+        resetTask();
+    }
+
+    private void createTask() {
+        try {
+            lock.writeLock().lock();
+            currentTask = new TimeoutTask();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void resetTask() {
+        try {
+            lock.writeLock().lock();
+            currentTask.cancel();
+            currentTask = new TimeoutTask();
+        } finally {
+            lock.writeLock().unlock();
+        }
+        TimedTaskManagerFactory.getInstance().addTask(currentTask, 
batchTransactionTimeout);
+    }
+
+    public class TimeoutTask extends TimerTask {
+
+        /**
+         * Default constructor
+         * 
+         * @param str
+         */
+        TimeoutTask() {
+        }
+
+        /**
+         * When the timer executes, either commits or rolls back the session
+         * transaction.
+         */
+        public void run() {
+            log.info("Batch Transaction Timer expired:");
+            try {
+                lock.writeLock().lock();
+                log.debug("Committing the current transactions");
+                try {
+                    if (session != null && session.getTransacted()) {
+                        session.commit();
+                    }
+                    ((BatchTransactionCommitStrategy)commitStrategy).reset();
+                } catch (Exception e) {
+                    log.warn("Failed to commit the session during timeout: 
{}", e.getMessage());
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        @Override
+        public boolean cancel() {
+            if (log.isTraceEnabled()) {
+                log.trace("Cancelling the TimeoutTask");
+            }
+            return super.cancel();
+        }
+    }
+}

Modified: 
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java?rev=1403839&r1=1403838&r2=1403839&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
 (original)
+++ 
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
 Tue Oct 30 19:46:40 2012
@@ -148,6 +148,33 @@ public class SjmsEndpointTest extends Ca
         assertTrue(qe.getDestinationName().equals("test"));
     }
 
+    @Test
+    public void testTransactedBatchCount() throws Exception {
+        Endpoint endpoint = 
context.getEndpoint("sjms:queue:test?transacted=true&transactionBatchCount=10");
+        assertNotNull(endpoint);
+        assertTrue(endpoint instanceof SjmsEndpoint);
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
+        assertTrue(qe.getDestinationName().equals("test"));
+    }
+
+    @Test
+    public void testTransactedBatchTimeoutDefault() throws Exception {
+        Endpoint endpoint = 
context.getEndpoint("sjms:queue:test?transacted=true");
+        assertNotNull(endpoint);
+        assertTrue(endpoint instanceof SjmsEndpoint);
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
+        assertTrue(qe.getDestinationName().equals("test"));
+    }
+
+    @Test
+    public void testTransactedBatchTimeoutModified() throws Exception {
+        Endpoint endpoint = 
context.getEndpoint("sjms:queue:test?transacted=true&transactionBatchTimeout=3000");
+        assertNotNull(endpoint);
+        assertTrue(endpoint instanceof SjmsEndpoint);
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
+        assertTrue(qe.getDestinationName().equals("test"));
+    }
+
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 


Reply via email to