# Register client continuous listeners on node join

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35e3e4e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35e3e4e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35e3e4e0

Branch: refs/heads/ignite-843
Commit: 35e3e4e048fa34b5b23ebd0ae235424f3e3492d9
Parents: aed83af
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Aug 13 17:37:00 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Aug 13 17:37:00 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 44 ++++++++++++++++----
 .../IgniteCacheContinuousQueryClientTest.java   | 33 ++++++++++++---
 .../IgniteCacheQuerySelfTestSuite.java          |  1 +
 scripts/git-format-patch.sh                     |  2 +-
 4 files changed, 66 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..5f1c4bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -193,10 +193,10 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                         unregisterRemote(routineId);
 
                         if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> infoMap = 
clientInfos.get(snd.id());
+                            Map<UUID, LocalRoutineInfo> clientRoutineMap = 
clientInfos.get(snd.id());
 
-                            if (infoMap != null)
-                                infoMap.remove(msg.routineId());
+                            if (clientRoutineMap != null)
+                                clientRoutineMap.remove(msg.routineId());
                         }
                     }
                 }
@@ -370,6 +370,34 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             }
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : 
data.clientInfos.entrySet()) {
+                UUID clientNodeId = entry.getKey();
+
+                Map<UUID, LocalRoutineInfo> clientRoutineMap = 
entry.getValue();
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e : 
clientRoutineMap.entrySet()) {
+                    UUID routineId = e.getKey();
+                    LocalRoutineInfo info = e.getValue();
+
+                    try {
+                        if (info.prjPred != null)
+                            ctx.resource().injectGeneric(info.prjPred);
+
+                        if (info.prjPred == null || 
info.prjPred.apply(ctx.discovery().localNode())) {
+                            if (registerHandler(clientNodeId,
+                                routineId,
+                                info.hnd,
+                                info.bufSize,
+                                info.interval,
+                                info.autoUnsubscribe,
+                                false))
+                                info.hnd.onListenerRegistered(routineId, ctx);
+                        }
+                    }
+                    catch (IgniteCheckedException err) {
+                        U.error(log, "Failed to register continuous handler.", 
err);
+                    }
+                }
+
                 Map<UUID, LocalRoutineInfo> map = 
clientInfos.get(entry.getKey());
 
                 if (map == null) {
@@ -723,17 +751,17 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         }
 
         if (node.isClient()) {
-            Map<UUID, LocalRoutineInfo> clientRouteMap = 
clientInfos.get(node.id());
+            Map<UUID, LocalRoutineInfo> clientRoutineMap = 
clientInfos.get(node.id());
 
-            if (clientRouteMap == null) {
-                clientRouteMap = new HashMap<>();
+            if (clientRoutineMap == null) {
+                clientRoutineMap = new HashMap<>();
 
-                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), 
clientRouteMap);
+                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), 
clientRoutineMap);
 
                 assert old == null;
             }
 
-            clientRouteMap.put(routineId, new 
LocalRoutineInfo(data.projectionPredicate(),
+            clientRoutineMap.put(routineId, new 
LocalRoutineInfo(data.projectionPredicate(),
                 hnd,
                 data.bufferSize(),
                 data.interval(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index bb413a0..d66d1d0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -20,7 +20,9 @@ package 
org.apache.ignite.internal.processors.cache.query.continuous;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
@@ -38,7 +40,7 @@ import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  */
 public class IgniteCacheContinuousQueryClientTest extends 
GridCommonAbstractTest {
     /** */
-    protected static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 
     /** */
     private boolean client;
@@ -47,6 +49,8 @@ public class IgniteCacheContinuousQueryClientTest extends 
GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -60,6 +64,13 @@ public class IgniteCacheContinuousQueryClientTest extends 
GridCommonAbstractTest
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -80,15 +91,27 @@ public class IgniteCacheContinuousQueryClientTest extends 
GridCommonAbstractTest
 
         QueryCursor<?> cur = clientNode.cache(null).query(qry);
 
-        Ignite joined = startGrid(4);
+        Ignite joined1 = startGrid(4);
 
-        IgniteCache<Object, Object> joinedCache = joined.cache(null);
+        IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
 
-        joinedCache.put(primaryKey(joinedCache), 1);
+        joinedCache1.put(primaryKey(joinedCache1), 1);
 
         assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
 
         cur.close();
+
+        lsnr.latch = new CountDownLatch(1);
+
+        Ignite joined2 = startGrid(5);
+
+        IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
+
+        joinedCache2.put(primaryKey(joinedCache2), 2);
+
+        U.sleep(1000);
+
+        assertEquals("Unexpected event received.", 1, lsnr.latch.getCount());
     }
 
     /**
@@ -96,7 +119,7 @@ public class IgniteCacheContinuousQueryClientTest extends 
GridCommonAbstractTest
      */
     private static class CacheEventListener implements 
CacheEntryUpdatedListener<Object, Object> {
         /** */
-        private final CountDownLatch latch = new CountDownLatch(1);
+        private volatile CountDownLatch latch = new CountDownLatch(1);
 
         /** */
         @LoggerResource

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2d7d0ce..a3849d7 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         
suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         
suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/scripts/git-format-patch.sh
----------------------------------------------------------------------
diff --git a/scripts/git-format-patch.sh b/scripts/git-format-patch.sh
index b11c73d..83aee3e 100755
--- a/scripts/git-format-patch.sh
+++ b/scripts/git-format-patch.sh
@@ -20,7 +20,7 @@
 # Git patch-file maker.
 #
 echo 'Usage: scripts/git-format-patch.sh [-ih|--ignitehome <path>] 
[-idb|--ignitedefbranch <branch-name>] [-ph|--patchhome <path>]'
-echo 'It is a script to create patch between Current branch (branch with 
changes) and Default branche. The script is safe and do not broke or lose your 
changes.'
+echo 'It is a script to create patch between Current branch (branch with 
changes) and Default branch. The script is safe and does not break or lose your 
changes.'
 echo "It should be called from IGNITE_HOME directory."
 echo "Patch will be created at PATCHES_HOME (= IGNITE_HOME, by default) 
between Default branch (IGNITE_DEFAULT_BRANCH) and Current branch."
 echo "Note: you can use ${IGNITE_HOME}/scripts/git-patch-prop-local.sh to set 
your own local properties (to rewrite settings at git-patch-prop-local.sh). "

Reply via email to