Repository: camel Updated Branches: refs/heads/master 4121bd679 -> 8c415c8f3
CAMEL-9684 - camel-infinispan : add support for setting query builder in InfinispanConfiguration Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c415c8f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c415c8f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c415c8f Branch: refs/heads/master Commit: 8c415c8f36eb9c6dc72d3d00d37c7465b5b27a18 Parents: 4121bd6 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Mar 9 12:35:36 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 9 14:11:34 2016 +0100 ---------------------------------------------------------------------- camel-core/src/main/docs/mock.adoc | 8 +- camel-core/src/main/docs/properties.adoc | 3 + components/camel-infinispan/pom.xml | 6 + .../infinispan/InfinispanConfiguration.java | 24 +++- .../infinispan/InfinispanOperation.java | 126 +++++++++--------- .../component/infinispan/InfinispanUtil.java | 15 +++ .../InfinispanConsumerEmbeddedHandler.java | 2 +- .../remote/InfinispanConsumerRemoteHandler.java | 9 +- .../remote/InfinispanRemoteOperation.java | 22 +++- .../InfinispanRemoteQueryProducerIT.java | 132 +++++++++++-------- parent/pom.xml | 1 + 11 files changed, 211 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/camel-core/src/main/docs/mock.adoc ---------------------------------------------------------------------- diff --git a/camel-core/src/main/docs/mock.adoc b/camel-core/src/main/docs/mock.adoc index f3f9c53..5597d78 100644 --- a/camel-core/src/main/docs/mock.adoc +++ b/camel-core/src/main/docs/mock.adoc @@ -77,19 +77,15 @@ Options + // component options: START The Mock component has no options. 
- - -[width="100%",cols="2s,1m,8",options="header"] -|======================================================================= -| Name | Java Type | Description -|======================================================================= // component options: END + // endpoint options: START The Mock component supports 12 endpoint options which are listed below: http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/camel-core/src/main/docs/properties.adoc ---------------------------------------------------------------------- diff --git a/camel-core/src/main/docs/properties.adoc b/camel-core/src/main/docs/properties.adoc index 23da72a..86ebd6f 100644 --- a/camel-core/src/main/docs/properties.adoc +++ b/camel-core/src/main/docs/properties.adoc @@ -21,10 +21,12 @@ Options + // component options: START The Properties component supports 15 options which are listed below. + [width="100%",cols="2s,1m,8",options="header"] |======================================================================= | Name | Java Type | Description @@ -49,6 +51,7 @@ The Properties component supports 15 options which are listed below. 
+ // endpoint options: START The Properties component supports 7 endpoint options which are listed below: http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml index 36bc87e..e5147d6 100644 --- a/components/camel-infinispan/pom.xml +++ b/components/camel-infinispan/pom.xml @@ -105,6 +105,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-to-slf4j</artifactId> + <version>${log4j2-version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java index 6417d99..e60fed0 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java @@ -24,6 +24,7 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; +import org.apache.camel.util.ObjectHelper; import org.infinispan.commons.api.BasicCacheContainer; @UriParams @@ -45,6 +46,8 @@ public class InfinispanConfiguration { private InfinispanCustomListener customListener; @UriParam(label = "consumer", defaultValue = "false") private boolean clustered; + 
@UriParam(label = "advanced") + private InfinispanQueryBuilder queryBuilder; public String getCommand() { return command; @@ -57,6 +60,10 @@ public class InfinispanConfiguration { this.command = command; } + public boolean hasCommand() { + return ObjectHelper.isNotEmpty(command); + } + /** * Specifies the host of the cache on Infinispan instance */ @@ -149,7 +156,22 @@ public class InfinispanConfiguration { this.customListener = customListener; } - public boolean isCustom() { + public boolean hasCustomListener() { return customListener != null; } + + public InfinispanQueryBuilder getQueryBuilder() { + return queryBuilder; + } + + /** + * Specifies the query builder. + */ + public void setQueryBuilder(InfinispanQueryBuilder queryBuilder) { + this.queryBuilder = queryBuilder; + } + + public boolean hasQueryBuilder() { + return queryBuilder != null; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java index c51cc6f..c54fd54 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java @@ -28,6 +28,8 @@ import org.infinispan.query.dsl.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.infinispan.InfinispanUtil.isInHeaderEmpty; + public final class InfinispanOperation { private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanOperation.class); @@ -36,13 +38,13 @@ public final class InfinispanOperation { public static void 
process(Exchange exchange, InfinispanConfiguration configuration, BasicCache<Object, Object> cache) { Operation operation = getOperation(exchange, configuration); - operation.execute(cache, exchange); + operation.execute(configuration, cache, exchange); } private static Operation getOperation(Exchange exchange, InfinispanConfiguration configuration) { String operation = exchange.getIn().getHeader(InfinispanConstants.OPERATION, String.class); if (operation == null) { - if (configuration.getCommand() != null) { + if (configuration.hasCommand()) { operation = InfinispanConstants.OPERATION + configuration.getCommand(); } else { operation = InfinispanConstants.PUT; @@ -55,16 +57,16 @@ public final class InfinispanOperation { private enum Operation { PUT { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { Object result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && 
!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); @@ -78,16 +80,16 @@ public final class InfinispanOperation { } }, PUTASYNC { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { NotifyingFuture result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); result = 
cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); @@ -101,12 +103,12 @@ public final class InfinispanOperation { } }, PUTALL { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); @@ -119,13 +121,13 @@ public final class InfinispanOperation { } }, PUTALLASYNC { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { NotifyingFuture result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); @@ -139,16 +141,16 @@ public final class InfinispanOperation { } }, PUTIFABSENT { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { Object result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = 
exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); @@ -162,16 +164,16 @@ public final class InfinispanOperation { } }, PUTIFABSENTASYNC { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { NotifyingFuture result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - 
&& !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); @@ -185,27 +187,25 @@ public final class InfinispanOperation { } }, GET { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { - Object result = cache.get(getKey(exchange)); - setResult(result, exchange); + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + setResult(cache.get(getKey(exchange)), exchange); } }, CONTAINSKEY { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { - Object result = cache.containsKey(getKey(exchange)); - setResult(result, exchange); + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + setResult(cache.containsKey(getKey(exchange)), exchange); } }, CONTAINSVALUE { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { Object result = cache.containsValue(getValue(exchange)); setResult(result, exchange); } }, REMOVE { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { Object result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, 
InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } if (ObjectHelper.isEmpty(getValue(exchange))) { @@ -217,9 +217,9 @@ public final class InfinispanOperation { } }, REMOVEASYNC { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { NotifyingFuture result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } if (ObjectHelper.isEmpty(getValue(exchange))) { @@ -231,16 +231,16 @@ public final class InfinispanOperation { } }, REPLACE { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { Object result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, 
InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); if (ObjectHelper.isEmpty(getOldValue(exchange))) { @@ -266,16 +266,16 @@ public final class InfinispanOperation { } }, REPLACEASYNC { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { NotifyingFuture result; - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) { cache = InfinispanUtil.ignoreReturnValuesCache(cache); } - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) { long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class); String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class); - if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME)) - && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { + if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME) + && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); if (ObjectHelper.isEmpty(getOldValue(exchange))) { @@ -301,26 
+301,23 @@ public final class InfinispanOperation { } }, SIZE { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { - Object result = cache.size(); - setResult(result, exchange); + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + setResult(cache.size(), exchange); } }, CLEAR { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { cache.clear(); } }, CLEARASYNC { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { - NotifyingFuture result; - result = cache.clearAsync(); - setResult(result, exchange); + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + setResult(cache.clearAsync(), exchange); } }, QUERY { @Override - void execute(BasicCache<Object, Object> cache, Exchange exchange) { - Query query = getQuery(cache, exchange); + void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + Query query = getQuery(configuration, cache, exchange); if (query == null) { return; } @@ -328,7 +325,6 @@ public final class InfinispanOperation { } }; - void setResult(Object result, Exchange exchange) { exchange.getIn().setHeader(InfinispanConstants.RESULT, result); } @@ -349,16 +345,16 @@ public final class InfinispanOperation { return (Map<? extends Object, ? 
extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP); } - Query getQuery(BasicCache<Object, Object> cache, Exchange exchange) { + Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { if (InfinispanUtil.isRemote(cache)) { - return InfinispanRemoteOperation.buildQuery(cache, exchange); + return InfinispanRemoteOperation.buildQuery(configuration, cache, exchange); } else { return null; } } - abstract void execute(BasicCache<Object, Object> cache, Exchange exchange); + abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange); } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java index 20819ae..f12630f 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.infinispan; +import org.apache.camel.Exchange; +import org.apache.camel.util.ObjectHelper; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; @@ -52,6 +54,11 @@ public final class InfinispanUtil { } } + @SuppressWarnings("unchecked") + public static RemoteCacheManager asRemote(BasicCacheContainer container) { + return RemoteCacheManager.class.cast(container); + } + public static <K, V> boolean isRemote(BasicCache<K, V> cache) { try { return cache instanceof RemoteCache; @@ -60,6 +67,11 @@ public final class 
InfinispanUtil { } } + @SuppressWarnings("unchecked") + public static <K, V> RemoteCache<K, V> asRemote(BasicCache<K, V> cache) { + return RemoteCache.class.cast(cache); + } + public static <K, V> BasicCache<K, V> ignoreReturnValuesCache(BasicCache<K, V> cache) { if (isEmbedded(cache)) { return ((Cache<K, V>) cache).getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES); @@ -68,4 +80,7 @@ public final class InfinispanUtil { } } + public static boolean isInHeaderEmpty(Exchange exchange, String header) { + return ObjectHelper.isEmpty(exchange.getIn().getHeader(header)); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java index d7844e6..712dec8 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java @@ -34,7 +34,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache(); InfinispanConfiguration configuration = consumer.getConfiguration(); InfinispanEventListener listener; - if (configuration.isCustom()) { + if (configuration.hasCustomListener()) { listener = configuration.getCustomListener(); ((InfinispanCustomListener)listener).setInfinispanConsumer(consumer); } else if (configuration.isClustered()) { 
http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java index 48ec911..80f44bc 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java @@ -19,8 +19,8 @@ package org.apache.camel.component.infinispan.remote; import org.apache.camel.component.infinispan.InfinispanConfiguration; import org.apache.camel.component.infinispan.InfinispanConsumer; import org.apache.camel.component.infinispan.InfinispanConsumerHandler; -import org.apache.camel.component.infinispan.InfinispanCustomListener; import org.apache.camel.component.infinispan.InfinispanEventListener; +import org.apache.camel.component.infinispan.InfinispanUtil; import org.infinispan.client.hotrod.RemoteCache; public final class InfinispanConsumerRemoteHandler implements InfinispanConsumerHandler { @@ -34,19 +34,18 @@ public final class InfinispanConsumerRemoteHandler implements InfinispanConsumer if (consumer.getConfiguration().isSync()) { throw new UnsupportedOperationException("Sync listeners not supported for remote caches."); } - RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache(); + RemoteCache<?, ?> remoteCache = InfinispanUtil.asRemote(consumer.getCache()); InfinispanConfiguration configuration = consumer.getConfiguration(); InfinispanEventListener listener; - if (configuration.isCustom()) { + if 
(configuration.hasCustomListener()) { listener = configuration.getCustomListener(); - ((InfinispanCustomListener)listener).setInfinispanConsumer(consumer); + listener.setInfinispanConsumer(consumer); } else { listener = new InfinispanRemoteEventListener(consumer, configuration.getEventTypes()); } remoteCache.addClientListener(listener); listener.setCacheName(remoteCache.getName()); return listener; - } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java index 38c1856..530a396 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java @@ -17,9 +17,10 @@ package org.apache.camel.component.infinispan.remote; import org.apache.camel.Exchange; +import org.apache.camel.component.infinispan.InfinispanConfiguration; import org.apache.camel.component.infinispan.InfinispanConstants; import org.apache.camel.component.infinispan.InfinispanQueryBuilder; -import org.infinispan.client.hotrod.RemoteCache; +import org.apache.camel.component.infinispan.InfinispanUtil; import org.infinispan.client.hotrod.Search; import org.infinispan.commons.api.BasicCache; import org.infinispan.query.dsl.Query; @@ -28,13 +29,20 @@ public final class InfinispanRemoteOperation { private InfinispanRemoteOperation() { } - public static Query buildQuery(BasicCache<Object, Object> cache, Exchange exchange) { - InfinispanQueryBuilder queryBuilder = 
(InfinispanQueryBuilder) exchange - .getIn().getHeader(InfinispanConstants.QUERY_BUILDER); + public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) { + InfinispanQueryBuilder queryBuilder = exchange.getIn().getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class); if (queryBuilder == null) { - return null; + queryBuilder = configuration.getQueryBuilder(); } - RemoteCache<Object, Object> remoteCache = (RemoteCache<Object, Object>) cache; - return queryBuilder.build(Search.getQueryFactory(remoteCache)); + + return buildQuery(queryBuilder, cache); + } + + public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache) { + return buildQuery(configuration.getQueryBuilder(), cache); + } + + public static Query buildQuery(InfinispanQueryBuilder queryBuilder, BasicCache<Object, Object> cache) { + return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(InfinispanUtil.asRemote(cache))) : null; } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteQueryProducerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteQueryProducerIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteQueryProducerIT.java index cd19aed..77fea92 100644 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteQueryProducerIT.java +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteQueryProducerIT.java @@ -18,8 +18,8 @@ package org.apache.camel.component.infinispan; import java.io.IOException; import java.util.List; + import org.apache.camel.Exchange; -import 
org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.JndiRegistry; @@ -40,24 +40,43 @@ import org.infinispan.query.remote.client.MarshallerRegistration; import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants; import org.junit.Test; -import static org.apache.camel.component.infinispan.InfinispanConstants.KEY; import static org.apache.camel.component.infinispan.InfinispanConstants.OPERATION; import static org.apache.camel.component.infinispan.InfinispanConstants.QUERY; import static org.apache.camel.component.infinispan.InfinispanConstants.QUERY_BUILDER; import static org.apache.camel.component.infinispan.InfinispanConstants.RESULT; -import static org.apache.camel.component.infinispan.InfinispanConstants.VALUE; import static org.apache.camel.component.infinispan.UserUtils.USERS; import static org.apache.camel.component.infinispan.UserUtils.createKey; import static org.apache.camel.component.infinispan.UserUtils.hasUser; public class InfinispanRemoteQueryProducerIT extends CamelTestSupport { + private static final InfinispanQueryBuilder NO_RESULT_QUERY_BUILDER = new InfinispanQueryBuilder() { + @Override + public Query build(QueryFactory<Query> queryFactory) { + return queryFactory.from(User.class) + .having("name").like("%abc%") + .toBuilder().build(); + } + }; + + private static final InfinispanQueryBuilder WITH_RESULT_QUERY_BUILDER = new InfinispanQueryBuilder() { + @Override + public Query build(QueryFactory<Query> queryFactory) { + return queryFactory.from(User.class) + .having("name").like("%A") + .toBuilder().build(); + } + }; + private RemoteCacheManager manager; @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); registry.bind("myCustomContainer", manager); + registry.bind("noResultQueryBuilder", NO_RESULT_QUERY_BUILDER); + registry.bind("withResultQueryBuilder", WITH_RESULT_QUERY_BUILDER); + 
return registry; } @@ -66,17 +85,23 @@ public class InfinispanRemoteQueryProducerIT extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() { - from("direct:start").to( - "infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query"); + from("direct:start") + .to("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query"); + from("direct:noQueryResults") + .to("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#noResultQueryBuilder"); + from("direct:queryWithResults") + .to("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#withResultQueryBuilder"); } }; } @Override protected void doPreSetup() throws IOException { - ConfigurationBuilder builder = new ConfigurationBuilder().addServer() - .host("localhost").port(11222) - .marshaller(new ProtoStreamMarshaller()); + ConfigurationBuilder builder = new ConfigurationBuilder() + .addServer() + .host("localhost") + .port(11222) + .marshaller(new ProtoStreamMarshaller()); manager = new RemoteCacheManager(builder.build()); @@ -99,18 +124,18 @@ public class InfinispanRemoteQueryProducerIT extends CamelTestSupport { @Override protected void doPostSetup() throws Exception { - /* Preload data. 
*/ + // pre-load data + RemoteCache<Object, Object> cache = manager.getCache("remote_query"); + assertNotNull(cache); + + cache.clear(); + assertTrue(cache.isEmpty()); + for (final User user : USERS) { - Exchange request = template.request("direct:start", - new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - Message in = exchange.getIn(); - in.setHeader(KEY, createKey(user)); - in.setHeader(VALUE, user); - } - }); - assertNull(request.getException()); + String key = createKey(user); + cache.put(key, user); + + assertTrue(cache.containsKey(key)); } } @@ -124,57 +149,60 @@ public class InfinispanRemoteQueryProducerIT extends CamelTestSupport { }); assertNull(request.getException()); - @SuppressWarnings("unchecked") - List<User> queryResult = (List<User>) request.getIn().getHeader(RESULT); + List<User> queryResult = request.getIn().getHeader(RESULT, List.class); assertNull(queryResult); } @Test public void producerQueryWithoutResult() throws Exception { - Exchange request = template.request("direct:start", new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader(OPERATION, QUERY); - exchange.getIn().setHeader(QUERY_BUILDER, - new InfinispanQueryBuilder() { - public Query build(QueryFactory<Query> queryFactory) { - return queryFactory.from(User.class) - .having("name").like("%abc%") - .toBuilder().build(); - } - }); - } - }); + producerQueryWithoutResult("direct:start", NO_RESULT_QUERY_BUILDER); + } + + @Test + public void producerQueryWithoutResultAndQueryBuilderFromConfig() throws Exception { + producerQueryWithoutResult("direct:noQueryResults", null); + } + + private void producerQueryWithoutResult(String endpoint, final InfinispanQueryBuilder builder) throws Exception { + Exchange request = template.request(endpoint, createQueryProcessor(builder)); + assertNull(request.getException()); - @SuppressWarnings("unchecked") - List<User> queryResult = (List<User>) 
request.getIn().getHeader(RESULT); + List<User> queryResult = request.getIn().getHeader(RESULT, List.class); assertNotNull(queryResult); assertEquals(0, queryResult.size()); } @Test public void producerQueryWithResult() throws Exception { - Exchange request = template.request("direct:start", new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - Message in = exchange.getIn(); - in.setHeader(OPERATION, QUERY); - in.setHeader(QUERY_BUILDER, new InfinispanQueryBuilder() { - public Query build(QueryFactory<Query> queryFactory) { - return queryFactory.from(User.class).having("name") - .like("%A").toBuilder().build(); - } - }); - } - }); + producerQueryWithResult("direct:start", WITH_RESULT_QUERY_BUILDER); + } + + @Test + public void producerQueryWithResultAndQueryBuilderFromConfig() throws Exception { + producerQueryWithResult("direct:queryWithResults", null); + } + + private void producerQueryWithResult(String endpoint, final InfinispanQueryBuilder builder) throws Exception { + Exchange request = template.request(endpoint, createQueryProcessor(builder)); assertNull(request.getException()); - @SuppressWarnings("unchecked") - List<User> queryResult = (List<User>) request.getIn().getHeader(RESULT); + List<User> queryResult = request.getIn().getHeader(RESULT, List.class); assertNotNull(queryResult); assertEquals(2, queryResult.size()); assertTrue(hasUser(queryResult, "nameA", "surnameA")); assertTrue(hasUser(queryResult, "nameA", "surnameB")); } + + private Processor createQueryProcessor(final InfinispanQueryBuilder builder) { + return new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(OPERATION, QUERY); + if (builder != null) { + exchange.getIn().setHeader(QUERY_BUILDER, builder); + } + } + }; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c415c8f/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml 
b/parent/pom.xml index f4aa43e..b4662a2 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -359,6 +359,7 @@ <!-- virtual dependency only used by Eclipse m2e --> <lifecycle-mapping-version>1.0.0</lifecycle-mapping-version> <log4j-version>1.2.17</log4j-version> + <log4j2-version>2.5</log4j2-version> <logback-version>1.1.5</logback-version> <lucene3-bundle-version>3.6.0_1</lucene3-bundle-version> <lucene3-version>3.6.0</lucene3-version>