Author: davsclaus Date: Tue Feb 12 12:53:26 2013 New Revision: 1445148 URL: http://svn.apache.org/r1445148 Log: CAMEL-6054: Added transacted option to hazelcast seda consumer. Thanks to Thomas Vautrin for the patch.
Added: camel/branches/camel-2.10.x/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java - copied unchanged from r1445076, camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaInOutTransactedTest.java Modified: camel/branches/camel-2.10.x/ (props changed) camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1445076 Propchange: camel/branches/camel-2.10.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java?rev=1445148&r1=1445147&r2=1445148&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java (original) +++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java Tue Feb 12 12:53:26 2013 @@ -45,4 +45,8 @@ public abstract class HazelcastDefaultEn public boolean isSingleton() { return true; } + + public HazelcastInstance getHazelcastInstance() { + return hazelcastInstance; + } } Modified: camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java?rev=1445148&r1=1445147&r2=1445148&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java (original) +++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java Tue Feb 12 12:53:26 2013 @@ -25,6 +25,7 @@ public class HazelcastSedaConfiguration private int pollInterval = 1000; private String queueName; private boolean transferExchange; + private boolean transacted; public HazelcastSedaConfiguration() { } @@ -61,4 +62,12 @@ public class HazelcastSedaConfiguration this.transferExchange = transferExchange; } + public boolean isTransacted() { + return transacted; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + } Modified: camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1445148&r1=1445147&r2=1445148&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original) +++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Tue Feb 12 12:53:26 2013 @@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import com.hazelcast.core.Transaction; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Consumer; @@ -74,6 +75,16 @@ public class HazelcastSedaConsumer exten while (queue != null && isRunAllowed()) { final Exchange exchange = this.getEndpoint().createExchange(); + Transaction transaction = null; + if (endpoint.getConfiguration().isTransacted()) { + // Get and begin transaction if exist + transaction = endpoint.getHazelcastInstance().getTransaction(); + + if (transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_NO_TXN) { + log.trace("Begin transaction: {}", transaction); + transaction.begin(); + } + } try { final Object body = queue.poll(endpoint.getConfiguration().getPollInterval(), TimeUnit.MILLISECONDS); @@ -92,19 +103,38 @@ public class HazelcastSedaConsumer exten }); if (exchange.getException() != null) { + // Rollback + if (transaction != null) { + transaction.rollback(); + } getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } } catch (Exception e) { LOG.error("Hzlq Exception caught: " + e, e); + // Rollback + if (transaction != null) { + log.trace("Rollback transaction: {}", transaction); + transaction.rollback(); + } } } + // It's OK, I commit + if (exchange.getException() == null && transaction != null && transaction.getStatus() == Transaction.TXN_STATUS_ACTIVE) { + log.trace("Commit transaction: {}", transaction); + transaction.commit(); + } } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { LOG.debug("Hzlq Consumer Interrupted: " + e, e); } continue; } catch (Throwable e) { + // Rollback + if (transaction != null) { + log.trace("Rollback transaction: {}", transaction); + transaction.rollback(); + } getExceptionHandler().handleException("Error processing exchange", exchange, e); } } Modified: camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java?rev=1445148&r1=1445147&r2=1445148&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java (original) +++ camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaEndpoint.java Tue Feb 12 12:53:26 2013 @@ -36,7 +36,7 @@ public class HazelcastSedaEndpoint exten private final HazelcastSedaConfiguration configuration; public HazelcastSedaEndpoint(final HazelcastInstance hazelcastInstance, final String uri, final HazelcastComponent component, final HazelcastSedaConfiguration configuration) { - super(component.getHazelcastInstance(), uri, component); + super(hazelcastInstance, uri, component); this.queue = hazelcastInstance.getQueue(configuration.getQueueName()); this.configuration = configuration; if (ObjectHelper.isEmpty(configuration.getQueueName())) {