Repository: camel Updated Branches: refs/heads/master 381d53657 -> bc07f3168
Resolve CAMEL-8681 Camel-Infinispan: use Lifespan and Max Idle Time in the implementation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/105525e5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/105525e5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/105525e5 Branch: refs/heads/master Commit: 105525e5a9c42c0706acdfff667cd50671b4fb0b Parents: 381d536 Author: ancosen <anco...@gmail.com> Authored: Tue Apr 21 22:13:45 2015 +0200 Committer: ancosen <anco...@gmail.com> Committed: Tue Apr 21 22:13:45 2015 +0200 ---------------------------------------------------------------------- .../infinispan/InfinispanConstants.java | 6 + .../infinispan/InfinispanOperation.java | 59 +++++- .../infinispan/InfinispanProducerTest.java | 181 +++++++++++++++++-- 3 files changed, 226 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/105525e5/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java index 38221fa..6f69e7f 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java @@ -22,14 +22,20 @@ public interface InfinispanConstants { String CACHE_NAME = "CamelInfinispanCacheName"; String KEY = "CamelInfinispanKey"; String VALUE = "CamelInfinispanValue"; + String MAP = "CamelInfinispanMap"; String OPERATION = "CamelInfinispanOperation"; String PUT = "CamelInfinispanOperationPut"; String PUT_IF_ABSENT = "CamelInfinispanOperationPutIfAbsent"; String GET = "CamelInfinispanOperationGet"; String CONTAINS_KEY = "CamelInfinispanOperationContainsKey"; String CONTAINS_VALUE = "CamelInfinispanOperationContainsValue"; + String PUT_ALL = "CamelInfinispanOperationPutAll"; String REMOVE = "CamelInfinispanOperationRemove"; String REPLACE = "CamelInfinispanOperationReplace"; String CLEAR = "CamelInfinispanOperationClear"; String RESULT = "CamelInfinispanOperationResult"; + String LIFESPAN_TIME = "CamelInfinispanLifespanTime"; + String LIFESPAN_TIME_UNIT = "CamelInfinispanTimeUnit"; + String MAX_IDLE_TIME = "CamelInfinispanMaxIdleTime"; + String MAX_IDLE_TIME_UNIT = "CamelInfinispanMaxIdleTimeUnit"; } http://git-wip-us.apache.org/repos/asf/camel/blob/105525e5/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java index 023062d..3fb0a88 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java @@ -16,7 +16,11 @@ */ package org.apache.camel.component.infinispan; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.camel.Exchange; +import org.apache.camel.util.ObjectHelper; import org.infinispan.commons.api.BasicCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,13 +57,60 @@ public class InfinispanOperation { PUT { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { - Object result = cache.put(getKey(exchange), getValue(exchange)); + Object result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = (long) exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME); + String timeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = (long) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME); + String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT); + result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + result = cache.put(getKey(exchange), getValue(exchange)); + } setResult(result, exchange); } + }, PUTALL { + @Override + void execute(BasicCache<Object, Object> cache, Exchange exchange) { + Object result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = (long) exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME); + String timeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = (long) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME); + String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT); + cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + cache.putAll(getMap(exchange)); + } + } }, PUTIFABSENT { @Override void execute(BasicCache<Object, Object> cache, Exchange exchange) { - Object result = cache.putIfAbsent(getKey(exchange), getValue(exchange)); + Object result; + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + long lifespan = (long) exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME); + String timeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT); + if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) + && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + long maxIdle = (long) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME); + String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT); + result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } + } else { + result = cache.putIfAbsent(getKey(exchange), getValue(exchange)); + } setResult(result, exchange); } }, GET { @@ -110,6 +161,10 @@ public class InfinispanOperation { Object getValue(Exchange exchange) { return exchange.getIn().getHeader(InfinispanConstants.VALUE); } + + Map<? extends Object, ? extends Object> getMap(Exchange exchange) { + return (Map<? extends Object, ? extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP); + } abstract void execute(BasicCache<Object, Object> cache, Exchange exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/105525e5/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java index 4061864..26d6d9e 100644 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java @@ -16,18 +16,22 @@ */ package org.apache.camel.component.infinispan; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.junit.Test; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.core.Is.is; - public class InfinispanProducerTest extends InfinispanTestSupport { private static final String COMMAND_VALUE = "commandValue"; private static final String COMMAND_KEY = "commandKey1"; + private static final long LIFESPAN_TIME = 5; + private static final long LIFESPAN_FOR_MAX_IDLE = -1; + private static final long MAX_IDLE_TIME = 3; @Test public void keyAndValueArePublishedWithDefaultOperation() throws Exception { @@ -40,7 +44,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { }); Object value = currentCache().get(KEY_ONE); - assertThat(value.toString(), is(VALUE_ONE)); + assertEquals(value.toString(), VALUE_ONE); } @Test @@ -55,7 +59,110 @@ public class InfinispanProducerTest extends InfinispanTestSupport { }); Object value = currentCache().get(KEY_ONE); - assertThat(value.toString(), is(VALUE_ONE)); + assertEquals(value.toString(), VALUE_ONE); + } + + @Test + public void publishMapNormal() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put(KEY_ONE, VALUE_ONE); + map.put(KEY_TWO, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.MAP, map); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL); + } + }); + + assertEquals(currentCache().size(), 2); + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + value = currentCache().get(KEY_TWO); + assertEquals(value.toString(), VALUE_TWO); + } + + @Test + public void publishMapWithLifespan() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put(KEY_ONE, VALUE_ONE); + map.put(KEY_TWO, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.MAP, map); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + assertEquals(currentCache().size(), 2); + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + value = currentCache().get(KEY_TWO); + assertEquals(value.toString(), VALUE_TWO); + + Thread.sleep(LIFESPAN_TIME * 1000); + + Exchange exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO); + } + }); + resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void publishMapWithLifespanAndMaxIdleTime() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put(KEY_ONE, VALUE_ONE); + map.put(KEY_TWO, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.MAP, map); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + assertEquals(currentCache().size(), 2); + Object value = currentCache().get(KEY_ONE); + + Thread.sleep(10000); + + Exchange exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO); + } + }); + resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); } @Test @@ -72,7 +179,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { }); Object value = currentCache().get(KEY_ONE); - assertThat(value.toString(), is(VALUE_ONE)); + assertEquals(value.toString(), VALUE_ONE); assertEquals(currentCache().size(), 1); } @@ -90,7 +197,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { }); Object value = currentCache().get(KEY_TWO); - assertThat(value.toString(), is(VALUE_TWO)); + assertEquals(value.toString(), VALUE_TWO); assertEquals(currentCache().size(), 2); } @@ -106,7 +213,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), is(false)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), false); } @Test @@ -121,7 +228,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), is(true)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); } @Test @@ -136,7 +243,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), is(false)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), false); } @Test @@ -151,7 +258,45 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), is(true)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + } + + @Test + public void publishKeyAndValueWithLifespan() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT); + } + }); + + Object value = currentCache().get(KEY_ONE); + assertEquals(value.toString(), VALUE_ONE); + + Exchange exchange; + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(VALUE_ONE, resultGet); + + Thread.sleep(LIFESPAN_TIME * 1000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); } @Test @@ -167,7 +312,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is("existing value")); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), "existing value"); } @Test @@ -182,7 +327,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is(VALUE_ONE)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE); } @Test @@ -198,7 +343,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is(VALUE_ONE)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE); assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); } @@ -214,16 +359,16 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is(VALUE_ONE)); + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE); Object value = currentCache().get(KEY_ONE); - assertThat(value, is(nullValue())); + assertEquals(value, null); } @Test public void clearsAllValues() throws Exception { currentCache().put(KEY_ONE, VALUE_ONE); - assertThat(currentCache().isEmpty(), is(false)); + assertEquals(currentCache().isEmpty(), false); template.send("direct:start", new Processor() { @Override @@ -232,7 +377,7 @@ public class InfinispanProducerTest extends InfinispanTestSupport { } }); - assertThat(currentCache().isEmpty(), is(true)); + assertEquals(currentCache().isEmpty(), true); } @Test