Repository: camel
Updated Branches:
  refs/heads/master 3525784aa -> f96385a50


CAMEL-11685 - Camel-Hazelcast: Add removeAll and removeIf to queue component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72694068
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72694068
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72694068

Branch: refs/heads/master
Commit: 72694068635ea2aa80c13ace79aa9fdadc720036
Parents: 3525784
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Mon Aug 21 14:36:54 2017 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Mon Aug 21 14:36:54 2017 +0200

----------------------------------------------------------------------
 .../component/hazelcast/HazelcastOperation.java |  1 +
 .../hazelcast/queue/HazelcastQueueProducer.java | 20 ++++++++++++++++-
 .../hazelcast/HazelcastQueueProducerTest.java   | 23 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/72694068/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java
index 424a81c..9a49ccf 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastOperation.java
@@ -55,6 +55,7 @@ public enum HazelcastOperation {
     POLL("poll"),
     REMAINING_CAPACITY("remainingCapacity"),
     DRAIN_TO("drainTo"),
+    REMOVE_IF("removeIf"),
 
     // topic
     PUBLISH("publish"),

http://git-wip-us.apache.org/repos/asf/camel/blob/72694068/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
index cc9a87a..2cc71b9 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.hazelcast.queue;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.logging.Logger;
+import java.util.function.Predicate;
 
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
@@ -85,6 +85,14 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
             this.remainingCapacity(exchange);
             break;
             
+        case REMOVE_ALL:
+            this.removeAll(exchange);
+            break;
+            
+        case REMOVE_IF:
+            this.removeIf(exchange);
+            break;
+            
         case DRAIN_TO:
             this.drainTo((Collection) drainToCollection, exchange);
             break;
@@ -138,4 +146,14 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
         exchange.getOut().setBody(this.queue.drainTo(c));
         exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
     }
+    
+    private void removeAll(Exchange exchange) {
+        Collection body = exchange.getIn().getBody(Collection.class);
+        this.queue.removeAll(body);
+    }
+    
+    private void removeIf(Exchange exchange) {
+        Predicate filter = exchange.getIn().getBody(Predicate.class);
+        exchange.getOut().setBody(this.queue.removeIf(filter));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/72694068/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
index b7c79a8..a098bf8 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueProducerTest.java
@@ -19,7 +19,9 @@ package org.apache.camel.component.hazelcast;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.function.Predicate;
 
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
@@ -130,6 +132,21 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport {
     }
     
     @Test
+    public void removeAll() throws InterruptedException {
+        Collection c = new HashSet<>();
+        c.add("foo2");
+        template.sendBody("direct:removeAll", c);
+        verify(queue).removeAll(c);
+    }
+    
+    @Test
+    public void removeIf() throws InterruptedException {
+        Predicate<String> i  = (s)-> s.length() > 5;
+        template.sendBody("direct:removeIf", i);
+        verify(queue).removeIf(i);
+    }
+    
+    @Test
     public void drainTo() throws InterruptedException {
         Map<String, Object> headers = new HashMap<String, Object>();
         Collection l = new ArrayList<>();
@@ -161,6 +178,12 @@ public class HazelcastQueueProducerTest extends HazelcastCamelTestSupport {
 
                 from("direct:removeValue").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_VALUE)).to(
                         String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));
+                
+                from("direct:removeAll").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_ALL)).to(
+                        String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));
+                
+                from("direct:removeIf").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_IF)).to(
+                        String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));
 
                 from("direct:REMAINING_CAPACITY").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMAINING_CAPACITY)).to(
                         String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

Reply via email to