This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push: new 5ef8f79 CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868) (#4912) 5ef8f79 is described below commit 5ef8f79475db97e517f2d9bc41622a8e80a53164 Author: Zineb BENDHIBA <bendhiba.zi...@gmail.com> AuthorDate: Sat Jan 23 10:20:23 2021 +0100 CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868) (#4912) --- .../hazelcast/HazelcastComponentHelper.java | 4 +- .../HazelcastAtomicnumberProducer.java | 10 ++-- .../hazelcast/list/HazelcastListProducer.java | 2 +- .../hazelcast/map/HazelcastMapProducer.java | 16 +++--- .../multimap/HazelcastMultimapProducer.java | 8 +-- .../hazelcast/queue/HazelcastQueueProducer.java | 14 ++--- .../HazelcastReplicatedmapConsumer.java | 2 +- .../ringbuffer/HazelcastRingbufferProducer.java | 14 ++--- .../hazelcast/HazelcastCamelTestSupport.java | 2 +- .../HazelcastReplicatedmapConsumerTest.java | 64 +++++++++------------- 10 files changed, 63 insertions(+), 73 deletions(-) diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java index e3139a9..4683c61 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java @@ -41,8 +41,8 @@ public final class HazelcastComponentHelper { } // propagate headers if OUT message created - if (ex.hasOut()) { - ex.getOut().setHeaders(headers); + if (ex.getMessage() != null) { + ex.getMessage().setHeaders(headers); } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java index bea9f87..5761f39 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java @@ -92,15 +92,15 @@ public class HazelcastAtomicnumberProducer extends HazelcastDefaultProducer { } private void get(Exchange exchange) { - exchange.getOut().setBody(this.atomicnumber.get()); + exchange.getMessage().setBody(this.atomicnumber.get()); } private void increment(Exchange exchange) { - exchange.getOut().setBody(this.atomicnumber.incrementAndGet()); + exchange.getMessage().setBody(this.atomicnumber.incrementAndGet()); } private void decrement(Exchange exchange) { - exchange.getOut().setBody(this.atomicnumber.decrementAndGet()); + exchange.getMessage().setBody(this.atomicnumber.decrementAndGet()); } private void compare(long expected, Exchange exchange) { @@ -108,12 +108,12 @@ public class HazelcastAtomicnumberProducer extends HazelcastDefaultProducer { if (ObjectHelper.isEmpty(expected)) { throw new IllegalArgumentException("Expected value must be specified"); } - exchange.getOut().setBody(this.atomicnumber.compareAndSet(expected, update)); + exchange.getMessage().setBody(this.atomicnumber.compareAndSet(expected, update)); } private void getAndAdd(Exchange exchange) { long delta = exchange.getIn().getBody(Long.class); - exchange.getOut().setBody(this.atomicnumber.getAndAdd(delta)); + exchange.getMessage().setBody(this.atomicnumber.getAndAdd(delta)); } private void set(Exchange exchange) { diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java index db57ab6..8044083 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java @@ -114,7 +114,7 @@ public class HazelcastListProducer extends HazelcastDefaultProducer { } private void get(Integer pos, Exchange exchange) { - exchange.getOut().setBody(this.list.get(pos)); + exchange.getMessage().setBody(this.list.get(pos)); } private void set(Integer pos, Exchange exchange) { diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java index 6ad84f9..a54246a 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java @@ -128,7 +128,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer { break; case CLEAR: - this.clear(exchange); + this.clear(); break; case EVICT: @@ -159,7 +159,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer { } else { result = this.cache.values(); } - exchange.getOut().setBody(result); + exchange.getMessage().setBody(result); } /** @@ -193,14 +193,14 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer { * find an object by the given id and give it back */ private void get(Object oid, Exchange exchange) { - exchange.getOut().setBody(this.cache.get(oid)); + exchange.getMessage().setBody(this.cache.get(oid)); } /** * GET All objects and give it back */ private void getAll(Object oid, Exchange exchange) { - exchange.getOut().setBody(this.cache.getAll((Set<Object>) oid)); + exchange.getMessage().setBody(this.cache.getAll((Set<Object>) oid)); } /** @@ -239,7 +239,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer { /** * Clear all the entries */ - private void clear(Exchange exchange) { + private void clear() { this.cache.clear(); } @@ -261,7 +261,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer { * Check for a specific key in the cache and return true if it exists or false otherwise */ private void containsKey(Object oid, Exchange exchange) { - exchange.getOut().setBody(this.cache.containsKey(oid)); + exchange.getMessage().setBody(this.cache.containsKey(oid)); } /** @@ -269,13 +269,13 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer { */ private void containsValue(Exchange exchange) { Object body = exchange.getIn().getBody(); - exchange.getOut().setBody(this.cache.containsValue(body)); + exchange.getMessage().setBody(this.cache.containsValue(body)); } /** * GET keys set of objects and give it back */ private void getKeys(Exchange exchange) { - exchange.getOut().setBody(this.cache.keySet()); + exchange.getMessage().setBody(this.cache.keySet()); } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java index f736591..386dc51 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java @@ -99,7 +99,7 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer { } private void get(Object oid, Exchange exchange) { - exchange.getOut().setBody(this.cache.get(oid)); + exchange.getMessage().setBody(this.cache.get(oid)); } private void delete(Object oid) { @@ -111,7 +111,7 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer { } private void valuecount(Object oid, Exchange exchange) { - exchange.getOut().setBody(this.cache.valueCount(oid)); + exchange.getMessage().setBody(this.cache.valueCount(oid)); } private void clear(Exchange exchange) { @@ -119,11 +119,11 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer { } private void containsKey(Object oid, Exchange exchange) { - exchange.getOut().setBody(this.cache.containsKey(oid)); + exchange.getMessage().setBody(this.cache.containsKey(oid)); } private void containsValue(Exchange exchange) { Object body = exchange.getIn().getBody(); - exchange.getOut().setBody(this.cache.containsValue(body)); + exchange.getMessage().setBody(this.cache.containsValue(body)); } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java index 2e86417..3113acf 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java @@ -127,11 +127,11 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { } private void poll(Exchange exchange) { - exchange.getOut().setBody(this.queue.poll()); + exchange.getMessage().setBody(this.queue.poll()); } private void peek(Exchange exchange) { - exchange.getOut().setBody(this.queue.peek()); + exchange.getMessage().setBody(this.queue.peek()); } private void offer(Exchange exchange) { @@ -149,12 +149,12 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { } private void remainingCapacity(Exchange exchange) { - exchange.getOut().setBody(this.queue.remainingCapacity()); + exchange.getMessage().setBody(this.queue.remainingCapacity()); } private void drainTo(Collection c, Exchange exchange) { - exchange.getOut().setBody(this.queue.drainTo(c)); - exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c); + exchange.getMessage().setBody(this.queue.drainTo(c)); + exchange.getMessage().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c); } private void removeAll(Exchange exchange) { @@ -164,11 +164,11 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { private void removeIf(Exchange exchange) { Predicate filter = exchange.getIn().getBody(Predicate.class); - exchange.getOut().setBody(this.queue.removeIf(filter)); + exchange.getMessage().setBody(this.queue.removeIf(filter)); } private void take(Exchange exchange) throws InterruptedException { - exchange.getOut().setBody(this.queue.take()); + exchange.getMessage().setBody(this.queue.take()); } private void retainAll(Exchange exchange) { diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java index 796d2a5..2a215fd 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java @@ -45,7 +45,7 @@ public class HazelcastReplicatedmapConsumer extends HazelcastDefaultConsumer { protected void doStart() throws Exception { super.doStart(); - listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true); + listener = cache.addEntryListener(new CamelEntryListener(this, cacheName)); } /** diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java index e649f31..528d873 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java @@ -73,23 +73,23 @@ public class HazelcastRingbufferProducer extends HazelcastDefaultProducer { } private void readOnceHead(Exchange exchange) throws InterruptedException { - exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.headSequence())); + exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.headSequence())); } private void readOnceTail(Exchange exchange) throws InterruptedException { - exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence())); + exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence())); } - private void getCapacity(Exchange exchange) throws InterruptedException { - exchange.getOut().setBody(this.ringbuffer.capacity()); + private void getCapacity(Exchange exchange) { + exchange.getMessage().setBody(this.ringbuffer.capacity()); } - private void getRemainingCapacity(Exchange exchange) throws InterruptedException { - exchange.getOut().setBody(this.ringbuffer.remainingCapacity()); + private void getRemainingCapacity(Exchange exchange) { + exchange.getMessage().setBody(this.ringbuffer.remainingCapacity()); } private void add(Exchange exchange) { final Object body = exchange.getIn().getBody(); - exchange.getOut().setBody(ringbuffer.add(body)); + exchange.getMessage().setBody(ringbuffer.add(body)); } } diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java index aed1630..c3b1df7 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java @@ -32,7 +32,7 @@ public class HazelcastCamelTestSupport extends CamelTestSupport { @Override protected CamelContext createCamelContext() throws Exception { - MockitoAnnotations.initMocks(this); + MockitoAnnotations.openMocks(this); CamelContext context = super.createCamelContext(); HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance); trainHazelcastInstance(hazelcastInstance); diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java index 74c854d..6a699ab 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java @@ -17,47 +17,45 @@ package org.apache.camel.component.hazelcast; import java.util.Map; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import com.hazelcast.core.EntryEvent; -import com.hazelcast.core.EntryEventType; -import com.hazelcast.core.EntryListener; +import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.replicatedmap.ReplicatedMap; +import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSupport { +public class HazelcastReplicatedmapConsumerTest extends CamelTestSupport { - @Mock + private HazelcastInstance hazelcastInstance; private ReplicatedMap<Object, Object> map; - @Captor - private ArgumentCaptor<EntryListener<Object, Object>> argument; + @BeforeEach + public void beforeEach() { + hazelcastInstance = Hazelcast.newHazelcastInstance(); + map = hazelcastInstance.getReplicatedMap("rm"); + } - @Override - protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) { - when(hazelcastInstance.getReplicatedMap("rm")).thenReturn(map); - when(map.addEntryListener(any(), eq(true))).thenReturn(UUID.randomUUID()); + @AfterEach + public void afterEach() { + if (hazelcastInstance != null) { + hazelcastInstance.shutdown(); + } } @Override - @SuppressWarnings("unchecked") - protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) { - verify(hazelcastInstance).getReplicatedMap("rm"); - verify(map).addEntryListener(any(EntryListener.class), eq(true)); + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance); + return context; } @Test @@ -65,10 +63,7 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor MockEndpoint out = getMockEndpoint("mock:added"); out.expectedMessageCount(1); - verify(map).addEntryListener(argument.capture(), eq(true)); - EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo"); - argument.getValue().entryAdded(event); - + map.put("4711", "my-foo"); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS); this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED); @@ -81,11 +76,8 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor public void testEvict() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:evicted"); out.expectedMessageCount(1); - - verify(map).addEntryListener(argument.capture(), eq(true)); - EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo"); - argument.getValue().entryEvicted(event); - + map.put("4711", "my-foo", 100, TimeUnit.MILLISECONDS); + Thread.sleep(150); assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS); } @@ -93,11 +85,8 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor public void testRemove() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:removed"); out.expectedMessageCount(1); - - verify(map).addEntryListener(argument.capture(), eq(true)); - EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo"); - argument.getValue().entryRemoved(event); - + map.put("4711", "my-foo"); + map.remove("4711"); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS); this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED); } @@ -124,4 +113,5 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor assertEquals("4711", headers.get(HazelcastConstants.OBJECT_ID)); assertNotNull(headers.get(HazelcastConstants.LISTENER_TIME)); } + }