Repository: incubator-ignite Updated Branches: refs/heads/ignite-1245 [created] a246057c3
# Register client continuous listeners on node join Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a246057c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a246057c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a246057c Branch: refs/heads/ignite-1245 Commit: a246057c3240acaa37ae28873a217c771ef93b6a Parents: 36f7ba6 Author: sboikov <sboi...@gridgain.com> Authored: Thu Aug 13 13:05:43 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Aug 13 13:45:21 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 34 ++++- .../IgniteCacheContinuousQueryClientTest.java | 134 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 1 + 3 files changed, 166 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a246057c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index daa9494..8c8e259 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -193,10 +193,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { unregisterRemote(routineId); if (snd.isClient()) { - Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id()); + Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(snd.id()); - if (infoMap != null) - infoMap.remove(msg.routineId()); + if (clientRouteMap != null) + clientRouteMap.remove(msg.routineId()); } } } @@ -370,6 +370,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { + UUID clientNodeId = entry.getKey(); + + Map<UUID, LocalRoutineInfo> clientRouteMap = entry.getValue(); + + for (Map.Entry<UUID, LocalRoutineInfo> e : clientRouteMap.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); + + try { + if (info.prjPred != null) + ctx.resource().injectGeneric(info.prjPred); + + if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { + if (registerHandler(clientNodeId, + routineId, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe, + false)) + info.hnd.onListenerRegistered(routineId, ctx); + } + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to register continuous handler.", err); + } + } + Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey()); if (map == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a246057c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java new file mode 100644 index 0000000..7341dea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.event.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testNodeJoins() throws Exception { + startGrids(2); + + client = true; + + Ignite clientNode = startGrid(3); + + client = false; + + CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = clientNode.cache(null).query(qry); + + Ignite joined1 = startGrid(4); + + IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + + joinedCache1.put(primaryKey(joinedCache1), 1); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + cur.close(); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined2 = startGrid(5); + + IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); + + joinedCache2.put(primaryKey(joinedCache2), 2); + + U.sleep(1000); + + assertEquals("Unexpected event received.", 1, lsnr.latch.getCount()); + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + /** */ + private volatile CountDownLatch latch = new CountDownLatch(1); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { + log.info("Received cache event: " + evt); + + latch.countDown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a246057c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 2d7d0ce..a3849d7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -98,6 +98,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);