This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
     new 4600fac  CAMEL-16035 HazelcastQueueConsumer : do not send NULL when nothing is polled from queue (#4901)
4600fac is described below

commit 4600fac2da1f91082d0501cf5af9ac6f2d3fcd68
Author: Zineb BENDHIBA <bendhiba.zi...@gmail.com>
AuthorDate: Wed Jan 20 16:23:54 2021 +0100

    CAMEL-16035 HazelcastQueueConsumer : do not send NULL when nothing is polled from queue (#4901)
---
 .../component/hazelcast/queue/HazelcastQueueConsumer.java | 15 +++++++++------
 .../hazelcast/HazelcastCamelSpringTestSupport.java        |  2 +-
 .../hazelcast/HazelcastQueueConsumerPollTest.java         | 11 +++++++++++
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 4fa6948..7794bda 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -84,12 +84,15 @@ public class HazelcastQueueConsumer extends HazelcastDefaultConsumer {
                 while (isRunAllowed()) {
                     try {
                         final Object body = queue.poll(config.getPollingTimeout(), TimeUnit.MILLISECONDS);
-                        Exchange exchange = getEndpoint().createExchange();
-                        exchange.getIn().setBody(body);
-                        try {
-                            processor.process(exchange);
-                        } catch (Exception e) {
-                            getExceptionHandler().handleException("Error during processing", exchange, e);
+                        // CAMEL-16035 - If the polling timeout is exceeded with nothing to poll from the queue, the queue.poll() method return NULL
+                        if (body != null) {
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(body);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error during processing", exchange, e);
+                            }
                         }
                     } catch (InterruptedException e) {
                         // ignore
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java
index 00d4ef3..faa9a90 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java
@@ -32,7 +32,7 @@ public abstract class HazelcastCamelSpringTestSupport extends CamelSpringTestSup

     @Override
     protected CamelContext createCamelContext() throws Exception {
-        MockitoAnnotations.initMocks(this);
+        MockitoAnnotations.openMocks(this);
         CamelContext context = super.createCamelContext();
         HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance);
         trainHazelcastInstance(hazelcastInstance);
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
index 558d59b..d7f3040 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
@@ -67,6 +67,17 @@ public class HazelcastQueueConsumerPollTest extends HazelcastCamelTestSupport {
         this.checkHeadersAbsence(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED);
     }

+    @Test
+    public void pollTimeout() throws InterruptedException {
+        // if nothing to poll after timeout the queue.poll returns NULL, the consumer shouldn't send this NULL message
+        when(queue.poll(10000, TimeUnit.MILLISECONDS)).thenReturn(null);
+
+        MockEndpoint out = getMockEndpoint("mock:result");
+        out.expectedMessageCount(0);
+
+        assertMockEndpointsSatisfied(2000, TimeUnit.MILLISECONDS);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {