Repository: camel Updated Branches: refs/heads/master fe823e345 -> 1451499b8
CAMEL-8709 Camel-Infinispan: Add Async operations to available set Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1451499b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1451499b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1451499b Branch: refs/heads/master Commit: 1451499b83a69842658f0bfd95a8cd1fa2632e80 Parents: fe823e3 Author: Andrea Cosentino <anco...@gmail.com> Authored: Sun Apr 26 22:07:12 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Apr 27 21:55:20 2015 +0200 ---------------------------------------------------------------------- .../infinispan/InfinispanConstants.java | 5 + .../infinispan/InfinispanOperation.java | 87 +++++ .../infinispan/InfinispanProducerTest.java | 371 +++++++++++++++++++ 3 files changed, 463 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java index 2c0ba42..481446c 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java @@ -25,13 +25,18 @@ public interface InfinispanConstants { String MAP = "CamelInfinispanMap"; String OPERATION = "CamelInfinispanOperation"; String PUT = "CamelInfinispanOperationPut"; + String PUT_ASYNC = "CamelInfinispanOperationPutAsync"; String PUT_IF_ABSENT = "CamelInfinispanOperationPutIfAbsent"; + String PUT_IF_ABSENT_ASYNC = "CamelInfinispanOperationPutIfAbsentAsync"; String GET = "CamelInfinispanOperationGet"; String CONTAINS_KEY = "CamelInfinispanOperationContainsKey"; String CONTAINS_VALUE = "CamelInfinispanOperationContainsValue"; String PUT_ALL = "CamelInfinispanOperationPutAll"; + String PUT_ALL_ASYNC = "CamelInfinispanOperationPutAllAsync"; String REMOVE = "CamelInfinispanOperationRemove"; + String REMOVE_ASYNC = "CamelInfinispanOperationRemoveAsync"; String REPLACE = "CamelInfinispanOperationReplace"; + String REPLACE_ASYNC = "CamelInfinispanOperationReplaceAsync"; String CLEAR = "CamelInfinispanOperationClear"; String SIZE = "CamelInfinispanOperationSize"; String RESULT = "CamelInfinispanOperationResult"; http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java index 3450666..38b3e74 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.util.ObjectHelper; import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.util.concurrent.NotifyingFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,26 @@ public class InfinispanOperation { } setResult(result, exchange); } + }, PUTASYNC { + @Override + void execute(BasicCache<Object, Object> cache, Exchange exchange) { + NotifyingFuture result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); + String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); + String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); + result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + result = cache.putAsync(getKey(exchange), getValue(exchange)); + } + setResult(result, exchange); + } }, PUTALL { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { @@ -92,6 +113,26 @@ public class InfinispanOperation { cache.putAll(getMap(exchange)); } } + }, PUTALLASYNC { + @Override + void execute(BasicCache<Object, Object> cache, Exchange exchange) { + NotifyingFuture result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); + String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); + String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); + result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + result = cache.putAllAsync(getMap(exchange)); + } + setResult(result, exchange); + } }, PUTIFABSENT { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { @@ -112,6 +153,26 @@ public class InfinispanOperation { } setResult(result, exchange); } + }, PUTIFABSENTASYNC { + @Override + void execute(BasicCache<Object, Object> cache, Exchange exchange) { + NotifyingFuture result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); + String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); + String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); + result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange)); + } + setResult(result, exchange); + } }, GET { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { @@ -136,6 +197,12 @@ public class InfinispanOperation { Object result = cache.remove(getKey(exchange)); setResult(result, exchange); } + }, REMOVEASYNC { + @Override + void execute(BasicCache<Object, Object> cache, Exchange exchange) { + NotifyingFuture result = cache.removeAsync(getKey(exchange)); + setResult(result, exchange); + } }, REPLACE { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { @@ -156,6 +223,26 @@ public class InfinispanOperation { } setResult(result, exchange); } + }, REPLACEASYNC { + @Override + void execute(BasicCache<Object, Object> cache, Exchange exchange) { + NotifyingFuture result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); + String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); + String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); + result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + result = cache.replaceAsync(getKey(exchange), getValue(exchange)); + } + setResult(result, exchange); + } }, SIZE { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java index 84df8e4..c806847 100644 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.infinispan.commons.util.concurrent.NotifyingFuture; import org.junit.Test; public class InfinispanProducerTest extends InfinispanTestSupport { @@ -79,6 +80,86 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } @Test + public void publishKeyAndValueAsync() throws Exception { + Exchange exchange = template.send("direct:putasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + } + }); + + Thread.sleep(10000); + NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(Boolean.TRUE, resultPutAsync.isDone()); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + } + + @Test + public void publishKeyAndValueAsyncWithLifespan() throws Exception { + Exchange exchange = template.send("direct:putasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + Thread.sleep(1000); + NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(Boolean.TRUE, resultPutAsync.isDone()); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + + Thread.sleep(6000); + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void publishKeyAndValueAsyncWithLifespanAndMaxIdle() throws Exception { + Exchange exchange = template.send("direct:putasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + Thread.sleep(1000); + NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(Boolean.TRUE, resultPutAsync.isDone()); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + + Thread.sleep(10000); + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test public void publishMapNormal() throws Exception { template.send("direct:start", new Processor() { @Override @@ -181,6 +262,110 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } @Test + public void publishMapNormalAsync() throws Exception { + template.send("direct:putallasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put(KEY_ONE, VALUE_ONE); + map.put(KEY_TWO, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.MAP, map); + } + }); + + Thread.sleep(100); + assertEquals(currentCache().size(), 2); + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + value = currentCache().get(KEY_TWO); + assertEquals(value.toString(), VALUE_TWO); + } + + @Test + public void publishMapWithLifespanAsync() throws Exception { + template.send("direct:putallasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put(KEY_ONE, VALUE_ONE); + map.put(KEY_TWO, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.MAP, map); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + Thread.sleep(100); + assertEquals(currentCache().size(), 2); + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + value = currentCache().get(KEY_TWO); + assertEquals(value.toString(), VALUE_TWO); + + Thread.sleep(LIFESPAN_TIME * 1000); + + Exchange exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO); + } + }); + resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void publishMapWithLifespanAndMaxIdleTimeAsync() throws Exception { + template.send("direct:putallasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put(KEY_ONE, VALUE_ONE); + map.put(KEY_TWO, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.MAP, map); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + Thread.sleep(100); + assertEquals(currentCache().size(), 2); + + Thread.sleep(10000); + + Exchange exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO); + } + }); + resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test public void putIfAbsentAlreadyExists() throws Exception { currentCache().put(KEY_ONE, VALUE_ONE); @@ -217,6 +402,86 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } @Test + public void putIfAbsentKeyAndValueAsync() throws Exception { + Exchange exchange = template.send("direct:putifabsentasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + } + }); + + Thread.sleep(10000); + NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(Boolean.TRUE, resultPutAsync.isDone()); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + } + + @Test + public void putIfAbsentKeyAndValueAsyncWithLifespan() throws Exception { + Exchange exchange = template.send("direct:putifabsentasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + Thread.sleep(1000); + NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(Boolean.TRUE, resultPutAsync.isDone()); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + + Thread.sleep(6000); + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void putIfAbsentKeyAndValueAsyncWithLifespanAndMaxIdle() throws Exception { + Exchange exchange = template.send("direct:putifabsentasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + Thread.sleep(1000); + NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(Boolean.TRUE, resultPutAsync.isDone()); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + + Thread.sleep(10000); + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test public void notContainsKeyTest() throws Exception { currentCache().put(KEY_ONE, VALUE_ONE); @@ -423,6 +688,82 @@ public class InfinispanProducerTest extends InfinispanTestSupport { String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); assertEquals(null, resultGet); } + + @Test + public void replaceAValueByKeyAsync() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replaceasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + } + + @Test + public void replaceAValueByKeyWithLifespanAsync() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replaceasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + + Thread.sleep(LIFESPAN_TIME * 1000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void replaceAValueByKeyWithLifespanAndMaxIdleTimeAsync() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replaceasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + + Thread.sleep(10000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } @Test public void deletesExistingValueByKey() throws Exception { @@ -441,6 +782,26 @@ public class InfinispanProducerTest extends InfinispanTestSupport { Object value = currentCache().get(KEY_ONE); assertEquals(value, null); } + + @Test + public void deletesExistingValueByKeyAsync() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:removeasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REMOVE_ASYNC); + } + }); + + Thread.sleep(100); + NotifyingFuture fut = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class); + assertEquals(fut.isDone(), Boolean.TRUE); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value, null); + } @Test public void clearsAllValues() throws Exception { @@ -527,6 +888,16 @@ public class InfinispanProducerTest extends InfinispanTestSupport { .to("infinispan://localhost?cacheContainer=#cacheContainer&command=CONTAINSVALUE"); from("direct:size") .to("infinispan://localhost?cacheContainer=#cacheContainer&command=SIZE"); + from("direct:putasync") + .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTASYNC"); + from("direct:putallasync") + .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTALLASYNC"); + from("direct:putifabsentasync") + .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTIFABSENTASYNC"); + from("direct:replaceasync") + .to("infinispan://localhost?cacheContainer=#cacheContainer&command=REPLACEASYNC"); + from("direct:removeasync") + .to("infinispan://localhost?cacheContainer=#cacheContainer&command=REMOVEASYNC"); } }; }