Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 2b62e4348 -> d22e6f4b2


IGNITE-45 - Added warning message when server nodes left the grid.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f6e02dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f6e02dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f6e02dcb

Branch: refs/heads/ignite-45
Commit: f6e02dcb0e688c76e56dd77aaa534e0a9c56d0d7
Parents: afb1de6
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Sun Mar 22 14:34:09 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Sun Mar 22 14:34:09 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ignite/events/EventType.java     | 11 ++-
 .../processors/cache/GridCacheGateway.java      |  8 +-
 .../processors/cache/GridCacheProcessor.java    | 15 ++++
 .../GridDhtPartitionsExchangeFuture.java        | 80 +++++++++++++++++++-
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 80 ++++++++++++++++----
 5 files changed, 177 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 3573ba4..2448d0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -640,6 +640,14 @@ public interface EventType {
     public static final int EVT_CACHE_STOPPED = 99;
 
     /**
+     * Built-in event type: cache nodes left.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     */
+    public static final int EVT_CACHE_NODES_LEFT = 100;
+
+    /**
      * Built-in event type: Visor detects that some events were evicted from events buffer since last poll.
      * <p>
      * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
@@ -953,7 +961,8 @@ public interface EventType {
      */
     public static final int[] EVTS_CACHE_LIFECYCLE = {
         EVT_CACHE_STARTED,
-        EVT_CACHE_STOPPED
+        EVT_CACHE_STOPPED,
+        EVT_CACHE_NODES_LEFT
     };
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 35a5d90..4868b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -176,10 +176,14 @@ public class GridCacheGateway<K, V> {
     /**
      *
      */
-    public void onStopped() {
-        // Must prevent re-entries to the read lock.
+    public void block() {
         stopped = true;
+    }
 
+    /**
+     *
+     */
+    public void onStopped() {
         boolean interrupted = false;
 
         while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e9b7ae2..6633356 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1360,6 +1360,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         assert req.stop();
 
         // Break the proxy before exchange future is done.
+        IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
+
+        if (proxy != null)
+            proxy.gate().block();
+    }
+
+    /**
+     * @param req Request.
+     */
+    private void stopGateway(DynamicCacheChangeRequest req) {
+        assert req.stop();
+
+        // Break the proxy before exchange future is done.
         IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
 
         if (proxy != null)
@@ -1416,6 +1429,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 String masked = maskNull(req.cacheName());
 
                 if (req.stop()) {
+                    stopGateway(req);
+
                     prepareCacheStop(req);
 
                     DynamicCacheDescriptor desc = registeredCaches.get(masked);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8d21bc3..3dfcc8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -292,6 +292,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @return {@code True} if local client has been added.
+     */
+    public boolean isLocalClientAdded(int cacheId) {
+        if (!F.isEmpty(reqs)) {
+            for (DynamicCacheChangeRequest req : reqs) {
+                if (req.start() && F.eq(req.initiatingNodeId(), cctx.localNodeId())) {
+                    if (CU.cacheId(req.cacheName()) == cacheId)
+                        return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /**
      * Rechecks topology.
      */
     private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
@@ -434,8 +451,69 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 startCaches();
 
-                for (GridCacheContext cacheCtx : cctx.cacheContexts())
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    if (isCacheAdded(cacheCtx.cacheId())) {
+                        if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
+                            U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
+                    }
+
                     cacheCtx.preloader().onExchangeFutureAdded();
+                }
+
+                List<String> cachesWithoutNodes = null;
+
+                for (String name : cctx.cache().cacheNames()) {
+                    if (exchId.isLeft()) {
+                        if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
+                            if (cachesWithoutNodes == null)
+                                cachesWithoutNodes = new ArrayList<>();
+
+                            cachesWithoutNodes.add(name);
+
+                            // Fire event even if there is no client cache started.
+                            if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
+                                Event evt = new CacheEvent(
+                                    name,
+                                    cctx.localNode(),
+                                    cctx.localNode(),
+                                    "All server nodes have left the cluster.",
+                                    EventType.EVT_CACHE_NODES_LEFT,
+                                    0,
+                                    false,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    false,
+                                    null,
+                                    false,
+                                    null,
+                                    null,
+                                    null
+                                );
+
+                                cctx.gridEvents().record(evt);
+                            }
+                        }
+                    }
+                }
+
+                if (cachesWithoutNodes != null) {
+                    StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: ");
+
+                    for (int i = 0; i < cachesWithoutNodes.size(); i++) {
+                        String cache = cachesWithoutNodes.get(i);
+
+                        sb.append('\'').append(cache).append('\'');
+
+                        if (i != cachesWithoutNodes.size() - 1)
+                            sb.append(", ");
+                    }
+
+                    U.quietAndWarn(log, sb.toString());
+
+                    U.quietAndWarn(log, "Must have server nodes for caches to operate.");
+                }
 
                 assert discoEvt != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index da4a2c2..d9757ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -86,7 +86,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(cacheCfg);
 
-        cfg.setIncludeEventTypes(EventType.EVT_CACHE_STARTED, EventType.EVT_CACHE_STOPPED);
+        cfg.setIncludeEventTypes(EventType.EVT_CACHE_STARTED, EventType.EVT_CACHE_STOPPED, EventType.EVT_CACHE_NODES_LEFT);
 
         return cfg;
     }
@@ -111,7 +111,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStopCacheMultithreadedSameNode() throws Exception {
-        final IgniteKernal kernal = (IgniteKernal) grid(0);
+        final IgniteEx kernal = grid(0);
 
         final Collection<IgniteInternalFuture<?>> futs = new 
ConcurrentLinkedDeque<>();
 
@@ -184,7 +184,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
                 ccfg.setName(DYNAMIC_CACHE_NAME);
 
-                IgniteKernal kernal = (IgniteKernal) 
grid(ThreadLocalRandom.current().nextInt(nodeCount()));
+                IgniteEx kernal = 
grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
                 futs.add(kernal.context().cache().dynamicStartCache(ccfg, 
ccfg.getName(), null, true));
 
@@ -219,7 +219,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
         GridTestUtils.runMultiThreaded(new Callable<Object>() {
             @Override
             public Object call() throws Exception {
-                IgniteKernal kernal = (IgniteKernal) 
grid(ThreadLocalRandom.current().nextInt(nodeCount()));
+                IgniteEx kernal = 
grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
                 
futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
 
@@ -251,7 +251,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void checkStartStopCacheSimple(CacheAtomicityMode mode) throws 
Exception {
-        final IgniteKernal kernal = (IgniteKernal) grid(0);
+        final IgniteEx kernal = grid(0);
 
         CacheConfiguration ccfg = new CacheConfiguration();
         
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
@@ -262,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
         kernal.createCache(ccfg);
 
         for (int g = 0; g < nodeCount(); g++) {
-            IgniteKernal kernal0 = (IgniteKernal) grid(g);
+            IgniteEx kernal0 = grid(g);
 
             for (IgniteInternalFuture f : 
kernal0.context().cache().context().exchange().exchangeFutures())
                 f.get();
@@ -313,7 +313,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStopCacheAddNode() throws Exception {
-        final IgniteKernal kernal = (IgniteKernal) grid(0);
+        final IgniteEx kernal = grid(0);
 
         CacheConfiguration ccfg = new CacheConfiguration();
         ccfg.setCacheMode(CacheMode.REPLICATED);
@@ -375,7 +375,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             startGrid(nodeCount());
 
-            final IgniteKernal kernal = (IgniteKernal) grid(0);
+            final IgniteEx kernal = grid(0);
 
             CacheConfiguration ccfg = new CacheConfiguration();
             
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
@@ -440,7 +440,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
         GridTestUtils.assertThrowsInherited(log, new Callable<Object>() {
             @Override
             public Object call() throws Exception {
-                final IgniteKernal kernal = (IgniteKernal) grid(0);
+                final Ignite kernal = grid(0);
 
                 CacheConfiguration ccfg = new CacheConfiguration();
                 
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
@@ -464,7 +464,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             startGrid(nodeCount());
 
-            final IgniteKernal kernal = (IgniteKernal) grid(0);
+            final IgniteEx kernal = grid(0);
 
             CacheConfiguration ccfg = new CacheConfiguration();
             
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
@@ -507,7 +507,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             startGrid(nodeCount());
 
-            final IgniteKernal kernal = (IgniteKernal) grid(0);
+            final IgniteEx kernal = grid(0);
 
             CacheConfiguration ccfg = new CacheConfiguration();
             
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
@@ -549,7 +549,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             startGrid(nodeCount());
 
-            final IgniteKernal kernal = (IgniteKernal)grid(0);
+            final IgniteEx kernal = grid(0);
 
             CacheConfiguration ccfg = new CacheConfiguration();
             
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
@@ -669,7 +669,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
                 assertTrue(cacheAdapter.context().isNear());
 
                 try {
-                    IgniteKernal grid = (IgniteKernal)startGrid(nodeCount() + 
1);
+                    IgniteEx grid = (IgniteEx)startGrid(nodeCount() + 1);
 
                     // Check that new node sees near node.
                     GridDiscoveryManager disco = grid.context().discovery();
@@ -915,4 +915,58 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
                 assertNull(jcache.get(k));
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodesLeftEvent() throws Exception {
+        testAttribute = false;
+
+        startGrid(nodeCount());
+
+        CacheConfiguration cfg = new CacheConfiguration(DYNAMIC_CACHE_NAME);
+
+        cfg.setNodeFilter(F.not(NODE_FILTER));
+
+        try (IgniteCache<Object, Object> ignored = ignite(0).createCache(cfg)) {
+
+            final CountDownLatch[] latches = new CountDownLatch[nodeCount()];
+
+            IgnitePredicate[] lsnrs = new IgnitePredicate[nodeCount()];
+
+            for (int i = 0; i < nodeCount(); i++) {
+                final int idx = i;
+
+                latches[i] = new CountDownLatch(1);
+                lsnrs[i] = new IgnitePredicate<CacheEvent>() {
+                    @Override
+                    public boolean apply(CacheEvent e) {
+                        switch (e.type()) {
+                            case EventType.EVT_CACHE_NODES_LEFT:
+                                latches[idx].countDown();
+
+                                break;
+
+                            default:
+                                assert false;
+                        }
+
+                        assertEquals(DYNAMIC_CACHE_NAME, e.cacheName());
+
+                        return true;
+                    }
+                };
+
+                ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE);
+            }
+
+            stopGrid(nodeCount());
+
+            for (CountDownLatch latch : latches)
+                latch.await();
+
+            for (int i = 0; i < nodeCount(); i++)
+                ignite(i).events().stopLocalListen(lsnrs[i]);
+        }
+    }
 }

Reply via email to