Author: davsclaus
Date: Tue Feb 12 12:53:26 2013
New Revision: 1445148

URL: http://svn.apache.org/r1445148
Log:
CAMEL-6054: Added transacted option to hazelcast seda consumer. Thanks to Thomas Vautrin for the patch.

Added:
    camel/branches/camel-2.10.x/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java
      - copied unchanged from r1445076, camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
    camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
    camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
    camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1445076

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java?rev=1445148&r1=1445147&r2=1445148&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java Tue Feb 12 12:53:26 2013
@@ -45,4 +45,8 @@ public abstract class HazelcastDefaultEn
     public boolean isSingleton() {
         return true;
     }
+
+    public HazelcastInstance getHazelcastInstance() {
+        return hazelcastInstance;
+    }
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java?rev=1445148&r1=1445147&r2=1445148&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java Tue Feb 12 12:53:26 2013
@@ -25,6 +25,7 @@ public class HazelcastSedaConfiguration 
     private int pollInterval = 1000;
     private String queueName;
     private boolean transferExchange;
+    private boolean transacted;
 
     public HazelcastSedaConfiguration() {
     }
@@ -61,4 +62,12 @@ public class HazelcastSedaConfiguration 
         this.transferExchange = transferExchange;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1445148&r1=1445147&r2=1445148&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Tue Feb 12 12:53:26 2013
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.hazelcast.core.Transaction;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
@@ -74,6 +75,16 @@ public class HazelcastSedaConsumer exten
         while (queue != null && isRunAllowed()) {
             final Exchange exchange = this.getEndpoint().createExchange();
 
+            Transaction transaction = null;
+            if (endpoint.getConfiguration().isTransacted()) {
+                // Get and begin transaction if exist
+                transaction = endpoint.getHazelcastInstance().getTransaction();
+
+                if (transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_NO_TXN) {
+                    log.trace("Begin transaction: {}", transaction);
+                    transaction.begin();
+                }
+            }
             try {
                 final Object body = queue.poll(endpoint.getConfiguration().getPollInterval(), TimeUnit.MILLISECONDS);
 
@@ -92,19 +103,38 @@ public class HazelcastSedaConsumer exten
                         });
 
                         if (exchange.getException() != null) {
+                            // Rollback
+                            if (transaction != null) {
+                                transaction.rollback();
+                            }
                             getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                         }
 
                     } catch (Exception e) {
                         LOG.error("Hzlq Exception caught: " + e, e);
+                        // Rollback
+                        if (transaction != null) {
+                            log.trace("Rollback transaction: {}", transaction);
+                            transaction.rollback();
+                        }
                     }
                 }
+                // It's OK, I commit
+                if (exchange.getException() == null && transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_ACTIVE) {
+                    log.trace("Commit transaction: {}", transaction);
+                    transaction.commit();
+                }
             } catch (InterruptedException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Hzlq Consumer Interrupted: " + e, e);
                 }
                 continue;
             } catch (Throwable e) {
+                // Rollback
+                if (transaction != null) {
+                    log.trace("Rollback transaction: {}", transaction);
+                    transaction.rollback();
+                }
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
             }
         }

Modified: 
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java?rev=1445148&r1=1445147&r2=1445148&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java Tue Feb 12 12:53:26 2013
@@ -36,7 +36,7 @@ public class HazelcastSedaEndpoint exten
     private final HazelcastSedaConfiguration configuration;
 
     public HazelcastSedaEndpoint(final HazelcastInstance hazelcastInstance, final String uri, final HazelcastComponent component, final HazelcastSedaConfiguration configuration) {
-        super(component.getHazelcastInstance(), uri, component);
+        super(hazelcastInstance, uri, component);
         this.queue = hazelcastInstance.getQueue(configuration.getQueueName());
         this.configuration = configuration;
         if (ObjectHelper.isEmpty(configuration.getQueueName())) {


Reply via email to