# ignite-901 WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59b967aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59b967aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59b967aa Branch: refs/heads/ignite-901 Commit: 59b967aa973fc365cb7514e112194c7689982adc Parents: 363e161 Author: sboikov <sboi...@gridgain.com> Authored: Fri Jul 3 17:42:18 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jul 3 17:42:18 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 3 +- .../ignite/internal/GridPluginComponent.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 6 +- .../internal/managers/GridManagerAdapter.java | 2 +- .../deployment/GridDeploymentManager.java | 2 +- .../discovery/GridDiscoveryManager.java | 5 +- .../processors/GridProcessorAdapter.java | 2 +- .../cache/DynamicCacheChangeBatch.java | 17 ++ .../processors/cache/GridCacheProcessor.java | 186 +++++++++++++------ .../datastructures/DataStructuresProcessor.java | 7 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 11 +- .../IgniteClientReconnectAbstractTest.java | 8 +- .../IgniteClientReconnectCacheTest.java | 144 +++++++++++++- 13 files changed, 317 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 705576e..fb0a157 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -126,7 +126,8 @@ public interface GridComponent { public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException; /** + * @param clusterRestarted Cluster restarted flag. * @throws IgniteCheckedException If failed. */ - public void onReconnected() throws IgniteCheckedException; + public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index 9639df0..55a84c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -70,7 +70,7 @@ public class GridPluginComponent implements GridComponent { } /** {@inheritDoc} */ - @Override public void onReconnected() { + @Override public void onReconnected(boolean clusterRestarted) { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f97a1c4..5876288 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2848,14 +2848,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * + * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. */ - public void reconnected() { + public void reconnected(boolean clusterRestarted) { Throwable err = null; try { for (GridComponent comp : ctx.components()) - comp.onReconnected(); + comp.onReconnected(clusterRestarted); ctx.gateway().onReconnected(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index b0a46eb..1cbe68d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -172,7 +172,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } /** {@inheritDoc} */ - @Override public void onReconnected() throws IgniteCheckedException { + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index 9eda2eb..9e418a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -119,7 +119,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> { } /** {@inheritDoc} */ - @Override public void onReconnected() throws IgniteCheckedException { + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { storesOnKernalStart(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index a8af43b..f95788a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -296,6 +296,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { locJoinEvt = new GridFutureAdapter<>(); + + registeredCaches.clear(); } /** {@inheritDoc} */ @@ -1906,7 +1908,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { case EVT_CLIENT_NODE_RECONNECTED: { assert localNode().isClient() : evt; - ((IgniteKernal)ctx.grid()).reconnected(); + // TODO IGNITE-901. + ((IgniteKernal)ctx.grid()).reconnected(false); break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index 1a6791b..8baf95c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -68,7 +68,7 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ - @Override public void onReconnected() throws IgniteCheckedException { + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index dfc39c1..1e8184d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -43,6 +43,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** Custom message ID. */ private IgniteUuid id = IgniteUuid.randomUuid(); + /** */ + private boolean clientReconnect; + /** * @param reqs Requests. */ @@ -93,6 +96,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { return false; } + /** + * @param clientReconnect {@code True} if this is discovery data sent on client reconnect. + */ + public void clientReconnect(boolean clientReconnect) { + this.clientReconnect = clientReconnect; + } + + /** + * @return {@code True} if this is discovery data sent on client reconnect. + */ + public boolean clientReconnect() { + return clientReconnect; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 8d3f8da..4fc02d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Count down latch for caches. */ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); + /** */ + private Map<String, DynamicCacheDescriptor> cachesOnDisconnect; + /** * @param ctx Kernal context. */ @@ -914,6 +917,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + cachesOnDisconnect = new HashMap<>(registeredCaches); + + registeredCaches.clear(); + + registeredTemplates.clear(); + for (GridCacheAdapter cache : caches.values()) cache.context().gate().onDisconnected(reconnectFut); @@ -922,15 +931,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : caches.values()) cache.disconnected(); - registeredCaches.clear(); - sharedCtx.onDisconnected(); } /** {@inheritDoc} */ - @Override public void onReconnected() throws IgniteCheckedException { - for (GridCacheAdapter cache : caches.values()) - cache.context().gate().reconnected(false); + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + cachesOnDisconnect = null; + + for (GridCacheAdapter cache : caches.values()) { + boolean stopped = !registeredCaches.containsKey(maskNull(cache.name())); + + cache.context().gate().reconnected(stopped); + + if (stopped) { + sharedCtx.removeCacheContext(cache.ctx); + + caches.remove(maskNull(cache.name())); + jCacheProxies.remove(maskNull(cache.name())); + + onKernalStop(cache, true); + stopCache(cache, true); + } + } marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() { @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { @@ -1690,11 +1712,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { - // Collect dynamically started caches to a single object. Collection<DynamicCacheChangeRequest> reqs = + // Collect dynamically started caches to a single object. new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; + + Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches; + + for (DynamicCacheDescriptor desc : descs.values()) { if (!desc.cancelled()) { DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); @@ -1722,7 +1748,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs); - req.clientNodes(ctx.discovery().clientNodesMap()); + Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap(); + + if (reconnect) { + clientNodesMap = U.newHashMap(caches.size()); + + for (GridCacheAdapter<?, ?> cache : caches.values()) { + Boolean nearEnabled = cache.isNear(); + + Map<UUID, Boolean> map = U.newHashMap(1); + + map.put(nodeId, nearEnabled); + + clientNodesMap.put(cache.name(), map); + } + } + + req.clientNodes(clientNodesMap); + + req.clientReconnect(reconnect); return req; } @@ -1732,38 +1776,86 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (data instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; - for (DynamicCacheChangeRequest req : batch.requests()) { - if (req.template()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); + if (batch.clientReconnect()) { + for (DynamicCacheChangeRequest req : batch.requests()) { + assert !req.template() : req; - assert ccfg != null : req; + String name = req.cacheName(); - DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName())); + boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); - if (existing == null) { - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - true, - req.deploymentId()); + if (!sysCache) { + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - registeredTemplates.put(maskNull(req.cacheName()), desc); - } + if (desc != null && !desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) { + Map<UUID, Boolean> nodes = batch.clientNodes().get(name); - continue; + assert nodes != null : req; + assert nodes.containsKey(joiningNodeId) : nodes; + + ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId)); + } + } + else + ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false); } + } + else { + for (DynamicCacheChangeRequest req : batch.requests()) { + if (req.template()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + assert ccfg != null : req; + + DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName())); + + if (existing == null) { + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + ccfg, + req.cacheType(), + true, + req.deploymentId()); + + registeredTemplates.put(maskNull(req.cacheName()), desc); + } - DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); + continue; + } + + DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); + + if (req.start() && !req.clientStartOnly()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + if (existing != null) { + if (existing.locallyConfigured()) { + existing.deploymentId(req.deploymentId()); + + existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); + + ctx.discovery().setCacheFilter( + req.cacheName(), + ccfg.getNodeFilter(), + ccfg.getNearConfiguration() != null, + ccfg.getCacheMode() == LOCAL); + } + } + else { + assert req.cacheType() != null : req; - if (req.start() && !req.clientStartOnly()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + ccfg, + req.cacheType(), + false, + req.deploymentId()); - if (existing != null) { - if (existing.locallyConfigured()) { - existing.deploymentId(req.deploymentId()); + // Received statically configured cache. + if (req.initiatingNodeId() == null) + desc.staticallyConfigured(true); - existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); + registeredCaches.put(maskNull(req.cacheName()), desc); ctx.discovery().setCacheFilter( req.cacheName(), @@ -1772,37 +1864,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.getCacheMode() == LOCAL); } } - else { - assert req.cacheType() != null : req; - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - false, - req.deploymentId()); - - // Received statically configured cache. - if (req.initiatingNodeId() == null) - desc.staticallyConfigured(true); - - registeredCaches.put(maskNull(req.cacheName()), desc); - - ctx.discovery().setCacheFilter( - req.cacheName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode() == LOCAL); - } } - } - if (!F.isEmpty(batch.clientNodes())) { - for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) { - String cacheName = entry.getKey(); + if (!F.isEmpty(batch.clientNodes())) { + for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) { + String cacheName = entry.getKey(); - for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) - ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) + ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 95c9563..4637bd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -186,14 +186,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onReconnected() throws IgniteCheckedException { + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { Set<GridCacheInternal> keys = dsMap.keySet(); Map<GridCacheInternal, GridCacheInternal> vals = dsView.getAll(keys); for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) { - if (!vals.containsKey(e.getKey())) + if (!vals.containsKey(e.getKey())) { + dsMap.remove(e.getKey()); + e.getValue().onRemoved(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 578aae8..f3f19bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1084,6 +1084,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private Reconnector reconnector; + /** */ + private boolean nodeAdded; + /** * */ @@ -1286,12 +1289,16 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onMessageProcessingFinished(msg); } - private boolean nodeAdded; - + /** + * @return {@code True} if client in process of join. + */ private boolean joining() { return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED; } + /** + * @return {@code True} if disconnected. + */ private boolean disconnected() { return state == ClientImpl.State.DISCONNECTED; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index a9ce136..0f8aadd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -165,10 +165,10 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra * * @param client Client. * @param srv Server. - * @param disconnectedClosure Closure which will be run when client node disconnected. + * @param disconnectedC Closure which will be run when client node disconnected. * @throws Exception If failed. */ - protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedClosure) + protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC) throws Exception { final TestTcpDiscoverySpi clientSpi = spi(client); final TestTcpDiscoverySpi srvSpi = spi(srv); @@ -201,8 +201,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - if (disconnectedClosure != null) - disconnectedClosure.run(); + if (disconnectedC != null) + disconnectedC.run(); log.info("Allow reconnect."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 6a77a18..258eef9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -100,7 +101,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac public void testReconnect() throws Exception { clientMode = true; - Ignite client = startGrid(SRV_CNT); + IgniteEx client = startGrid(SRV_CNT); final TestTcpDiscoverySpi clientSpi = spi(client); @@ -110,6 +111,16 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>()); + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("nearCache"); + + final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>()); + + nearCache.put(1, 1); + + assertEquals(1, nearCache.localPeek(1)); + cache.put(1, 1); final CountDownLatch disconnectLatch = new CountDownLatch(1); @@ -141,7 +152,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac catch (CacheException e) { log.info("Expected exception: " + e); - IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); + IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause(); e0.reconnectFuture().get(); } @@ -187,6 +198,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + checkCacheDiscoveryData(srv, client, null, true, true, false); + + checkCacheDiscoveryData(srv, client, "nearCache", true, true, true); + assertEquals(1, cache.get(1)); putFut.get(); @@ -197,6 +212,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertEquals(3, cache.get(3)); + assertNull(nearCache.localPeek(1)); + this.clientMode = false; IgniteEx srv2 = startGrid(SRV_CNT + 1); @@ -206,6 +223,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac cache.put(key, 4); assertEquals(4, cache.get(key)); + + checkCacheDiscoveryData(srv2, client, null, true, true, false); + + checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true); } /** @@ -231,7 +252,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final CountDownLatch reconnectLatch = new CountDownLatch(1); client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { + @Override + public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); @@ -439,6 +461,74 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } /** + * @throws Exception If failed. + */ + public void testReconnectCacheDestroyed() throws Exception { + clientMode = true; + + final IgniteEx client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srv.destroyCache(null); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + + checkCacheDiscoveryData(srv, client, null, false, false, false); + + IgniteCache<Object, Object> clientCache0 = client.getOrCreateCache(new CacheConfiguration<>()); + + checkCacheDiscoveryData(srv, client, null, true, true, false); + + clientCache0.put(1, 1); + + assertEquals(1, clientCache0.get(1)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectCacheDestroyedAndCreated() throws Exception { + clientMode = true; + + final Ignite client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + reconnectClientNode(client, srv, new Runnable() { + @Override + public void run() { + srv.destroyCache(null); + + srv.getOrCreateCache(new CacheConfiguration<>()); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override + public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + } + + /** * @param client Client. * @param ccfg Cache configuration. * @param msgToBlock Message to block. @@ -501,6 +591,54 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } /** + * @param srv Server node. + * @param client Client node. + * @param cacheName Cache name. + * @param cacheExists Cache exists flag. + * @param clientCache {@code True} if client node has client cache. + * @param clientNear {@code True} if client node has near-enabled client cache. + */ + private void checkCacheDiscoveryData(Ignite srv, + Ignite client, + String cacheName, + boolean cacheExists, + boolean clientCache, + boolean clientNear) + { + GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); + GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery(); + + ClusterNode srvNode = ((IgniteKernal)srv).localNode(); + ClusterNode clientNode = ((IgniteKernal)client).localNode(); + + assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName)); + assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName)); + + assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName)); + + if (clientNear) + assertTrue(srvDisco.cacheNearNode(clientNode, cacheName)); + else + assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + + assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName)); + + if (clientNear) + assertTrue(clientDisco.cacheNearNode(clientNode, cacheName)); + else + assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName)); + + if (cacheExists) { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } + else { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty()); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty()); + } + } + + /** * */ private static class TestCommunicationSpi extends TcpCommunicationSpi {