Repository: camel
Updated Branches:
  refs/heads/master fd8e2cb78 -> e54a51dfa


CAMEL-11777: Use transaction aware queue in hazelcast:seda component

As mentioned in docs of hz (any version, current for example)
http://docs.hazelcast.org/docs/3.8.4/manual/html-single/index.html#creating-a-transaction-interface
Transaction do its work if queue initialized with help of transaction ctx


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e54a51df
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e54a51df
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e54a51df

Branch: refs/heads/master
Commit: e54a51dfad37ff351b047064b19a1f73a345b3f4
Parents: fd8e2cb
Author: Kirill Merkushev <lan...@yandex.ru>
Authored: Mon Sep 18 12:13:49 2017 +0300
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Sep 19 09:30:57 2017 +0200

----------------------------------------------------------------------
 .../component/hazelcast/seda/HazelcastSedaConsumer.java      | 5 +++--
 .../HazelcastSedaRecoverableConsumerNewTransactionTest.java  | 3 ++-
 .../HazelcastSedaRecoverableConsumerRollbackTest.java        | 3 ++-
 .../hazelcast/HazelcastSedaRecoverableConsumerTest.java      | 8 ++++++++
 4 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index 7e3b24c..cebd69c 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.component.hazelcast.seda;
 
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.hazelcast.core.BaseQueue;
 import com.hazelcast.transaction.TransactionContext;
 
 import org.apache.camel.AsyncCallback;
@@ -71,7 +71,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer 
implements Runnable {
     }
 
     public void run() {
-        final BlockingQueue<?> queue = endpoint.getQueue();
+        BaseQueue<?> queue = 
endpoint.getHazelcastInstance().getQueue(endpoint.getConfiguration().getQueueName());
 
         while (queue != null && isRunAllowed()) {
             final Exchange exchange = this.getEndpoint().createExchange();
@@ -85,6 +85,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer 
implements Runnable {
                     if (transactionCtx != null) {
                         log.trace("Begin transaction: {}", 
transactionCtx.getTxnId());
                         transactionCtx.beginTransaction();
+                        queue = 
transactionCtx.getQueue(endpoint.getConfiguration().getQueueName());
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
index d9f5eec..d5db83b 100644
--- 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
+++ 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
@@ -31,10 +31,11 @@ public class 
HazelcastSedaRecoverableConsumerNewTransactionTest extends Hazelcas
                 .thenThrow(new HazelcastException("Could not obtain 
Connection!!!"))
                 .thenReturn(transactionContext);
         when(hazelcastInstance.getQueue("foo")).thenReturn(queue);
+        when(transactionContext.getQueue("foo")).thenReturn(tqueue);
     }
 
     protected void verifyHazelcastInstance(HazelcastInstance 
hazelcastInstance) {
-        verify(hazelcastInstance).getQueue("foo");
+        verify(hazelcastInstance, times(2)).getQueue("foo");
         verify(hazelcastInstance, atLeastOnce()).newTransactionContext();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
index 43d0bcc..c029556 100644
--- 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
+++ 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
@@ -37,10 +37,11 @@ public class HazelcastSedaRecoverableConsumerRollbackTest 
extends HazelcastSedaR
                 .when(transactionContext).rollbackTransaction();
         
when(hazelcastInstance.newTransactionContext()).thenReturn(transactionContext);
         when(hazelcastInstance.getQueue("foo")).thenReturn(queue);
+        when(transactionContext.getQueue("foo")).thenReturn(tqueue);
     }
 
     protected void verifyHazelcastInstance(HazelcastInstance 
hazelcastInstance) {
-        verify(hazelcastInstance).getQueue("foo");
+        verify(hazelcastInstance, times(2)).getQueue("foo");
         verify(hazelcastInstance, atLeastOnce()).newTransactionContext();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e54a51df/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
index ee29f7c..00c8fe3 100644
--- 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
+++ 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.hazelcast;
 import java.util.concurrent.TimeUnit;
 
 import com.hazelcast.core.IQueue;
+import com.hazelcast.core.TransactionalQueue;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -34,6 +35,9 @@ public abstract class HazelcastSedaRecoverableConsumerTest 
extends HazelcastCame
     @Mock
     protected IQueue<Object> queue;
 
+    @Mock
+    protected TransactionalQueue<Object> tqueue;
+
     @EndpointInject(uri = "mock:result")
     protected MockEndpoint mock;
 
@@ -43,6 +47,10 @@ public abstract class HazelcastSedaRecoverableConsumerTest 
extends HazelcastCame
                 .thenReturn("bar")
                 .thenReturn(null);
 
+        when(tqueue.poll(any(Long.class), any(TimeUnit.class)))
+                .thenReturn("bar")
+                .thenReturn(null);
+
         mock.expectedMessageCount(1);
 
         assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);

Reply via email to