Repository: camel Updated Branches: refs/heads/master 1aea80a26 -> b62f330c6
CAMEL-8992 Camel-Hazelcast: add drainTo operation to hazelcast queue producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f8b9b1d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f8b9b1d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f8b9b1d Branch: refs/heads/master Commit: 3f8b9b1d461c17f1b72803e7cfbfdb992b658adb Parents: 1aea80a Author: Andrea Cosentino <anco...@gmail.com> Authored: Tue Jun 23 17:14:32 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Tue Jun 23 17:14:32 2015 +0200 ---------------------------------------------------------------------- .../component/hazelcast/HazelcastConstants.java | 4 ++- .../hazelcast/queue/HazelcastQueueProducer.java | 26 ++++++++++++++++++++ .../hazelcast/HazelcastQueueProducerTest.java | 20 +++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3f8b9b1d/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java index a146d46..6c42706 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java @@ -41,6 +41,7 @@ public final class HazelcastConstants { public static final String TTL_UNIT = "CamelHazelcastObjectTtlUnit"; public static final String QUERY = "CamelHazelcastQuery"; public static final String EXPECTED_VALUE = "CamelHazelcastExpectedValue"; + public static final String DRAIN_TO_COLLECTION = "CamelHazelcastDrainToCollection"; /* * outgoing header properties @@ -89,9 +90,10 @@ public final class HazelcastConstants { public static final int PEEK_OPERATION = 33; public static final int POLL_OPERATION = 34; public static final int REMAINING_CAPACITY_OPERATION = 35; + public static final int DRAIN_TO_OPERATION = 36; // topic - public static final int PUBLISH_OPERATION = 36; + public static final int PUBLISH_OPERATION = 37; /* * header values http://git-wip-us.apache.org/repos/asf/camel/blob/3f8b9b1d/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java index c814e3c..671eb60 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.hazelcast.queue; +import java.util.Collection; +import java.util.Map; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IQueue; @@ -24,6 +27,7 @@ import org.apache.camel.component.hazelcast.HazelcastComponentHelper; import org.apache.camel.component.hazelcast.HazelcastConstants; import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint; import org.apache.camel.component.hazelcast.HazelcastDefaultProducer; +import org.apache.camel.util.ObjectHelper; /** * @@ -38,6 +42,15 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { } public void process(Exchange exchange) throws Exception { + + Map<String, Object> headers = exchange.getIn().getHeaders(); + + // get header parameters + Object draintToCollection = null; + + if (headers.containsKey(HazelcastConstants.DRAIN_TO_COLLECTION)) { + draintToCollection = headers.get(HazelcastConstants.DRAIN_TO_COLLECTION); + } final int operation = lookupOperationNumber(exchange); @@ -73,6 +86,14 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { this.remainingCapacity(exchange); break; + case HazelcastConstants.DRAIN_TO_OPERATION: + if (ObjectHelper.isNotEmpty(draintToCollection)) { + this.drainTo((Collection) draintToCollection, exchange); + } else { + throw new IllegalArgumentException("Drain to collection header must be specified"); + } + break; + default: throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the QUEUE cache.", operation, HazelcastConstants.OPERATION)); } @@ -117,4 +138,9 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { private void remainingCapacity(Exchange exchange) { exchange.getOut().setBody(this.queue.remainingCapacity()); } + + private void drainTo(Collection c, Exchange exchange) { + exchange.getOut().setBody(this.queue.drainTo(c)); + exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/3f8b9b1d/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java index 3807e98..24a4a78 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java @@ -16,8 +16,14 @@ */ package org.apache.camel.component.hazelcast; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IQueue; + import org.apache.camel.CamelExecutionException; import org.apache.camel.builder.RouteBuilder; import org.junit.After; @@ -122,6 +128,17 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport { verify(queue).remainingCapacity(); assertEquals(10, answer); } + + @Test + public void drainTo() throws InterruptedException { + Map<String, Object> headers = new HashMap<String, Object>(); + Collection l = new ArrayList<>(); + headers.put(HazelcastConstants.DRAIN_TO_COLLECTION, l); + when(queue.drainTo(l)).thenReturn(10); + int answer = template.requestBodyAndHeaders("direct:drainTo", "test", headers, Integer.class); + verify(queue).drainTo(l); + assertEquals(10, answer); + } @Override protected RouteBuilder createRouteBuilder() throws Exception { @@ -148,6 +165,9 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport { from("direct:remainingCapacity").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.REMAINING_CAPACITY_OPERATION)).to( String.format("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX)); + from("direct:drainTo").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.DRAIN_TO_OPERATION)).to( + String.format("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX)); + from("direct:putWithOperationNumber").toF(String.format("hazelcast:%sbar?operation=%s", HazelcastConstants.QUEUE_PREFIX, HazelcastConstants.PUT_OPERATION)); from("direct:putWithOperationName").toF(String.format("hazelcast:%sbar?operation=put", HazelcastConstants.QUEUE_PREFIX));