Author: davsclaus Date: Tue Feb 12 09:44:14 2013 New Revision: 1445076 URL: http://svn.apache.org/r1445076 Log: CAMEL-6054: Added transacted option to hazelcast seda consumer. Thanks to Thomas Vautrin for the patch.
Added: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java - copied, changed from r1445073, camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java?rev=1445076&r1=1445075&r2=1445076&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java (original) +++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java Tue Feb 12 09:44:14 2013 @@ -45,4 +45,8 @@ public abstract class HazelcastDefaultEn public boolean isSingleton() { return true; } + + public HazelcastInstance getHazelcastInstance() { + return hazelcastInstance; + } } Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java?rev=1445076&r1=1445075&r2=1445076&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java (original) +++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java Tue Feb 12 09:44:14 2013 @@ -25,6 +25,7 @@ public class HazelcastSedaConfiguration private int pollInterval = 1000; private String queueName; private boolean transferExchange; + private boolean transacted; public HazelcastSedaConfiguration() { } @@ -61,4 +62,12 @@ public class HazelcastSedaConfiguration this.transferExchange = transferExchange; } + public boolean isTransacted() { + return transacted; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + } Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1445076&r1=1445075&r2=1445076&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original) +++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Tue Feb 12 09:44:14 2013 @@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import com.hazelcast.core.Transaction; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Consumer; @@ -74,6 +75,16 @@ public class HazelcastSedaConsumer exten while (queue != null && isRunAllowed()) { final Exchange exchange = this.getEndpoint().createExchange(); + Transaction transaction = null; + if (endpoint.getConfiguration().isTransacted()) { + // Get and begin transaction if exist + transaction = endpoint.getHazelcastInstance().getTransaction(); + + if (transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_NO_TXN) { + log.trace("Begin transaction: {}", transaction); + transaction.begin(); + } + } try { final Object body = queue.poll(endpoint.getConfiguration().getPollInterval(), TimeUnit.MILLISECONDS); @@ -92,19 +103,38 @@ public class HazelcastSedaConsumer exten }); if (exchange.getException() != null) { + // Rollback + if (transaction != null) { + transaction.rollback(); + } getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } } catch (Exception e) { LOG.error("Hzlq Exception caught: " + e, e); + // Rollback + if (transaction != null) { + log.trace("Rollback transaction: {}", transaction); + transaction.rollback(); + } } } + // It's OK, I commit + if (exchange.getException() == null && transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_ACTIVE) { + log.trace("Commit transaction: {}", transaction); + transaction.commit(); + } } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { LOG.debug("Hzlq Consumer Interrupted: " + e, e); } continue; } catch (Throwable e) { + // Rollback + if (transaction != null) { + log.trace("Rollback transaction: {}", transaction); + transaction.rollback(); + } getExceptionHandler().handleException("Error processing exchange", exchange, e); } } Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java?rev=1445076&r1=1445075&r2=1445076&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java (original) +++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java Tue Feb 12 09:44:14 2013 @@ -36,7 +36,7 @@ public class HazelcastSedaEndpoint exten private final HazelcastSedaConfiguration configuration; public HazelcastSedaEndpoint(final HazelcastInstance hazelcastInstance, final String uri, final HazelcastComponent component, final HazelcastSedaConfiguration configuration) { - super(component.getHazelcastInstance(), uri, component); + super(hazelcastInstance, uri, component); this.queue = hazelcastInstance.getQueue(configuration.getQueueName()); this.configuration = configuration; if (ObjectHelper.isEmpty(configuration.getQueueName())) { Copied: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java (from r1445073, camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java?p2=camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java&p1=camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java&r1=1445073&r2=1445076&rev=1445076&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTest.java (original) +++ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java Tue Feb 12 09:44:14 2013 @@ -16,33 +16,9 @@ */ package org.apache.camel.component.hazelcast; -import org.apache.camel.EndpointInject; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Test; -public class HazelcastSedaInOutTest extends CamelTestSupport { - - @EndpointInject(uri = "mock:result") - private MockEndpoint mock; - - @Test - public void sendInOut() throws Exception { - mock.expectedMessageCount(1); - mock.expectedBodiesReceived("test"); - - template.send("direct:foo", ExchangePattern.InOut, new Processor() { - public void process(Exchange exchange) throws Exception { - exchange.getIn().setBody("test"); - } - }); - assertMockEndpointsSatisfied(); - mock.reset(); - } +public class HazelcastSedaInOutTransactedTest extends HazelcastSedaInOutTest { @Override protected RouteBuilder createRouteBuilder() throws Exception { @@ -51,7 +27,7 @@ public class HazelcastSedaInOutTest exte public void configure() throws Exception { from("direct:foo").to("hazelcast:seda:foo"); - from("hazelcast:seda:foo").to("mock:result"); + from("hazelcast:seda:foo?transacted=true").to("mock:result"); } }; }