Repository: camel Updated Branches: refs/heads/master fd8e2cb78 -> e54a51dfa
CAMEL-11777: Use transaction aware queue in hazelcast:seda component As mentioned in docs of hz (any version, current for example) http://docs.hazelcast.org/docs/3.8.4/manual/html-single/index.html#creating-a-transaction-interface Transaction do its work if queue initialized with help of transaction ctx Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e54a51df Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e54a51df Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e54a51df Branch: refs/heads/master Commit: e54a51dfad37ff351b047064b19a1f73a345b3f4 Parents: fd8e2cb Author: Kirill Merkushev <lan...@yandex.ru> Authored: Mon Sep 18 12:13:49 2017 +0300 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Sep 19 09:30:57 2017 +0200 ---------------------------------------------------------------------- .../component/hazelcast/seda/HazelcastSedaConsumer.java | 5 +++-- .../HazelcastSedaRecoverableConsumerNewTransactionTest.java | 3 ++- .../HazelcastSedaRecoverableConsumerRollbackTest.java | 3 ++- .../hazelcast/HazelcastSedaRecoverableConsumerTest.java | 8 ++++++++ 4 files changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java index 7e3b24c..cebd69c 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java @@ -16,10 +16,10 @@ */ package org.apache.camel.component.hazelcast.seda; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import com.hazelcast.core.BaseQueue; import com.hazelcast.transaction.TransactionContext; import org.apache.camel.AsyncCallback; @@ -71,7 +71,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable { } public void run() { - final BlockingQueue<?> queue = endpoint.getQueue(); + BaseQueue<?> queue = endpoint.getHazelcastInstance().getQueue(endpoint.getConfiguration().getQueueName()); while (queue != null && isRunAllowed()) { final Exchange exchange = this.getEndpoint().createExchange(); @@ -85,6 +85,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable { if (transactionCtx != null) { log.trace("Begin transaction: {}", transactionCtx.getTxnId()); transactionCtx.beginTransaction(); + queue = transactionCtx.getQueue(endpoint.getConfiguration().getQueueName()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java index d9f5eec..d5db83b 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java @@ -31,10 +31,11 @@ public class HazelcastSedaRecoverableConsumerNewTransactionTest extends Hazelcas .thenThrow(new HazelcastException("Could not obtain Connection!!!")) .thenReturn(transactionContext); when(hazelcastInstance.getQueue("foo")).thenReturn(queue); + when(transactionContext.getQueue("foo")).thenReturn(tqueue); } protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) { - verify(hazelcastInstance).getQueue("foo"); + verify(hazelcastInstance, times(2)).getQueue("foo"); verify(hazelcastInstance, atLeastOnce()).newTransactionContext(); } http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java index 43d0bcc..c029556 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java @@ -37,10 +37,11 @@ public class HazelcastSedaRecoverableConsumerRollbackTest extends HazelcastSedaR .when(transactionContext).rollbackTransaction(); when(hazelcastInstance.newTransactionContext()).thenReturn(transactionContext); when(hazelcastInstance.getQueue("foo")).thenReturn(queue); + when(transactionContext.getQueue("foo")).thenReturn(tqueue); } protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) { - verify(hazelcastInstance).getQueue("foo"); + verify(hazelcastInstance, times(2)).getQueue("foo"); verify(hazelcastInstance, atLeastOnce()).newTransactionContext(); } http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java index ee29f7c..00c8fe3 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.camel.component.hazelcast; import java.util.concurrent.TimeUnit; import com.hazelcast.core.IQueue; +import com.hazelcast.core.TransactionalQueue; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -34,6 +35,9 @@ public abstract class HazelcastSedaRecoverableConsumerTest extends HazelcastCame @Mock protected IQueue<Object> queue; + @Mock + protected TransactionalQueue<Object> tqueue; + @EndpointInject(uri = "mock:result") protected MockEndpoint mock; @@ -43,6 +47,10 @@ public abstract class HazelcastSedaRecoverableConsumerTest extends HazelcastCame .thenReturn("bar") .thenReturn(null); + when(tqueue.poll(any(Long.class), any(TimeUnit.class))) + .thenReturn("bar") + .thenReturn(null); + mock.expectedMessageCount(1); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);