# Register client continuous listeners on node join
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35e3e4e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35e3e4e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35e3e4e0

Branch: refs/heads/ignite-843
Commit: 35e3e4e048fa34b5b23ebd0ae235424f3e3492d9
Parents: aed83af
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Aug 13 17:37:00 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Aug 13 17:37:00 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java   | 44 ++++++++++++++++----
 .../IgniteCacheContinuousQueryClientTest.java | 33 ++++++++++++---
 .../IgniteCacheQuerySelfTestSuite.java        |  1 +
 scripts/git-format-patch.sh                   |  2 +-
 4 files changed, 66 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..5f1c4bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -193,10 +193,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         unregisterRemote(routineId);
 
                         if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id());
+                            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
 
-                            if (infoMap != null)
-                                infoMap.remove(msg.routineId());
+                            if (clientRoutineMap != null)
+                                clientRoutineMap.remove(msg.routineId());
                         }
                     }
                 }
@@ -370,6 +370,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+                UUID clientNodeId = entry.getKey();
+
+                Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+                    UUID routineId = e.getKey();
+                    LocalRoutineInfo info = e.getValue();
+
+                    try {
+                        if (info.prjPred != null)
+                            ctx.resource().injectGeneric(info.prjPred);
+
+                        if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                            if (registerHandler(clientNodeId,
+                                routineId,
+                                info.hnd,
+                                info.bufSize,
+                                info.interval,
+                                info.autoUnsubscribe,
+                                false))
+                                info.hnd.onListenerRegistered(routineId, ctx);
+                        }
+                    }
+                    catch (IgniteCheckedException err) {
+                        U.error(log, "Failed to register continuous handler.", err);
+                    }
+                }
+
                 Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
 
                 if (map == null) {
@@ -723,17 +751,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (node.isClient()) {
-            Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(node.id());
+            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
 
-            if (clientRouteMap == null) {
-                clientRouteMap = new HashMap<>();
+            if (clientRoutineMap == null) {
+                clientRoutineMap = new HashMap<>();
 
-                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRouteMap);
+                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRoutineMap);
 
                 assert old == null;
             }
 
-            clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
+            clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
                 hnd,
                 data.bufferSize(),
                 data.interval(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index bb413a0..d66d1d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
@@ -38,7 +40,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  */
 public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest {
     /** */
-    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
     private boolean client;
@@ -47,6 +49,8 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -60,6 +64,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -80,15 +91,27 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
 
         QueryCursor<?> cur = clientNode.cache(null).query(qry);
 
-        Ignite joined = startGrid(4);
+        Ignite joined1 = startGrid(4);
 
-        IgniteCache<Object, Object> joinedCache = joined.cache(null);
+        IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
 
-        joinedCache.put(primaryKey(joinedCache), 1);
+        joinedCache1.put(primaryKey(joinedCache1), 1);
 
         assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
 
         cur.close();
+
+        lsnr.latch = new CountDownLatch(1);
+
+        Ignite joined2 = startGrid(5);
+
+        IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
+
+        joinedCache2.put(primaryKey(joinedCache2), 2);
+
+        U.sleep(1000);
+
+        assertEquals("Unexpected event received.", 1, lsnr.latch.getCount());
     }
 
     /**
@@ -96,7 +119,7 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
      */
     private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
         /** */
-        private final CountDownLatch latch = new CountDownLatch(1);
+        private volatile CountDownLatch latch = new CountDownLatch(1);
 
         /** */
         @LoggerResource

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2d7d0ce..a3849d7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/scripts/git-format-patch.sh
----------------------------------------------------------------------
diff --git a/scripts/git-format-patch.sh b/scripts/git-format-patch.sh
index b11c73d..83aee3e 100755
--- a/scripts/git-format-patch.sh
+++ b/scripts/git-format-patch.sh
@@ -20,7 +20,7 @@
 # Git patch-file maker.
 #
 echo 'Usage: scripts/git-format-patch.sh [-ih|--ignitehome <path>] [-idb|--ignitedefbranch <branch-name>] [-ph|--patchhome <path>]'
-echo 'It is a script to create patch between Current branch (branch with changes) and Default branche. The script is safe and do not broke or lose your changes.'
+echo 'It is a script to create patch between Current branch (branch with changes) and Default branch. The script is safe and does not break or lose your changes.'
 echo "It should be called from IGNITE_HOME directory."
 echo "Patch will be created at PATCHES_HOME (= IGNITE_HOME, by default) between Default branch (IGNITE_DEFAULT_BRANCH) and Current branch."
 echo "Note: you can use ${IGNITE_HOME}/scripts/git-patch-prop-local.sh to set your own local properties (to rewrite settings at git-patch-prop-local.sh). "