Repository: camel Updated Branches: refs/heads/master 4848bc6a9 -> efd7d3750
CAMEL-11694 - Camel-Hazelcast: Add more operation to queue - Take operation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/04a8a844 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/04a8a844 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/04a8a844 Branch: refs/heads/master Commit: 04a8a8448c9052d49d8bb423b29ade8f9a8a1c31 Parents: 4848bc6 Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed Aug 23 10:26:44 2017 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Aug 23 10:26:44 2017 +0200 ---------------------------------------------------------------------- .../camel/component/hazelcast/HazelcastOperation.java | 2 ++ .../component/hazelcast/queue/HazelcastQueueProducer.java | 8 ++++++++ .../component/hazelcast/HazelcastQueueProducerTest.java | 9 +++++++++ 3 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/04a8a844/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java index 9a49ccf..bb33b9d 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java @@ -56,6 +56,8 @@ public enum HazelcastOperation { REMAINING_CAPACITY("remainingCapacity"), DRAIN_TO("drainTo"), REMOVE_IF("removeIf"), + TAKE("take"), + // topic PUBLISH("publish"), http://git-wip-us.apache.org/repos/asf/camel/blob/04a8a844/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java index 2cc71b9..b8350d3 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java @@ -96,6 +96,10 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { case DRAIN_TO: this.drainTo((Collection) drainToCollection, exchange); break; + + case TAKE: + this.take(exchange); + break; default: throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the QUEUE cache.", operation, HazelcastConstants.OPERATION)); @@ -156,4 +160,8 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { Predicate filter = exchange.getIn().getBody(Predicate.class); exchange.getOut().setBody(this.queue.removeIf(filter)); } + + private void take(Exchange exchange) throws InterruptedException { + exchange.getOut().setBody(this.queue.take()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/04a8a844/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java index a098bf8..b88aa18 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java @@ -147,6 +147,12 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport { } @Test + public void take() throws InterruptedException { + template.sendBody("direct:take", "foo"); + verify(queue).take(); + } + + @Test public void drainTo() throws InterruptedException { Map<String, Object> headers = new HashMap<String, Object>(); Collection l = new ArrayList<>(); @@ -188,6 +194,9 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport { from("direct:REMAINING_CAPACITY").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMAINING_CAPACITY)).to( String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX)); + from("direct:take").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.TAKE)).to( + String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX)); + from("direct:drainTo").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.DRAIN_TO)).to( String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));