CAMEL-9685 - camel-infinispan : support continuous query in consumer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9df244d2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9df244d2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9df244d2 Branch: refs/heads/master Commit: 9df244d2f7e2e0979f429e1db5448787ce6cca20 Parents: 7c6737b Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Mar 10 15:39:34 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Mar 11 12:07:46 2016 +0100 ---------------------------------------------------------------------- .../infinispan/InfinispanConstants.java | 3 + .../infinispan/InfinispanConsumer.java | 70 ++++++-- .../component/infinispan/InfinispanUtil.java | 4 + .../remote/InfinispanConsumerRemoteHandler.java | 3 +- .../remote/InfinispanRemoteOperation.java | 7 +- .../infinispan/InfinispanContinuousQueryIT.java | 171 +++++++++++++++++++ .../camel/component/infinispan/UserUtils.java | 7 + 7 files changed, 248 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java index d2a5f95..1a27fd7 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java @@ -50,4 +50,7 @@ public interface InfinispanConstants { String EVENT_DATA = "CamelInfinispanEventData"; String QUERY = "CamelInfinispanOperationQuery"; String QUERY_BUILDER = "CamelInfinispanQueryBuilder"; + + String CACHE_ENTRY_JOINING = "CacheEntryJoining"; + String CACHE_ENTRY_LEAVING = "CacheEntryLeaving"; } http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java index 90a5e3e..b4db422 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java @@ -20,11 +20,15 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.infinispan.embedded.InfinispanConsumerEmbeddedHandler; import org.apache.camel.component.infinispan.remote.InfinispanConsumerRemoteHandler; +import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation; import org.apache.camel.impl.DefaultConsumer; -import org.infinispan.Cache; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.Search; import org.infinispan.commons.api.BasicCache; import org.infinispan.commons.api.BasicCacheContainer; -import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.query.api.continuous.ContinuousQuery; +import org.infinispan.query.api.continuous.ContinuousQueryListener; +import org.infinispan.query.dsl.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +36,9 @@ public class InfinispanConsumer extends DefaultConsumer { private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class); private final InfinispanConfiguration configuration; private InfinispanEventListener listener; - private EmbeddedCacheManager cacheManager; + private InfinispanConsumerHandler consumerHandler; private BasicCache<Object, Object> cache; + private ContinuousQuery<Object, Object> continuousQuery; public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) { super(endpoint, processor); @@ -63,25 +68,44 @@ public class InfinispanConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { + super.doStart(); + BasicCacheContainer cacheContainer = configuration.getCacheContainer(); - String cacheName = configuration.getCacheName(); - cache = cacheName == null ? cacheContainer.getCache() : cacheContainer.getCache(cacheName); - if (InfinispanUtil.isEmbedded(cacheContainer)) { - listener = InfinispanConsumerEmbeddedHandler.INSTANCE.start(this); - } else if (InfinispanUtil.isRemote(cacheContainer)) { - listener = InfinispanConsumerRemoteHandler.INSTANCE.start(this); + cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName()); + + if (configuration.hasQueryBuilder()) { + if (InfinispanUtil.isRemote(cache)) { + RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache); + Query query = InfinispanRemoteOperation.buildQuery(configuration.getQueryBuilder(), remoteCache); + + continuousQuery = Search.getContinuousQuery(remoteCache); + continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName())); + } else { + throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")"); + } } else { - throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer); - } - super.doStart(); + if (InfinispanUtil.isEmbedded(cacheContainer)) { + consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE; + } else if (InfinispanUtil.isRemote(cacheContainer)) { + consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE; + } else { + throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer); + } + listener = consumerHandler.start(this); + } } @Override protected void doStop() throws Exception { - if (cacheManager != null) { - cacheManager.removeListener(listener); + if (continuousQuery != null) { + continuousQuery.removeAllListeners(); + } + + if (consumerHandler != null) { + consumerHandler.stop(this); } + super.doStop(); } @@ -96,4 +120,22 @@ public class InfinispanConsumer extends DefaultConsumer { public InfinispanConfiguration getConfiguration() { return configuration; } + + private class ContinuousQueryEventListener implements ContinuousQueryListener<Object, Object> { + private final String cacheName; + + public ContinuousQueryEventListener(String cacheName) { + this.cacheName = cacheName; + } + + @Override + public void resultJoining(Object key, Object value) { + processEvent(InfinispanConstants.CACHE_ENTRY_JOINING, false, cacheName, key, value); + } + + @Override + public void resultLeaving(Object key) { + processEvent(InfinispanConstants.CACHE_ENTRY_LEAVING, false, cacheName, key); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java index f12630f..95a5d04 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java @@ -83,4 +83,8 @@ public final class InfinispanUtil { public static boolean isInHeaderEmpty(Exchange exchange, String header) { return ObjectHelper.isEmpty(exchange.getIn().getHeader(header)); } + + public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) { + return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java index 80f44bc..c3ad05f 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java @@ -50,8 +50,7 @@ public final class InfinispanConsumerRemoteHandler implements InfinispanConsumer @Override public void stop(InfinispanConsumer consumer) { - RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache(); - remoteCache.removeClientListener(consumer.getListener()); + InfinispanUtil.asRemote(consumer.getCache()).removeClientListener(consumer.getListener()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java index 530a396..dd0f590 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java @@ -21,6 +21,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration; import org.apache.camel.component.infinispan.InfinispanConstants; import org.apache.camel.component.infinispan.InfinispanQueryBuilder; import org.apache.camel.component.infinispan.InfinispanUtil; +import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.Search; import org.infinispan.commons.api.BasicCache; import org.infinispan.query.dsl.Query; @@ -43,6 +44,10 @@ public final class InfinispanRemoteOperation { } public static Query buildQuery(InfinispanQueryBuilder queryBuilder, BasicCache<Object, Object> cache) { - return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(InfinispanUtil.asRemote(cache))) : null; + return buildQuery(queryBuilder, InfinispanUtil.asRemote(cache)); + } + + public static Query buildQuery(InfinispanQueryBuilder queryBuilder, RemoteCache<Object, Object> cache) { + return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(cache)) : null; } } http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java new file mode 100644 index 0000000..b591555 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.infinispan; + +import java.io.IOException; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; +import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller; +import org.infinispan.commons.util.Util; +import org.infinispan.protostream.FileDescriptorSource; +import org.infinispan.protostream.SerializationContext; +import org.infinispan.protostream.sampledomain.User; +import org.infinispan.protostream.sampledomain.marshallers.GenderMarshaller; +import org.infinispan.protostream.sampledomain.marshallers.UserMarshaller; +import org.infinispan.query.dsl.Query; +import org.infinispan.query.dsl.QueryFactory; +import org.infinispan.query.remote.client.MarshallerRegistration; +import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants; +import org.junit.Test; + +import static org.apache.camel.component.infinispan.UserUtils.CQ_USERS; +import static org.apache.camel.component.infinispan.UserUtils.createKey; + +public class InfinispanContinuousQueryIT extends CamelTestSupport { + + private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER = new InfinispanQueryBuilder() { + @Override + public Query build(QueryFactory<Query> queryFactory) { + return queryFactory.from(User.class) + .having("name").like("CQ%") + .toBuilder().build(); + } + }; + + private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER_NO_MATCH = new InfinispanQueryBuilder() { + @Override + public Query build(QueryFactory<Query> queryFactory) { + return queryFactory.from(User.class) + .having("name").like("%TEST%") + .toBuilder().build(); + } + }; + + private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER_ALL = new InfinispanQueryBuilder() { + @Override + public Query build(QueryFactory<Query> queryFactory) { + return queryFactory.from(User.class) + .having("name").like("%Q0%") + .toBuilder().build(); + } + }; + + private RemoteCacheManager manager; + private RemoteCache<Object, Object> cache; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("myCustomContainer", manager); + registry.bind("continuousQueryBuilder", CONTINUOUS_QUERY_BUILDER); + registry.bind("continuousQueryBuilderNoMatch", CONTINUOUS_QUERY_BUILDER_NO_MATCH); + registry.bind("continuousQueryBuilderAll", CONTINUOUS_QUERY_BUILDER_ALL); + + return registry; + } + + @Override + protected void doPreSetup() throws IOException { + ConfigurationBuilder builder = new ConfigurationBuilder() + .addServer() + .host("localhost") + .port(11222) + .marshaller(new ProtoStreamMarshaller()); + + manager = new RemoteCacheManager(builder.build()); + + RemoteCache<String, String> metadataCache = manager.getCache( + ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME); + metadataCache.put( + "sample_bank_account/bank.proto", + Util.read(InfinispanContinuousQueryIT.class.getResourceAsStream("/sample_bank_account/bank.proto"))); + + MarshallerRegistration.registerMarshallers(ProtoStreamMarshaller.getSerializationContext(manager)); + + SerializationContext serCtx = ProtoStreamMarshaller.getSerializationContext(manager); + serCtx.registerProtoFiles(FileDescriptorSource.fromResources("/sample_bank_account/bank.proto")); + serCtx.registerMarshaller(new UserMarshaller()); + serCtx.registerMarshaller(new GenderMarshaller()); + + // pre-load data + cache = manager.getCache("remote_query"); + cache.clear(); + } + + @Test + public void continuousQuery() throws Exception { + MockEndpoint continuousQueryBuilderNoMatch = getMockEndpoint("mock:continuousQueryNoMatch"); + continuousQueryBuilderNoMatch.expectedMessageCount(0); + + MockEndpoint continuousQueryBuilderAll = getMockEndpoint("mock:continuousQueryAll"); + continuousQueryBuilderAll.expectedMessageCount(CQ_USERS.length * 2); + + MockEndpoint continuousQuery = getMockEndpoint("mock:continuousQuery"); + continuousQuery.expectedMessageCount(4); + + for (int i = 0; i < 4; i++) { + continuousQuery.message(i).outHeader(InfinispanConstants.KEY).isEqualTo(createKey(CQ_USERS[i % 2])); + continuousQuery.message(i).outHeader(InfinispanConstants.CACHE_NAME).isEqualTo(cache.getName()); + + if (i >= 2) { + continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(InfinispanConstants.CACHE_ENTRY_LEAVING); + continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isNull(); + } else { + continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(InfinispanConstants.CACHE_ENTRY_JOINING); + continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isNotNull(); + continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isInstanceOf(User.class); + } + } + + for (final User user : CQ_USERS) { + cache.put(createKey(user), user); + } + + assertEquals(CQ_USERS.length, cache.size()); + + for (final User user : CQ_USERS) { + cache.remove(createKey(user)); + } + + assertTrue(cache.isEmpty()); + + continuousQuery.assertIsSatisfied(); + continuousQueryBuilderNoMatch.assertIsSatisfied(); + continuousQueryBuilderAll.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilder") + .to("mock:continuousQuery"); + from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilderNoMatch") + .to("mock:continuousQueryNoMatch"); + from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilderAll") + .to("mock:continuousQueryAll"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java index 4126b83..df2ba65 100644 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java @@ -25,6 +25,13 @@ public final class UserUtils { createUser("nameA", "surnameB"), createUser("nameB", "surnameB")}; + public static final User[] CQ_USERS = new User[]{ + createUser("CQ01", "surname01"), + createUser("CQ02", "surname01"), + createUser("NQ03", "surname03"), + createUser("NQ04", "surname04") + }; + private UserUtils() { }