Author: davsclaus
Date: Tue Feb 12 09:44:14 2013
New Revision: 1445076

URL: http://svn.apache.org/r1445076
Log:
CAMEL-6054: Added transacted option to hazelcast seda consumer. Thanks to 
Thomas Vautrin for the patch.

Added:
    
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java
      - copied, changed from r1445073, 
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java
Modified:
    
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
    
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
    
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
    
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java

Modified: 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java?rev=1445076&r1=1445075&r2=1445076&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
 Tue Feb 12 09:44:14 2013
@@ -45,4 +45,8 @@ public abstract class HazelcastDefaultEn
     public boolean isSingleton() {
         return true;
     }
+
+    public HazelcastInstance getHazelcastInstance() {
+        return hazelcastInstance;
+    }
 }

Modified: 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java?rev=1445076&r1=1445075&r2=1445076&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
 Tue Feb 12 09:44:14 2013
@@ -25,6 +25,7 @@ public class HazelcastSedaConfiguration 
     private int pollInterval = 1000;
     private String queueName;
     private boolean transferExchange;
+    private boolean transacted;
 
     public HazelcastSedaConfiguration() {
     }
@@ -61,4 +62,12 @@ public class HazelcastSedaConfiguration 
         this.transferExchange = transferExchange;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
 }

Modified: 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1445076&r1=1445075&r2=1445076&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
 (original)
+++ 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
 Tue Feb 12 09:44:14 2013
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.hazelcast.core.Transaction;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
@@ -74,6 +75,16 @@ public class HazelcastSedaConsumer exten
         while (queue != null && isRunAllowed()) {
             final Exchange exchange = this.getEndpoint().createExchange();
 
+            Transaction transaction = null;
+            if (endpoint.getConfiguration().isTransacted()) {
+                // Get and begin transaction if exist
+                transaction = endpoint.getHazelcastInstance().getTransaction();
+
+                if (transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_NO_TXN) {
+                    log.trace("Begin transaction: {}", transaction);
+                    transaction.begin();
+                }
+            }
             try {
                 final Object body = queue.poll(endpoint.getConfiguration().getPollInterval(), TimeUnit.MILLISECONDS);
 
@@ -92,19 +103,38 @@ public class HazelcastSedaConsumer exten
                         });
 
                         if (exchange.getException() != null) {
+                            // Rollback
+                            if (transaction != null) {
+                                transaction.rollback();
+                            }
                             getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                         }
 
                     } catch (Exception e) {
                         LOG.error("Hzlq Exception caught: " + e, e);
+                        // Rollback
+                        if (transaction != null) {
+                            log.trace("Rollback transaction: {}", transaction);
+                            transaction.rollback();
+                        }
                     }
                 }
+                // It's OK, I commit
+                if (exchange.getException() == null && transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_ACTIVE) {
+                    log.trace("Commit transaction: {}", transaction);
+                    transaction.commit();
+                }
             } catch (InterruptedException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Hzlq Consumer Interrupted: " + e, e);
                 }
                 continue;
             } catch (Throwable e) {
+                // Rollback
+                if (transaction != null) {
+                    log.trace("Rollback transaction: {}", transaction);
+                    transaction.rollback();
+                }
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
             }
         }

Modified: 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java?rev=1445076&r1=1445075&r2=1445076&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java
 Tue Feb 12 09:44:14 2013
@@ -36,7 +36,7 @@ public class HazelcastSedaEndpoint exten
     private final HazelcastSedaConfiguration configuration;
 
     public HazelcastSedaEndpoint(final HazelcastInstance hazelcastInstance, final String uri, final HazelcastComponent component, final HazelcastSedaConfiguration configuration) {
-        super(component.getHazelcastInstance(), uri, component);
+        super(hazelcastInstance, uri, component);
         this.queue = hazelcastInstance.getQueue(configuration.getQueueName());
         this.configuration = configuration;
         if (ObjectHelper.isEmpty(configuration.getQueueName())) {

Copied: 
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java
 (from r1445073, 
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java?p2=camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java&p1=camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java&r1=1445073&r2=1445076&rev=1445076&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java
 (original)
+++ 
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java
 Tue Feb 12 09:44:14 2013
@@ -16,33 +16,9 @@
  */
 package org.apache.camel.component.hazelcast;
 
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
 
-public class HazelcastSedaInOutTest extends CamelTestSupport {
-
-    @EndpointInject(uri = "mock:result")
-    private MockEndpoint mock;
-
-    @Test
-    public void sendInOut() throws Exception {
-        mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("test");
-
-        template.send("direct:foo", ExchangePattern.InOut, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.getIn().setBody("test");
-            }
-        });
-        assertMockEndpointsSatisfied();
-        mock.reset();
-    }
+public class HazelcastSedaInOutTransactedTest extends HazelcastSedaInOutTest {
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -51,7 +27,7 @@ public class HazelcastSedaInOutTest exte
             public void configure() throws Exception {
                 from("direct:foo").to("hazelcast:seda:foo");
 
-                from("hazelcast:seda:foo").to("mock:result");
+                from("hazelcast:seda:foo?transacted=true").to("mock:result");
             }
         };
     }


Reply via email to