Repository: camel Updated Branches: refs/heads/master 3525784aa -> f96385a50
CAMEL-11685 - Camel-Hazelcast: Add removeAll and removeIf to queue component Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72694068 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72694068 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72694068 Branch: refs/heads/master Commit: 72694068635ea2aa80c13ace79aa9fdadc720036 Parents: 3525784 Author: Andrea Cosentino <anco...@gmail.com> Authored: Mon Aug 21 14:36:54 2017 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Aug 21 14:36:54 2017 +0200 ---------------------------------------------------------------------- .../component/hazelcast/HazelcastOperation.java | 1 + .../hazelcast/queue/HazelcastQueueProducer.java | 20 ++++++++++++++++- .../hazelcast/HazelcastQueueProducerTest.java | 23 ++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/72694068/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java index 424a81c..9a49ccf 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java @@ -55,6 +55,7 @@ public enum HazelcastOperation { POLL("poll"), REMAINING_CAPACITY("remainingCapacity"), DRAIN_TO("drainTo"), + REMOVE_IF("removeIf"), // topic PUBLISH("publish"), http://git-wip-us.apache.org/repos/asf/camel/blob/72694068/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java index cc9a87a..2cc71b9 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java @@ -18,7 +18,7 @@ package org.apache.camel.component.hazelcast.queue; import java.util.Collection; import java.util.Map; -import java.util.logging.Logger; +import java.util.function.Predicate; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IQueue; @@ -85,6 +85,14 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { this.remainingCapacity(exchange); break; + case REMOVE_ALL: + this.removeAll(exchange); + break; + + case REMOVE_IF: + this.removeIf(exchange); + break; + case DRAIN_TO: this.drainTo((Collection) drainToCollection, exchange); break; @@ -138,4 +146,14 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer { exchange.getOut().setBody(this.queue.drainTo(c)); exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c); } + + private void removeAll(Exchange exchange) { + Collection body = exchange.getIn().getBody(Collection.class); + this.queue.removeAll(body); + } + + private void removeIf(Exchange exchange) { + Predicate filter = exchange.getIn().getBody(Predicate.class); + exchange.getOut().setBody(this.queue.removeIf(filter)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/72694068/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java index b7c79a8..a098bf8 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java @@ -19,7 +19,9 @@ package org.apache.camel.component.hazelcast; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.function.Predicate; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IQueue; @@ -130,6 +132,21 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport { } @Test + public void removeAll() throws InterruptedException { + Collection c = new HashSet<>(); + c.add("foo2"); + template.sendBody("direct:removeAll", c); + verify(queue).removeAll(c); + } + + @Test + public void removeIf() throws InterruptedException { + Predicate<String> i = (s)-> s.length() > 5; + template.sendBody("direct:removeIf", i); + verify(queue).removeIf(i); + } + + @Test public void drainTo() throws InterruptedException { Map<String, Object> headers = new HashMap<String, Object>(); Collection l = new ArrayList<>(); @@ -161,6 +178,12 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport { from("direct:removeValue").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_VALUE)).to( String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX)); + + from("direct:removeAll").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_ALL)).to( + String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX)); + + from("direct:removeIf").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_IF)).to( + String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX)); from("direct:REMAINING_CAPACITY").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMAINING_CAPACITY)).to( String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));