# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fa007b1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fa007b1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fa007b1f Branch: refs/heads/ignite-901 Commit: fa007b1f5e6d9a60827eef0d979bbd2959fb7cc3 Parents: e73e496 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 9 09:59:59 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 9 17:53:41 2015 +0300 ---------------------------------------------------------------------- .../internal/managers/GridManagerAdapter.java | 6 +- .../discovery/GridDiscoveryManager.java | 41 ++-- .../processors/cache/GridCacheProcessor.java | 10 +- .../cache/GridCacheSharedContext.java | 24 +- .../GridDhtPartitionsExchangeFuture.java | 4 - .../java/org/apache/ignite/spi/IgniteSpi.java | 15 ++ .../org/apache/ignite/spi/IgniteSpiAdapter.java | 13 + .../ignite/spi/discovery/tcp/ClientImpl.java | 77 ++++-- .../tcp/internal/TcpDiscoveryNode.java | 15 +- .../GridDeploymentManagerStopSelfTest.java | 11 + .../loadtests/hashmap/GridCacheTestContext.java | 4 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 236 ++++++++++++++++++- .../query/h2/twostep/GridMergeIndex.java | 29 +-- .../h2/twostep/GridReduceQueryExecutor.java | 3 + 14 files changed, 401 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 9faa056..298ff24 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -168,12 +168,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { - // No-op. + for (T t : spis) + t.onClientDisconnected(reconnectFut); } /** {@inheritDoc} */ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - // No-op. + for (T t : spis) + t.onClientReconnected(clusterRestarted); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index c0d9f13..986a995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -491,10 +491,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert node.isClient() : node; ((IgniteKernal)ctx.grid()).onDisconnected(); - - recordEvent(type, topVer, node, topSnapshot); - - return; } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -506,16 +502,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); - recordEvent(type, topVer, node, topSnapshot); - ctx.gateway().onReconnected(); - - if (log.isInfoEnabled()) - log.info("Client node reconnected to 
cluster: " + node); - - ackTopology(topVer, true); - - return; } discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); @@ -989,7 +976,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Collection<ClusterNode> rmtNodes = discoCache.remoteNodes(); - Collection<ClusterNode> serverNodes = F.view(discoCache.allNodes(), F.not(clientFilter)); + Collection<ClusterNode> srvNodes = F.view(discoCache.allNodes(), F.not(clientFilter)); Collection<ClusterNode> clientNodes = F.view(discoCache.allNodes(), clientFilter); @@ -1009,7 +996,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { double heap = U.heapSize(allNodes, 2); if (log.isQuiet()) - U.quiet(false, topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap)); + U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap)); if (log.isDebugEnabled()) { String dbg = ""; @@ -1019,7 +1006,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ">>> " + PREFIX + "." + U.nl() + ">>> +----------------+" + U.nl() + ">>> Grid name: " + (ctx.gridName() == null ? "default" : ctx.gridName()) + U.nl() + - ">>> Number of server nodes: " + serverNodes.size() + U.nl() + + ">>> Number of server nodes: " + srvNodes.size() + U.nl() + ">>> Number of client nodes: " + clientNodes.size() + U.nl() + (discoOrdered ? ">>> Topology version: " + topVer + U.nl() : "") + ">>> Topology hash: 0x" + Long.toHexString(hash).toUpperCase() + U.nl(); @@ -1053,7 +1040,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { log.debug(dbg); } else if (log.isInfoEnabled()) - log.info(topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap)); + log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap)); } /** @@ -1063,10 +1050,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param heap Heap size. 
* @return Topology snapshot message. */ - private String topologySnapshotMessage(int serverNodesNum, int clientNodesNum, int totalCpus, double heap) { + private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) { return PREFIX + " [" + (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") + - "server nodes=" + serverNodesNum + + "server nodes=" + srvNodesNum + ", client nodes=" + clientNodesNum + ", CPUs=" + totalCpus + ", heap=" + heap + "GB" + @@ -1917,6 +1904,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } + case EVT_CLIENT_NODE_DISCONNECTED: { + if (log.isInfoEnabled()) + log.info("Client node disconnected from topology: " + node); + + break; + } + + case EVT_CLIENT_NODE_RECONNECTED: { + if (log.isInfoEnabled()) + log.info("Client node reconnected to topology: " + node); + + ackTopology(topVer.topologyVersion(), true); + + break; + } + case EVT_NODE_FAILED: { // Check only if resolvers were configured. 
if (hasRslvrs) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ebbe639..61f7e58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -575,8 +575,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners( - ctx, ctx.config().getCacheStoreSessionListenerFactories())); + sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, + ctx.config().getCacheStoreSessionListenerFactories())); ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", !ctx.config().getTransactionConfiguration().isTxSerializableEnabled()); @@ -1721,8 +1721,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { depMgr, exchMgr, ioMgr, - storeSesLsnrs, - jta + jta, + storeSesLsnrs ); } @@ -1733,8 +1733,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { + // Collect dynamically started caches to a single object. Collection<DynamicCacheChangeRequest> reqs = - // Collect dynamically started caches to a single object. 
new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 715d514..d0064f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -87,9 +87,15 @@ public class GridCacheSharedContext<K, V> { private Collection<CacheStoreSessionListener> storeSesLsnrs; /** + * @param kernalCtx Context. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. + * @param depMgr Deployment manager. + * @param exchMgr Exchange manager. + * @param ioMgr IO manager. + * @param jtaMgr JTA manager. + * @param storeSesLsnrs Store session listeners. 
*/ public GridCacheSharedContext( GridKernalContext kernalCtx, @@ -99,12 +105,12 @@ public class GridCacheSharedContext<K, V> { GridCacheDeploymentManager<K, V> depMgr, GridCachePartitionExchangeManager<K, V> exchMgr, GridCacheIoManager ioMgr, - Collection<CacheStoreSessionListener> storeSesLsnrs, - CacheJtaManagerAdapter jtaMgr + CacheJtaManagerAdapter jtaMgr, + Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; - setManagers(txMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr); + setManagers(txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr); this.storeSesLsnrs = storeSesLsnrs; @@ -114,6 +120,7 @@ public class GridCacheSharedContext<K, V> { } /** + * @param reconnectFut Reconnect future. * @throws IgniteCheckedException If failed. */ void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { @@ -137,6 +144,7 @@ public class GridCacheSharedContext<K, V> { mgrs = new LinkedList<>(); setManagers(txMgr, + jtaMgr, verMgr, mvccMgr, new GridCacheDeploymentManager<K, V>(), @@ -149,7 +157,17 @@ public class GridCacheSharedContext<K, V> { } } + /** + * @param txMgr Transaction manager. + * @param verMgr Version manager. + * @param mvccMgr MVCC manager. + * @param depMgr Deployment manager. + * @param exchMgr Exchange manager. + * @param ioMgr IO manager. + * @param jtaMgr JTA manager. 
+ */ private void setManagers(IgniteTxManager txMgr, + CacheJtaManagerAdapter jtaMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, GridCacheDeploymentManager<K, V> depMgr, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0369eb9..38a0d55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -458,8 +458,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (isDone()) return; - log.info("Init exchange: " + exchangeId()); - if (init.compareAndSet(false, true)) { if (isDone()) return; @@ -1026,8 +1024,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.exchange().onExchangeDone(this, err); if (super.onDone(res, err) && !dummy && !forcePreload) { - log.info("Finished exchange: " + exchangeId() + ", err=" + err); - if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java index 968d88d..0f6ed5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.util.*; @@ -106,4 +107,18 @@ public interface IgniteSpi { * @throws IgniteSpiException Thrown in case of any error during SPI stop. */ public void spiStop() throws IgniteSpiException; + + /** + * Client node disconnected callback. + * + * @param reconnectFut Future that will be completed when client reconnected. + */ + public void onClientDisconnected(IgniteFuture<?> reconnectFut); + + /** + * Client node reconnected callback. + * + * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. + */ + public void onClientReconnected(boolean clusterRestarted); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index dd19203..a49d85a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.resources.*; @@ -191,8 +192,20 @@ public abstract class IgniteSpiAdapter implements 
IgniteSpi, IgniteSpiManagement spiCtx = new GridDummySpiContext(locNode, true, spiCtx); } + /** {@inheritDoc} */ + @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onClientReconnected(boolean clusterRestarted) { + // No-op. + } + /** * Inject ignite instance. + * + * @param ignite Ignite instance. */ @IgniteInstanceResource protected void injectResources(Ignite ignite) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index b3793b1..8041a63 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -512,9 +512,18 @@ class ClientImpl extends TcpDiscoveryImpl { tstamp = U.currentTimeMillis(); - TcpDiscoveryAbstractMessage msg = recon ? - new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) : - new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId())); + TcpDiscoveryAbstractMessage msg; + + if (!recon) { + TcpDiscoveryNode node = locNode; + + if (locNode.order() > 0) + node = locNode.clientReconnectNode(); + + msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId())); + } + else + msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId); msg.client(true); @@ -977,6 +986,8 @@ class ClientImpl extends TcpDiscoveryImpl { long timeout = join ? 
spi.joinTimeout : spi.netTimeout; + log.info("Will try to reconnect with timeout: " + timeout); + long startTime = U.currentTimeMillis(); try { @@ -1020,6 +1031,8 @@ class ClientImpl extends TcpDiscoveryImpl { TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; if (res.creatorNodeId().equals(getLocalNodeId())) { + log.info("Reconnect status: " + res.success()); + if (res.success()) { msgWorker.addMessage(res); @@ -1032,7 +1045,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - else // TODO IGNITE-901 reuse socket. + else return; } } @@ -1136,6 +1149,8 @@ class ClientImpl extends TcpDiscoveryImpl { break; } else if (state == ClientImpl.State.DISCONNECTED) { + log.info("Rejoin timeout, will segment."); + state = ClientImpl.State.SEGMENTED; notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); @@ -1172,6 +1187,8 @@ class ClientImpl extends TcpDiscoveryImpl { } } else { + log.info("Socket closed, will try to reconnect."); + assert reconnector == null; final Reconnector reconnector = new Reconnector(join); @@ -1187,30 +1204,40 @@ class ClientImpl extends TcpDiscoveryImpl { reconnector = null; if (spi.isClientReconnectDisabled()) { - state = ClientImpl.State.SEGMENTED; + if (state != ClientImpl.State.SEGMENTED && state != ClientImpl.State.STOPPED) { + log.info("Reconnected failed, will segment."); - notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + state = ClientImpl.State.SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } } else { - state = ClientImpl.State.DISCONNECTED; + log.info("Reconnected failed, will try join."); - nodeAdded = false; + if (state != ClientImpl.State.DISCONNECTED) { + state = ClientImpl.State.DISCONNECTED; - IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( - null, "Failed to ping node, client node disconnected."); + nodeAdded = false; - for (Map.Entry<UUID, 
GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { - GridFutureAdapter<Boolean> fut = e.getValue(); + IgniteClientDisconnectedCheckedException err = + new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, client node disconnected."); - if (pingFuts.remove(e.getKey(), fut)) - fut.onDone(err); - } + for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { + GridFutureAdapter<Boolean> fut = e.getValue(); - notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + if (pingFuts.remove(e.getKey(), fut)) + fut.onDone(err); + } + + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + } UUID newId = UUID.randomUUID(); - log.info("Change node id: " + newId + " " + locNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME)); + if (log.isInfoEnabled()) + log.info("Client will try to reconnect to cluster with new id " + + "[id=" + newId + ", prevId=" + locNode.id() + ']'); locNode.onClientDisconnected(newId); @@ -1220,7 +1247,7 @@ class ClientImpl extends TcpDiscoveryImpl { else { TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (joinLatch.getCount() > 0) { // TODO IGNITE-901. 
+ if (joining()) { IgniteSpiException err = null; if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) @@ -1231,7 +1258,13 @@ class ClientImpl extends TcpDiscoveryImpl { err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); if (err != null) { - joinError(err); + if (state == ClientImpl.State.DISCONNECTED) { + state = ClientImpl.State.SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } + else + joinError(err); break; } @@ -1263,12 +1296,16 @@ class ClientImpl extends TcpDiscoveryImpl { boolean join = state == ClientImpl.State.STARTING; + log.info("Try join topology with timeout: " + spi.joinTimeout); + final Socket sock = joinTopology(false, spi.joinTimeout); if (sock == null) { if (join) joinError(new IgniteSpiException("Join process timed out.")); else { + log.info("Send join request on rejoin failed, will segment."); + state = ClientImpl.State.SEGMENTED; notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); @@ -1284,7 +1321,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (spi.joinTimeout > 0) { timer.schedule(new TimerTask() { @Override public void run() { - if (joinLatch.getCount() > 0) + if (joining()) queue.add(JOIN_TIMEOUT); } }, spi.joinTimeout); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 1e849f4..b8f1a86 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -449,9 +449,18 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste */ public void onClientDisconnected(UUID newId) { id = newId; - order = 0; - intOrder = 0; - visible = false; + } + + /** + * @return Copy of local node for client reconnect request. + */ + public TcpDiscoveryNode clientReconnectNode() { + TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver); + + node.attrs = attrs; + node.clientRouterNodeId = clientRouterNodeId; + + return node; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java index 9780080..b8f9ce1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.deployment; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.resource.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.deployment.*; @@ -95,5 +96,15 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public boolean unregister(String rsrcName) { return false; } + + /** {@inheritDoc} */ + @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onClientReconnected(boolean clusterRestarted) { + // No-op. 
+ } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index e9d7a45..9a883b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -55,8 +55,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new GridCacheDeploymentManager<K, V>(), new GridCachePartitionExchangeManager<K, V>(), new GridCacheIoManager(), - null, - new CacheNoopJtaManager() + new CacheNoopJtaManager(), + null ), defaultCacheConfiguration(), CacheType.USER, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index ba38dfc..5838481 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -111,6 +111,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite; + /** */ + private boolean reconnectDisabled; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws 
Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -159,6 +162,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setJoinTimeout(joinTimeout); disco.setNetworkTimeout(netTimeout); + disco.setClientReconnectDisabled(reconnectDisabled); + disco.afterWrite(afterWrite); cfg.setDiscoverySpi(disco); @@ -633,6 +638,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public void testClientSegmentation() throws Exception { clientsPerSrv = 1; + reconnectDisabled = true; + startServerNodes(3); startClientNodes(3); @@ -656,6 +663,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi(); try { + log.info("Fail server: " + 2); + failServer(2); await(srvFailedLatch); @@ -888,8 +897,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { try { startClientNodes(1); - assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode) G.ignite("client-0") - .cluster().localNode()).clientRouterNodeId()); + assertEquals(G.ignite("server-0").cluster().localNode().id(), + ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId()); checkNodes(2, 1); @@ -1278,8 +1287,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final CountDownLatch disconnectLatch = new CountDownLatch(1); client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { + @Override public boolean apply(Event evt) { info("Client event: " + evt); if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { @@ -1334,22 +1342,230 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testClientReconnectDisabled() throws Exception { - // TODO IGNTIE-901. 
+ public void testClientFailReconnectDisabled() throws Exception { + reconnectDisabled = true; + + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + Ignite client = G.ignite("client-0"); + + final CountDownLatch segmentedLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_NODE_SEGMENTED) + segmentedLatch.countDown(); + + return false; + } + }, EVT_NODE_SEGMENTED); + + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + log.info("Fail client node."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(srvFailedLatch.await(5000, MILLISECONDS)); + assertTrue(segmentedLatch.await(5000, MILLISECONDS)); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception { + reconnectSegmentedAfterJoinTimeout(true); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception { + reconnectSegmentedAfterJoinTimeout(false); + } + + /** + * @param failSrv If {@code true} fails server, otherwise server does not send join message. + * @throws Exception If failed. 
+ */ + private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception { + netTimeout = 4000; + joinTimeout = 5000; + + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + Ignite client = G.ignite("client-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch segmentedLatch = new CountDownLatch(1); + final AtomicBoolean err = new AtomicBoolean(false); + + if (!failSrv) { + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + } + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected event."); + + assertEquals(1, segmentedLatch.getCount()); + assertEquals(1, disconnectLatch.getCount()); + assertFalse(err.get()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_NODE_SEGMENTED) { + log.info("Segmented event."); + + assertEquals(1, segmentedLatch.getCount()); + assertEquals(0, disconnectLatch.getCount()); + assertFalse(err.get()); + + segmentedLatch.countDown(); + } + else { + log.error("Unexpected event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED); + + if (failSrv) { + log.info("Fail server."); + + failServer(0); + } + else { + log.info("Fail client connection."); + + srvSpi.failClientReconnect.set(1_000_000); + srvSpi.failNodeAdded.set(1_000_000); + + clientSpi.brakeConnection(); + } + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + + assertTrue(segmentedLatch.await(10_000, MILLISECONDS)); + + waitSegmented(client); + + assertFalse(err.get()); + + if (!failSrv) + await(srvFailedLatch); + } + + /** + * @throws Exception If 
failed. + */ + public void testReconnectClusterRestart() throws Exception { + netTimeout = 3000; + joinTimeout = 60_000; + + clientIpFinder = new TcpDiscoveryVmIpFinder(); + + clientIpFinder.setAddresses(Collections.singleton("localhost:47500..47509")); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final AtomicBoolean err = new AtomicBoolean(false); + + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + Ignite client = G.ignite("client-0"); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override + public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(1, disconnectLatch.getCount()); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(0, disconnectLatch.getCount()); + assertFalse(err.get()); + + reconnectLatch.countDown(); + } else { + log.error("Unexpected event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED); + + log.info("Stop server."); + + srv.close(); + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + + srvNodeIds.clear(); + srvIdx.set(0); + + Thread.sleep(3000); + + log.info("Restart server."); + + startServerNodes(1); + + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + assertFalse(err.get()); } /** * @throws Exception If failed. */ public void testDisconnectAfterNetworkTimeout() throws Exception { - // TODO IGNTIE-901. } /** + * @param ignite Ignite. * @throws Exception If failed. */ - public void testReconnectSegmentedAfterJoinTimeout() throws Exception { - // TODO IGNTIE-901. 
+    private void waitSegmented(final Ignite ignite) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return IgniteState.STOPPED_ON_SEGMENTATION == Ignition.state(ignite.name());
+            }
+        }, 5000);
+
+        assertEquals(IgniteState.STOPPED_ON_SEGMENTATION, Ignition.state(ignite.name()));
     }

     /**
@@ -1567,7 +1783,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws InterruptedException If interrupted.
      */
     private void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+        assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
     }

     /**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 05677a4..2b2996d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -26,7 +26,7 @@ import org.h2.table.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;

-import javax.cache.CacheException;
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -93,6 +93,9 @@ public abstract class GridMergeIndex extends BaseIndex {
         throw new IllegalStateException();
     }

+    /**
+     * @param e Error.
+     */
     public void fail(final CacheException e) {
         for (UUID nodeId0 : remainingRows.keySet()) {
             addPage0(new GridResultPage(null, nodeId0, null) {
@@ -100,8 +103,7 @@ public abstract class GridMergeIndex extends BaseIndex {
                     return true;
                 }

-                @Override
-                public void fetchNextPage() {
+                @Override public void fetchNextPage() {
                     throw e;
                 }
             });
@@ -111,23 +113,12 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param nodeId Node ID.
      */
-    public void fail(@Nullable UUID nodeId) {
-        if (nodeId != null) {
-            addPage0(new GridResultPage(null, nodeId, null) {
-                @Override public boolean isFail() {
-                    return true;
-                }
-            });
-        }
-        else {
-            for (UUID nodeId0 : remainingRows.keySet()) {
-                addPage0(new GridResultPage(null, nodeId0, null) {
-                    @Override public boolean isFail() {
-                        return true;
-                    }
-                });
+    public void fail(UUID nodeId) {
+        addPage0(new GridResultPage(null, nodeId, null) {
+            @Override public boolean isFail() {
+                return true;
             }
-        }
+        });
     }

     /**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8f03681..cde3288 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1162,6 +1162,9 @@ public class GridReduceQueryExecutor {
         tbl.getScanIndex(null).fail(nodeId);
     }

+    /**
+     * @param e Error.
+     */
     void disconnected(CacheException e) {
         if (!state.compareAndSet(null, e))
             return;