Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 [created] f5f3efd16
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 1071ef2..5a898b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -97,17 +97,62 @@ public class GridCacheSharedContext<K, V> { Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; + + setManagers(txMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr); + + this.storeSesLsnrs = storeSesLsnrs; + + txMetrics = new TransactionMetricsAdapter(); + + ctxMap = new ConcurrentHashMap<>(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + void onDisconnected() throws IgniteCheckedException { + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); + it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); + + if (mgr.restartOnDisconnect()) + mgr.onKernalStop(true, true); + } + + for (ListIterator<? 
extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); + + if (mgr.restartOnDisconnect()) + mgr.stop(true); + } + + mgrs = new LinkedList<>(); + + setManagers(txMgr, + verMgr, + mvccMgr, + new GridCacheDeploymentManager<K, V>(), + new GridCachePartitionExchangeManager<K, V>(), + ioMgr); + + for (GridCacheSharedManager<K, V> mgr : mgrs) { + if (mgr.restartOnDisconnect()) + mgr.start(this); + } + } + + private void setManagers(IgniteTxManager txMgr, + GridCacheVersionManager verMgr, + GridCacheMvccManager mvccMgr, + GridCacheDeploymentManager<K, V> depMgr, + GridCachePartitionExchangeManager<K, V> exchMgr, + GridCacheIoManager ioMgr) { this.mvccMgr = add(mvccMgr); this.verMgr = add(verMgr); this.txMgr = add(txMgr); this.depMgr = add(depMgr); this.exchMgr = add(exchMgr); this.ioMgr = add(ioMgr); - this.storeSesLsnrs = storeSesLsnrs; - - txMetrics = new TransactionMetricsAdapter(); - - ctxMap = new ConcurrentHashMap<>(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java index d45052c..5d27657 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java @@ -46,7 +46,7 @@ public interface GridCacheSharedManager <K, V> { /** * @param cancel Cancel flag. 
*/ - public void onKernalStop(boolean cancel); + public void onKernalStop(boolean cancel, boolean disconnected); /** * Prints memory statistics (sizes of internal data structures, etc.). @@ -54,4 +54,9 @@ public interface GridCacheSharedManager <K, V> { * NOTE: this method is for testing and profiling purposes only. */ public void printMemoryStats(); + + /** + * @return {@code True} if manager should be stopped and started again on client disconnect. + */ + public boolean restartOnDisconnect(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java index 2cf7051..61dbc25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java @@ -101,14 +101,14 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** {@inheritDoc} */ - @Override public final void onKernalStop(boolean cancel) { + @Override public final void onKernalStop(boolean cancel, boolean disconnected) { if (!starting.get()) // Ignoring attempt to stop manager that has never been started. return; - onKernalStop0(cancel); + onKernalStop0(cancel, disconnected); - if (log != null && log.isDebugEnabled()) + if (!disconnected && log != null && log.isDebugEnabled()) log.debug(kernalStopInfo()); } @@ -121,8 +121,9 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag /** * @param cancel Cancel flag. + * @param disconnected Disconnected flag. 
*/ - protected void onKernalStop0(boolean cancel) { + protected void onKernalStop0(boolean cancel, boolean disconnected) { // No-op. } @@ -160,6 +161,11 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** {@inheritDoc} */ + @Override public boolean restartOnDisconnect() { + return false; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheSharedManagerAdapter.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 38a0d55..0369eb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -458,6 +458,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (isDone()) return; + log.info("Init exchange: " + exchangeId()); + if (init.compareAndSet(false, true)) { if (isDone()) return; @@ -1024,6 +1026,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.exchange().onExchangeDone(this, err); if (super.onDone(res, err) && !dummy && !forcePreload) { + log.info("Finished exchange: " + exchangeId() + ", err=" + err); + if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']'); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0355bb3..969d7a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -180,13 +180,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { topVer.setIfGreater(startTopVer); - // Generate dummy discovery event for local node joining. - DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); - - assert discoEvt != null; - - assert discoEvt.topologyVersion() == startTopVer; - supplyPool.start(); demandPool.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 351d6cd..a7e3f4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -90,6 +90,11 @@ public abstract class GridNearCacheAdapter<K, V> 
extends GridDistributedCacheAda public abstract GridDhtCacheAdapter<K, V> dht(); /** {@inheritDoc} */ + @Override public void disconnected() { + map = new GridCacheConcurrentMap(ctx, ctx.config().getNearConfiguration().getNearStartSize(), 0.75F); + } + + /** {@inheritDoc} */ @Override public boolean isNear() { return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index d59a51d..39f6bd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -552,7 +552,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { // Creates task session with task name and task version. GridTaskSessionImpl ses = ctx.session().createTaskSession( sesId, - ctx.config().getNodeId(), + ctx.localNodeId(), taskName, dep, taskCls == null ? null : taskCls.getName(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 5e557bd..dd19203 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -58,9 +58,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Ignite instance. 
*/ protected Ignite ignite; - /** Local node id. */ - protected UUID nodeId; - /** Grid instance name. */ protected String gridName; @@ -111,7 +108,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** {@inheritDoc} */ @Override public UUID getLocalNodeId() { - return nodeId; + return ignite.cluster().localNode().id(); } /** {@inheritDoc} */ @@ -201,10 +198,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement protected void injectResources(Ignite ignite) { this.ignite = ignite; - if (ignite != null) { - nodeId = ignite.configuration().getNodeId(); + if (ignite != null) gridName = ignite.name(); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index addf243d..5eaca21 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -248,7 +248,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Sending local node ID to newly accepted session: " + ses); - ses.send(nodeIdMsg); + ses.send(nodeIdMessage()); } } @@ -700,9 +700,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Address resolver. */ private AddressResolver addrRslvr; - /** Local node ID message. */ - private NodeIdMessage nodeIdMsg; - /** Received messages count. */ private final LongAdder8 rcvdMsgsCnt = new LongAdder8(); @@ -739,10 +736,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Discovery listener. 
*/ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + assert evt instanceof DiscoveryEvent : evt; + assert evt.type() == EVT_NODE_LEFT || + evt.type() == EVT_NODE_FAILED || + evt.type() == EVT_CLIENT_NODE_DISCONNECTED : evt; + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + for (GridCommunicationClient client : clients.values()) + client.forceClose(); + + IgniteCheckedException err = new IgniteCheckedException("Failed to connect to node, " + + "local node disconnected."); - onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); + for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values()) + clientFut.onDone(err); + + recoveryDescs.clear(); + } + else + onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); } }; @@ -1237,8 +1249,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { - nodeIdMsg = new NodeIdMessage(getLocalNodeId()); - assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); @@ -1371,7 +1381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (boundTcpShmemPort > 0) spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP); - spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CLIENT_NODE_DISCONNECTED); ctxInitLatch.countDown(); } @@ -1666,10 +1676,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - UUID locNodeId = getLocalNodeId(); - - if (node.id().equals(locNodeId)) - notifyListener(locNodeId, 
msg, NOOP); + if (node.isLocal()) + notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; @@ -2208,7 +2216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(), + HandshakeMessage msg = new HandshakeMessage(getSpiContext().localNode().id(), recovery.incrementConnectCount(), recovery.receivedCount()); @@ -2228,7 +2236,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(buf); } else - ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)); + ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); if (recovery != null) { if (log.isDebugEnabled()) @@ -2355,6 +2363,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter getExceptionRegistry().onException(msg, e); } + /** + * @return Node ID message. + */ + private NodeIdMessage nodeIdMessage() { + return new NodeIdMessage(getSpiContext().localNode().id()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); @@ -2860,15 +2875,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { + UUID id = getSpiContext().localNode().id(); + + NodeIdMessage msg = new NodeIdMessage(id); + out.write(U.IGNITE_HEADER); out.write(NODE_ID_MSG_TYPE); - out.write(nodeIdMsg.nodeIdBytes); + out.write(msg.nodeIdBytes); out.flush(); if (log.isDebugEnabled()) - log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId=" - + rmtNodeId + ']'); + log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']'); } catch (IOException e) { throw new IgniteCheckedException("Failed to perform handshake.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 04276d2..4b1cfa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,9 +70,6 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private SocketReader sockReader; - /** */ - private boolean segmented; - /** Last message ID. */ private volatile IgniteUuid lastMsgId; @@ -325,7 +322,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { - if (segmented) + if (state == State.SEGMENTED) throw new IgniteException("Failed to send custom message: client is disconnected"); try { @@ -339,7 +336,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void failNode(UUID nodeId, @Nullable String warning) { - ClusterNode node = rmtNodes.get(nodeId); + TcpDiscoveryNode node = rmtNodes.get(nodeId); if (node != null) { TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), @@ -366,7 +363,8 @@ class ClientImpl extends TcpDiscoveryImpl { long startTime = U.currentTimeMillis(); // Marshal credentials for backward compatibility and security. 
- marshalCredentials(locNode); + if (!recon) + marshalCredentials(locNode); while (true) { if (Thread.currentThread().isInterrupted()) @@ -947,7 +945,10 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - assert !segmented; + assert state == ClientImpl.State.DISCONNECTED + || state == ClientImpl.State.CONNECTED + || state == ClientImpl.State.STARTING : + state; boolean success = false; @@ -1007,9 +1008,11 @@ class ClientImpl extends TcpDiscoveryImpl { } success = true; - } - return; + return; + } + else // TODO IGNITE-901 reuse socket. + return; } } else if (spi.ensured(msg)) { @@ -1090,45 +1093,33 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @SuppressWarnings("InfiniteLoopStatement") @Override protected void body() throws InterruptedException { + state = ClientImpl.State.STARTING; + spi.stats.onJoinStarted(); try { - final Socket sock = joinTopology(false, spi.joinTimeout); - - if (sock == null) { - joinError(new IgniteSpiException("Join process timed out.")); - - return; - } - - currSock = sock; - - sockWriter.setSocket(sock); - - if (spi.joinTimeout > 0) { - timer.schedule(new TimerTask() { - @Override public void run() { - if (joinLatch.getCount() > 0) - queue.add(JOIN_TIMEOUT); - } - }, spi.joinTimeout); - } - - sockReader.setSocket(sock, locNode.clientRouterNodeId()); + tryJoin(); while (true) { Object msg = queue.take(); if (msg == JOIN_TIMEOUT) { - if (joinLatch.getCount() > 0) { + if (state == ClientImpl.State.STARTING) { joinError(new IgniteSpiException("Join process timed out, did not receive response for " + "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); break; } + else if (state == ClientImpl.State.DISCONNECTED) { + state = ClientImpl.State.SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, 
topVer, locNode, allVisibleNodes()); + } } else if (msg == SPI_STOP) { + state = ClientImpl.State.STOPPED; + assert spi.getSpiContext().isStopping(); if (currSock != null) { @@ -1147,7 +1138,7 @@ class ClientImpl extends TcpDiscoveryImpl { boolean join = joinLatch.getCount() > 0; - if (spi.getSpiContext().isStopping() || segmented) { + if (spi.getSpiContext().isStopping() || (state == ClientImpl.State.SEGMENTED)) { leaveLatch.countDown(); if (join) { @@ -1166,19 +1157,31 @@ class ClientImpl extends TcpDiscoveryImpl { } } else if (msg == SPI_RECONNECT_FAILED) { - if (!segmented) { - segmented = true; + reconnector.cancel(); + reconnector.join(); - reconnector.cancel(); - reconnector.join(); + reconnector = null; - notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); - } + state = ClientImpl.State.DISCONNECTED; + + nodeAdded = false; + + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + + UUID newId = UUID.randomUUID(); + + log.info("Change node id: " + newId); + + rmtNodes.clear(); + + locNode.onClientDisconnected(newId); + + tryJoin(); } else { TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (joinLatch.getCount() > 0) { + if (joinLatch.getCount() > 0) { // TODO IGNITE-901. IgniteSpiException err = null; if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) @@ -1214,6 +1217,44 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * @throws InterruptedException If interrupted. 
+ */ + private void tryJoin() throws InterruptedException { + assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.STARTING : state; + + boolean join = state == ClientImpl.State.STARTING; + + final Socket sock = joinTopology(false, spi.joinTimeout); + + if (sock == null) { + if (join) + joinError(new IgniteSpiException("Join process timed out.")); + else { + state = ClientImpl.State.SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } + + return; + } + + currSock = sock; + + sockWriter.setSocket(sock); + + if (spi.joinTimeout > 0) { + timer.schedule(new TimerTask() { + @Override public void run() { + if (joinLatch.getCount() > 0) + queue.add(JOIN_TIMEOUT); + } + }, spi.joinTimeout); + } + + sockReader.setSocket(sock, locNode.clientRouterNodeId()); + } + + /** * @param msg Message. */ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { @@ -1244,6 +1285,16 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onMessageProcessingFinished(msg); } + private boolean nodeAdded; + + private boolean joining() { + return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED; + } + + private boolean disconnected() { + return state == ClientImpl.State.DISCONNECTED; + } + /** * @param msg Message. 
*/ @@ -1256,12 +1307,15 @@ class ClientImpl extends TcpDiscoveryImpl { UUID newNodeId = node.id(); if (getLocalNodeId().equals(newNodeId)) { - if (joinLatch.getCount() > 0) { + if (joining()) { Collection<TcpDiscoveryNode> top = msg.topology(); if (top != null) { spi.gridStartTime = msg.gridStartTime(); + if (disconnected()) + rmtNodes.clear(); + for (TcpDiscoveryNode n : top) { if (n.order() > 0) n.visible(true); @@ -1271,6 +1325,8 @@ class ClientImpl extends TcpDiscoveryImpl { topHist.clear(); + nodeAdded = true; + if (msg.topologyHistory() != null) topHist.putAll(msg.topologyHistory()); } @@ -1308,7 +1364,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; if (getLocalNodeId().equals(msg.nodeId())) { - if (joinLatch.getCount() > 0) { + if (joining()) { Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData(); if (dataMap != null) { @@ -1323,7 +1379,14 @@ class ClientImpl extends TcpDiscoveryImpl { locNode.order(topVer); - notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg)); + Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg); + + if (disconnected()) + notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes); + + notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes); + + state = ClientImpl.State.CONNECTED; joinErr.set(null);; @@ -1437,7 +1500,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @return {@code True} if received node added message for local node. */ private boolean nodeAdded() { - return !topHist.isEmpty(); + return nodeAdded; } /** @@ -1582,7 +1645,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. 
*/ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { - if (msg.verified() && joinLatch.getCount() == 0) { + if (msg.verified() && state == ClientImpl.State.CONNECTED) { DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { @@ -1718,4 +1781,18 @@ class ClientImpl extends TcpDiscoveryImpl { this.sock = sock; } } + + private volatile State state; + + private enum State { + STARTING, + + CONNECTED, + + DISCONNECTED, + + SEGMENTED, + + STOPPED + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index fa3e564..05f710d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -585,11 +585,11 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void failNode(UUID nodeId, @Nullable String warning) { - ClusterNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); if (node != null) { TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), - node.id(), node.order()); + node.id(), node.internalOrder()); msg.warning(warning); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index ace917f..4cb0b8d 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -112,7 +112,7 @@ abstract class TcpDiscoveryImpl { * @return Local node ID. */ public UUID getLocalNodeId() { - return spi.getLocalNodeId(); + return spi.locNode.id(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 7663fe6..9446d2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -844,7 +844,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } locNode = new TcpDiscoveryNode( - getLocalNodeId(), + ignite.configuration().getNodeId(), addrs.get1(), addrs.get2(), srvPort, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 36ae39e..d0c3edf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -431,12 +431,29 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** + * @return Metrics 
provider. + */ + public DiscoveryMetricsProvider metricsProvider() { + return metricsProvider; + } + + /** * @param clientRouterNodeId Client router node ID. */ public void clientRouterNodeId(UUID clientRouterNodeId) { this.clientRouterNodeId = clientRouterNodeId; } + /** + * @param newId New node ID. + */ + public void onClientDisconnected(UUID newId) { + id = newId; + order = 0; + intOrder = 0; + visible = false; + } + /** {@inheritDoc} */ @Override public int compareTo(@Nullable TcpDiscoveryNode node) { if (node == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java index 7a88426..000782a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java @@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, registerMBean(gridName, this, FileSwapSpaceSpiMBean.class); - String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId(); + String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId(); try { dir = U.resolveWorkDirectory(path, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java index abc9109..5f2d2b4 
100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java @@ -104,8 +104,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { * Test kernal gateway that always return uninitialized user stack trace. */ private static final GridKernalGateway TEST_GATEWAY = new GridKernalGateway() { - @Override public void lightCheck() throws IllegalStateException {} - @Override public void readLock() throws IllegalStateException {} @Override public void readLockAnyway() {} @@ -122,10 +120,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { @Override public void writeUnlock() {} - @Override public void addStopListener(Runnable lsnr) {} - - @Override public void removeStopListener(Runnable lsnr) {} - @Override public String userStackTrace() { return null; } @@ -133,5 +127,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { @Override public boolean tryWriteLock(long timeout) { return false; } + + @Override public void onDisconnected() { + // No-op. + } + + @Override public void onReconnected() { + // No-op. + } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java new file mode 100644 index 0000000..0512074 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.net.*; +import java.util.concurrent.*; + +/** + * Base class for client reconnect tests; every node is configured with {@link TestTcpDiscoverySpi}. + */ +public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every configured node. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** If {@code true}, configured nodes are switched to client mode. */ + protected boolean clientMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + disco.setJoinTimeout(2 * 60_000); + + cfg.setDiscoverySpi(disco); + + if (clientMode) + cfg.setClientMode(true); + + return cfg; + } + + /** + * @return
Number of server nodes started before tests. + */ + protected abstract int serverCount(); + + /** + * @param ignite Node. + * @return Discovery SPI. + */ + protected TestTcpDiscoverySpi spi(Ignite ignite) { + return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + int srvs = serverCount(); + + if (srvs > 0) + startGrids(srvs); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @param client Client. + * @return Server node whose ID is the client router node ID of the given client. + */ + protected Ignite clientRouter(Ignite client) { + TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode(); + + assertTrue(node.isClient()); + assertNotNull(node.clientRouterNodeId()); + + Ignite srv = G.ignite(node.clientRouterNodeId()); + + assertNotNull(srv); + + return srv; + } + + /** + * @param fut Future. + * @throws Exception If failed.
+ */ + protected void assertNotDone(IgniteInternalFuture<?> fut) throws Exception { + assertNotNull(fut); + + if (fut.isDone()) + fail("Future completed with result: " + fut.get()); + } + + /** + * Discovery SPI that makes join request writes wait on {@link #writeLatch} when the latch is set. + */ + protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** Latch awaited before a join request is written; {@code null} means no waiting. */ + volatile CountDownLatch writeLatch; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + throws IOException, IgniteCheckedException { + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + CountDownLatch writeLatch0 = writeLatch; + + if (writeLatch0 != null) { + log.info("Block join request send: " + msg); + + U.await(writeLatch0); + } + } + + super.writeToSocket(sock, msg); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java new file mode 100644 index 0000000..164f6c8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Checks that public API calls started while a client is disconnected stay pending until the client reconnects. + */ +public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testIgniteBlockOnDisconnect() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + assertNotNull(client.cache(null)); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + final List<IgniteInternalFuture> futs = new ArrayList<>(); + + // TODO IGNITE-901 test blocking of other public API methods.
+ + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + assertEquals(1, reconnectLatch.getCount()); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + return client.transactions(); + } + })); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + return client.cache(null); + } + })); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + return client.dataStreamer(null); + } + })); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + log.info("Fail client."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + + assertEquals(3, futs.size()); + + for (IgniteInternalFuture<?> fut : futs) + assertNotDone(fut); + + U.sleep(2000); + + for (IgniteInternalFuture<?> fut : futs) + assertNotDone(fut); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + + IgniteTransactions txs = (IgniteTransactions)futs.get(0).get(); + + assertNotNull(txs); + + IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)futs.get(1).get(); + + assertNotNull(cache0); + + cache0.put(1, 1); + + assertEquals(1, cache0.get(1)); + + IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)futs.get(2).get(); + + streamer.addData(2, 2); + + streamer.close(); + + assertEquals(2, cache0.get(2)); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java new file mode 100644 index 0000000..5687010 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * Tests cache operations on a client node that is failed by a server and then reconnects. + */ +public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest { + /** Number of server nodes started before each test. */ + private final int SRV_CNT = 1; + + /** Node ID for the next node configuration; reset to {@code null} once applied. */ + private UUID nodeId; + + /** Local event listeners for the next node configuration; reset to {@code null} once applied. */ + private Map<IgnitePredicate<?
extends Event>, int[]> lsnrs; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + cfg.setPeerClassLoadingEnabled(false); + + if (nodeId != null) { + cfg.setNodeId(nodeId); + + nodeId = null; + } + + if (lsnrs != null) { + cfg.setLocalEventListeners(lsnrs); + + lsnrs = null; + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(SRV_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testReconnect() throws Exception { + clientMode = true; + + Ignite client = startGrid(SRV_CNT); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>()); + + cache.put(1, 1); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>(); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + assertEquals(1, reconnectLatch.getCount()); + + blockPutRef.set(GridTestUtils.runAsync(new Callable() { + @Override + public Object call() throws Exception { + log.info("Start put."); + + cache.put(2, 2); + + log.info("Finish put."); + + return null; + } + })); + + disconnectLatch.countDown(); + }
else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + log.info("Fail client."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + + IgniteInternalFuture putFut = blockPutRef.get(); + + assertNotDone(putFut); + + U.sleep(5000); + + assertNotDone(putFut); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + + assertEquals(1, cache.get(1)); + + putFut.get(); + + assertEquals(2, cache.get(2)); + + cache.put(3, 3); + + assertEquals(3, cache.get(3)); + + this.clientMode = false; + + IgniteEx srv2 = startGrid(SRV_CNT + 1); + + Integer key = primaryKey(srv2.cache(null)); + + cache.put(key, 4); + + assertEquals(4, cache.get(key)); + } + + /** + * @throws Exception If failed.
+ */ + public void testReconnectExchangeInProgress() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + + srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id()); + + clientMode = false; + + startGrid(SRV_CNT + 1); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override + public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + + try { + srvCommSpi.stopBlock(true); + + fail(); + } + catch (IgniteException e) { + log.info("Expected error: " + e); + } + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("newCache"); + + ccfg.setCacheMode(REPLICATED); + + log.info("Start new cache."); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + } + + /** + * @throws Exception If failed.
+ */ + public void testReconnectInitialExchangeInProgress() throws Exception { + final UUID clientId = UUID.randomUUID(); + + Ignite srv = grid(0); + + final CountDownLatch joinLatch = new CountDownLatch(1); + + srv.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_NODE_JOINED && ((DiscoveryEvent)evt).eventNode().id().equals(clientId)) { + info("Client joined: " + evt); + + joinLatch.countDown(); + } + + return true; + } + }, EVT_NODE_JOINED); + + TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + + srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, clientId); + + clientMode = true; + + nodeId = clientId; + + lsnrs = new HashMap<>(); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + lsnrs.put(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, new int[]{EVT_CLIENT_NODE_RECONNECTED}); + + IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { + @Override + public Ignite call() throws Exception { + try { + return startGrid(SRV_CNT); + } catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } + } + }); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + assertTrue(joinLatch.await(5000, TimeUnit.MILLISECONDS)); + + U.sleep(1000); + + assertNotDone(fut); + + srvSpi.failNode(clientId, null); + + log.info("Wait reconnect."); + + assertTrue(reconnectLatch.await(10 * 60_000, TimeUnit.MILLISECONDS)); + + try { + srvCommSpi.stopBlock(true); + + fail(); + } + catch (IgniteException e) { + log.info("Expected error: " + e); + } + + Ignite client = fut.get(); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("newCache"); + + ccfg.setCacheMode(REPLICATED); + + log.info("Start new
cache."); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectOperationInProgress() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) + info("Client disconnected: " + evt); + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) + info("Client reconnected: " + evt); + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + IgniteInClosure<IgniteCache<Object, Object>> putOp = new CI1<IgniteCache<Object, Object>>() { + @Override public void apply(IgniteCache<Object, Object> cache) { + cache.put(1, 1); + } + }; + + IgniteInClosure<IgniteCache<Object, Object>> getOp = new CI1<IgniteCache<Object, Object>>() { + @Override public void apply(IgniteCache<Object, Object> cache) { + cache.get(1); + } + }; + + int cnt = 0; + + for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { + CacheAtomicWriteOrderMode[] writeOrders = + atomicityMode == CacheAtomicityMode.ATOMIC ? CacheAtomicWriteOrderMode.values() : + new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK}; + + for (CacheAtomicWriteOrderMode writeOrder : writeOrders) { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + + ccfg.setAtomicWriteOrderMode(writeOrder); + + ccfg.setName("cache-" + cnt++); + + ccfg.setWriteSynchronizationMode(syncMode); + + if (syncMode != CacheWriteSynchronizationMode.FULL_ASYNC) { + Class<?> cls = (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC) ?
+ GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class; + + log.info("Test cache put [atomicity=" + atomicityMode + + ", writeOrder=" + writeOrder + + ", syncMode=" + syncMode + ']'); + + checkOperationInProgressFails(client, ccfg, cls, putOp); + } + + log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']'); + + checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp); + } + } + } + } + + /** + * @param client Client. + * @param ccfg Cache configuration. + * @param msgToBlock Message to block. + * @param c Cache operation closure. + * @throws Exception If failed. + */ + private void checkOperationInProgressFails(IgniteEx client, + final CacheConfiguration<Object, Object> ccfg, + Class<?> msgToBlock, + final IgniteInClosure<IgniteCache<Object, Object>> c) + throws Exception + { + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); + + TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + + srvCommSpi.blockMessages(msgToBlock, client.localNode().id()); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + c.apply(cache); + + return null; + } + }); + + Thread.sleep(1000); + + assertNotDone(fut); + + log.info("Fail client: " + client.localNode().id()); + + srvSpi.failNode(client.localNode().id(), null); + + try { + fut.get(); + + fail(); + } + catch (IgniteCheckedException e) { + log.info("Expected error: " + e); + } + + srvCommSpi.stopBlock(false); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + client.destroyCache(cache.getName()); + } + + /** + * Communication SPI that holds back messages of registered classes sent to registered nodes. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Held back messages together with their destination nodes. */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new
ArrayList<>(); + + /** Maps a blocked message class to the IDs of the nodes it is blocked for. */ + private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + Set<UUID> blockNodes = blockCls.get(msg0.getClass()); + + if (F.contains(blockNodes, node.id())) { + log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg0 + ']'); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + return; + } + } + } + + super.sendMessage(node, msg); + } + + /** + * @param cls Message class. + * @param nodeId Node ID. + */ + void blockMessages(Class<?> cls, UUID nodeId) { + synchronized (this) { + Set<UUID> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeId); + } + } + + /** + * @param snd If {@code true}, held back messages are sent before the list is cleared. + */ + void stopBlock(boolean snd) { + synchronized (this) { + blockCls.clear(); + + if (snd) { + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + ClusterNode node = msg.get1(); + + log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg.get2().message() + ']'); + + super.sendMessage(msg.get1(), msg.get2()); + } + } + + blockedMsgs.clear(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java new file mode 100644 index 0000000..deffd42 --- /dev/null +++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Checks discovery state seen by a client after reconnect: node order, topology version and history, node local map. + */ +public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 3; + } + + /** + * @throws Exception If failed.
+ */ + public void testReconnect() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + long topVer = 4; + + IgniteCluster cluster = client.cluster(); + + cluster.nodeLocalMap().put("locMapKey", 10); + + Map<Integer, Integer> nodeCnt = new HashMap<>(); + + nodeCnt.put(1, 1); + nodeCnt.put(2, 2); + nodeCnt.put(3, 3); + nodeCnt.put(4, 4); + + for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { + Collection<ClusterNode> nodes = cluster.topology(e.getKey()); + + assertEquals((int)e.getValue(), nodes.size()); + } + + ClusterNode locNode = cluster.localNode(); + + assertEquals(topVer, locNode.order()); + + TestTcpDiscoverySpi srvSpi = spi(clientRouter(client)); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) + info("Disconnected: " + evt); + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + + topVer += 2; // Client failed and rejoined.
+ + locNode = cluster.localNode(); + + assertEquals(topVer, locNode.order()); + assertEquals(topVer, cluster.topologyVersion()); + + nodeCnt.put(5, 3); + nodeCnt.put(6, 4); + + for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { + Collection<ClusterNode> nodes = cluster.topology(e.getKey()); + + assertEquals((int)e.getValue(), nodes.size()); + } + + assertEquals(10, cluster.nodeLocalMap().get("locMapKey")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java index 19e40bf..7a2e8b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java @@ -220,7 +220,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes Object msg0 = ((GridIoMessage)msg).message(); if (!(msg0 instanceof GridClockDeltaSnapshotMessage)) { - info("Sending message [locNodeId=" + getLocalNodeId() + ", destNodeId= " + destNode.id() + info("Sending message [locNodeId=" + ignite.cluster().localNode().id() + + ", destNodeId= " + destNode.id() + ", msg=" + msg + ']'); synchronized (msgCntMap) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index ec6a526..55fae9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -386,11 +386,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() { - @Override public void apply(Socket sock) { + @Override + public void apply(Socket sock) { try { latch.await(); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException(e); } } @@ -744,11 +744,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { attachListeners(1, 1); ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() { - @Override public void apply(TcpDiscoveryAbstractMessage msg) { + @Override + public void apply(TcpDiscoveryAbstractMessage msg) { try { Thread.sleep(1000000); - } - catch (InterruptedException ignored) { + } catch (InterruptedException ignored) { Thread.interrupted(); } } @@ -778,7 +778,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { G.ignite("client-0").compute().broadcast(F.noop()); assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { + @Override + public boolean apply() { return checkMetrics(3, 3, 1); } }, 10000)); @@ -788,7 +789,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { 
G.ignite("server-0").compute().broadcast(F.noop()); assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { + @Override + public boolean apply() { return checkMetrics(3, 3, 2); } }, 10000)); @@ -886,7 +888,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { try { startClientNodes(1); - assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0") + assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode) G.ignite("client-0") .cluster().localNode()).clientRouterNodeId()); checkNodes(2, 1); @@ -1193,7 +1195,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientsPerSrv = CLIENTS; GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override public Void call() throws Exception { + @Override + public Void call() throws Exception { Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); clientNodeIds.add(g.cluster().localNode().id()); @@ -1206,6 +1209,129 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testReconnectAfterFail() throws Exception { + reconnectAfterFail(false); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectAfterFailTopologyChanged() throws Exception { + reconnectAfterFail(true); + } + + /** + * @param changeTop If {@code true} topology is changed after client disconnects. + * @throws Exception If failed. 
+ */ + private void reconnectAfterFail(final boolean changeTop) throws Exception { + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + Ignite client = G.ignite("client-0"); + + final ClusterNode clientNode = client.cluster().localNode(); + + final UUID clientId = clientNode.id(); + + final TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + assertEquals(2L, clientNode.order()); + + final CountDownLatch failLatch = new CountDownLatch(1); + + final CountDownLatch joinLatch = new CountDownLatch(1); + + srv.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Server event: " + evt); + + DiscoveryEvent evt0 = (DiscoveryEvent)evt; + + if (evt0.eventNode().id().equals(clientId) && (evt.type() == EVT_NODE_FAILED)) { + if (evt.type() == EVT_NODE_FAILED) + failLatch.countDown(); + } + else if (evt.type() == EVT_NODE_JOINED) { + TcpDiscoveryNode node = (TcpDiscoveryNode)evt0.eventNode(); + + if ("client-0".equals(node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) { + assertEquals(changeTop ? 
5L : 4L, node.order()); + + joinLatch.countDown(); + } + } + + return true; + } + }, EVT_NODE_FAILED, EVT_NODE_JOINED); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override + public boolean apply(Event evt) { + info("Client event: " + evt); + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + assertEquals(1, reconnectLatch.getCount()); + + disconnectLatch.countDown(); + + if (changeTop) + clientSpi.pauseAll(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + if (changeTop) { + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + + clientSpi.resumeAll(); + } + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + assertTrue(failLatch.await(5000, MILLISECONDS)); + assertTrue(joinLatch.await(5000, MILLISECONDS)); + + long topVer = changeTop ? 5L : 4L; + + assertEquals(topVer, client.cluster().localNode().order()); + + assertEquals(topVer, client.cluster().topologyVersion()); + + Collection<ClusterNode> clientTop = client.cluster().topology(topVer); + + assertEquals(changeTop ? 3 : 2, clientTop.size()); + + clientNodeIds.remove(clientId); + + clientNodeIds.add(client.cluster().localNode().id()); + + checkNodes(changeTop ? 2 : 1, 1); + } + + /** * @param clientIdx Client index. * @param srvIdx Server index. * @throws Exception In case of error. 
@@ -1401,7 +1527,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private void checkRemoteNodes(Ignite ignite, int expCnt) { Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); - assertEquals(expCnt, nodes.size()); + assertEquals("Unexpected state for node: " + ignite.name(), expCnt, nodes.size()); for (ClusterNode node : nodes) { UUID id = node.id(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java new file mode 100644 index 0000000..7533a2c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Test suite that groups the Ignite client reconnect tests
+ * (API block, discovery state and cache tests).
+ */
+public class IgniteClientReconnectTestSuite extends TestSuite {
+    /**
+     * Builds the suite named "Ignite Client Reconnect Test Suite" and registers
+     * the three client reconnect test classes in it.
+     *
+     * @return Test suite.
+     * @throws Exception In case of error.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite reconnectSuite = new TestSuite("Ignite Client Reconnect Test Suite");
+
+        reconnectSuite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
+        reconnectSuite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
+        reconnectSuite.addTestSuite(IgniteClientReconnectCacheTest.class);
+
+        return reconnectSuite;
+    }
+}