Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 2b62e4348 -> d22e6f4b2
IGNITE-45 - Added warning message when server nodes left the grid. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f6e02dcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f6e02dcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f6e02dcb Branch: refs/heads/ignite-45 Commit: f6e02dcb0e688c76e56dd77aaa534e0a9c56d0d7 Parents: afb1de6 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Sun Mar 22 14:34:09 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Sun Mar 22 14:34:09 2015 -0700 ---------------------------------------------------------------------- .../org/apache/ignite/events/EventType.java | 11 ++- .../processors/cache/GridCacheGateway.java | 8 +- .../processors/cache/GridCacheProcessor.java | 15 ++++ .../GridDhtPartitionsExchangeFuture.java | 80 +++++++++++++++++++- .../cache/IgniteDynamicCacheStartSelfTest.java | 80 ++++++++++++++++---- 5 files changed, 177 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index 3573ba4..2448d0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -640,6 +640,14 @@ public interface EventType { public static final int EVT_CACHE_STOPPED = 99; /** + * Built-in event type: cache nodes left. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + */ + public static final int EVT_CACHE_NODES_LEFT = 100; + + /** * Built-in event type: Visor detects that some events were evicted from events buffer since last poll. * <p> * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for @@ -953,7 +961,8 @@ public interface EventType { */ public static final int[] EVTS_CACHE_LIFECYCLE = { EVT_CACHE_STARTED, - EVT_CACHE_STOPPED + EVT_CACHE_STOPPED, + EVT_CACHE_NODES_LEFT }; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 35a5d90..4868b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -176,10 +176,14 @@ public class GridCacheGateway<K, V> { /** * */ - public void onStopped() { - // Must prevent re-entries to the read lock. + public void block() { stopped = true; + } + /** + * + */ + public void onStopped() { boolean interrupted = false; while (true) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e9b7ae2..6633356 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1360,6 +1360,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert req.stop(); // Break the proxy before exchange future is done. + IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); + + if (proxy != null) + proxy.gate().block(); + } + + /** + * @param req Request. + */ + private void stopGateway(DynamicCacheChangeRequest req) { + assert req.stop(); + + // Break the proxy before exchange future is done. IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); if (proxy != null) @@ -1416,6 +1429,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { String masked = maskNull(req.cacheName()); if (req.stop()) { + stopGateway(req); + prepareCacheStop(req); DynamicCacheDescriptor desc = registeredCaches.get(masked); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8d21bc3..3dfcc8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -292,6 +292,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param cacheId Cache ID. + * @return {@code True} if local client has been added. + */ + public boolean isLocalClientAdded(int cacheId) { + if (!F.isEmpty(reqs)) { + for (DynamicCacheChangeRequest req : reqs) { + if (req.start() && F.eq(req.initiatingNodeId(), cctx.localNodeId())) { + if (CU.cacheId(req.cacheName()) == cacheId) + return true; + } + } + } + + return false; + } + + /** * Rechecks topology. */ private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { @@ -434,8 +451,69 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT startCaches(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (isCacheAdded(cacheCtx.cacheId())) { + if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty()) + U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex()); + } + cacheCtx.preloader().onExchangeFutureAdded(); + } + + List<String> cachesWithoutNodes = null; + + for (String name : cctx.cache().cacheNames()) { + if (exchId.isLeft()) { + if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { + if (cachesWithoutNodes == null) + cachesWithoutNodes = new ArrayList<>(); + + cachesWithoutNodes.add(name); + + // Fire event even if there is no client cache started. + if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) { + Event evt = new CacheEvent( + name, + cctx.localNode(), + cctx.localNode(), + "All server nodes have left the cluster.", + EventType.EVT_CACHE_NODES_LEFT, + 0, + false, + null, + null, + null, + null, + false, + null, + false, + null, + null, + null + ); + + cctx.gridEvents().record(evt); + } + } + } + } + + if (cachesWithoutNodes != null) { + StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: "); + + for (int i = 0; i < cachesWithoutNodes.size(); i++) { + String cache = cachesWithoutNodes.get(i); + + sb.append('\'').append(cache).append('\''); + + if (i != cachesWithoutNodes.size() - 1) + sb.append(", "); + } + + U.quietAndWarn(log, sb.toString()); + + U.quietAndWarn(log, "Must have server nodes for caches to operate."); + } assert discoEvt != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6e02dcb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index da4a2c2..d9757ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -86,7 +86,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(cacheCfg); - cfg.setIncludeEventTypes(EventType.EVT_CACHE_STARTED, EventType.EVT_CACHE_STOPPED); + cfg.setIncludeEventTypes(EventType.EVT_CACHE_STARTED, EventType.EVT_CACHE_STOPPED, EventType.EVT_CACHE_NODES_LEFT); return cfg; } @@ -111,7 +111,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStopCacheMultithreadedSameNode() throws Exception { - final IgniteKernal kernal = (IgniteKernal) grid(0); + final IgniteEx kernal = grid(0); final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>(); @@ -184,7 +184,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - IgniteKernal kernal = (IgniteKernal) grid(ThreadLocalRandom.current().nextInt(nodeCount())); + IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount())); futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true)); @@ -219,7 +219,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { - IgniteKernal kernal = (IgniteKernal) grid(ThreadLocalRandom.current().nextInt(nodeCount())); + IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount())); futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME)); @@ -251,7 +251,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void checkStartStopCacheSimple(CacheAtomicityMode mode) throws Exception { - final IgniteKernal kernal = (IgniteKernal) grid(0); + final IgniteEx kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); @@ -262,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { kernal.createCache(ccfg); for (int g = 0; g < nodeCount(); g++) { - IgniteKernal kernal0 = (IgniteKernal) grid(g); + IgniteEx kernal0 = grid(g); for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) f.get(); @@ -313,7 +313,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStopCacheAddNode() throws Exception { - final IgniteKernal kernal = (IgniteKernal) grid(0); + final IgniteEx kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(CacheMode.REPLICATED); @@ -375,7 +375,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { startGrid(nodeCount()); - final IgniteKernal kernal = (IgniteKernal) grid(0); + final IgniteEx kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); @@ -440,7 +440,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { @Override public Object call() throws Exception { - final IgniteKernal kernal = (IgniteKernal) grid(0); + final Ignite kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); @@ -464,7 +464,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { startGrid(nodeCount()); - final IgniteKernal kernal = (IgniteKernal) grid(0); + final IgniteEx kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); @@ -507,7 +507,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { startGrid(nodeCount()); - final IgniteKernal kernal = (IgniteKernal) grid(0); + final IgniteEx kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); @@ -549,7 +549,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { startGrid(nodeCount()); - final IgniteKernal kernal = (IgniteKernal)grid(0); + final IgniteEx kernal = grid(0); CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); @@ -669,7 +669,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { assertTrue(cacheAdapter.context().isNear()); try { - IgniteKernal grid = (IgniteKernal)startGrid(nodeCount() + 1); + IgniteEx grid = (IgniteEx)startGrid(nodeCount() + 1); // Check that new node sees near node. GridDiscoveryManager disco = grid.context().discovery(); @@ -915,4 +915,58 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { assertNull(jcache.get(k)); } } + + /** + * @throws Exception If failed. + */ + public void testServerNodesLeftEvent() throws Exception { + testAttribute = false; + + startGrid(nodeCount()); + + CacheConfiguration cfg = new CacheConfiguration(DYNAMIC_CACHE_NAME); + + cfg.setNodeFilter(F.not(NODE_FILTER)); + + try (IgniteCache<Object, Object> ignored = ignite(0).createCache(cfg)) { + + final CountDownLatch[] latches = new CountDownLatch[nodeCount()]; + + IgnitePredicate[] lsnrs = new IgnitePredicate[nodeCount()]; + + for (int i = 0; i < nodeCount(); i++) { + final int idx = i; + + latches[i] = new CountDownLatch(1); + lsnrs[i] = new IgnitePredicate<CacheEvent>() { + @Override + public boolean apply(CacheEvent e) { + switch (e.type()) { + case EventType.EVT_CACHE_NODES_LEFT: + latches[idx].countDown(); + + break; + + default: + assert false; + } + + assertEquals(DYNAMIC_CACHE_NAME, e.cacheName()); + + return true; + } + }; + + ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE); + } + + stopGrid(nodeCount()); + + for (CountDownLatch latch : latches) + latch.await(); + + for (int i = 0; i < nodeCount(); i++) + ignite(i).events().stopLocalListen(lsnrs[i]); + } + } }