Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 92908b91c -> 1a2ed51a4
# ignite-901 WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/11176a3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/11176a3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/11176a3d Branch: refs/heads/ignite-901 Commit: 11176a3d0d9695b1709b6d6e2aafce5770f57f2a Parents: 401efd7 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jul 6 12:11:18 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jul 6 16:05:00 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 5 + .../ignite/internal/GridKernalContextImpl.java | 6 + .../processors/cache/GridCacheIoManager.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 47 ++- .../GridCachePartitionExchangeManager.java | 5 +- .../processors/cache/GridCacheProcessor.java | 30 +- .../cache/GridCacheSharedContext.java | 7 +- .../cache/GridCacheSharedManager.java | 10 +- .../cache/GridCacheSharedManagerAdapter.java | 15 +- .../distributed/GridCacheTxFinishSync.java | 46 ++ .../transactions/IgniteTransactionsImpl.java | 59 +-- .../cache/transactions/IgniteTxManager.java | 14 +- .../transactions/TransactionProxyImpl.java | 2 +- .../processors/cluster/ClusterProcessor.java | 11 + .../IgniteClientReconnectCacheTest.java | 420 +++++++++++++++++-- 15 files changed, 593 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index d6542f3..f4da333 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -557,4 +557,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). */ public boolean clientNode(); + + /** + * @return {@code True} if local node in disconnected state. + */ + public boolean clientDisconnected(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index c449ec9..a4edefb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -70,6 +70,7 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.internal.GridKernalState.*; import static org.apache.ignite.internal.IgniteComponentType.*; /** @@ -913,6 +914,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public boolean clientDisconnected() { + return locNode.isClient() && gateway().getState() == DISCONNECTED; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 63abd8e..dfed416 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -170,7 +170,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @SuppressWarnings("BusyWait") - @Override protected void onKernalStop0(boolean cancel, boolean disconnected) { + @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_CACHE); for (Object ordTopic : orderedHandlers.keySet()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 3bd40a2..bc9a995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -216,7 +216,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStop0(boolean cancel, boolean disconnected) { + @Override public void onKernalStop0(boolean cancel) { cctx.gridEvents().removeLocalEventListener(discoLsnr); } @@ -293,22 +293,41 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * Cancels all client futures. - * - * @param reconnectFut Reconnect future is node disconnected, otherwise {@code null}. */ - public void cancelClientFutures(@Nullable IgniteFuture<?> reconnectFut) { - IgniteCheckedException e = reconnectFut == null ? - new IgniteCheckedException("Operation has been cancelled (node is stopping).") : - new IgniteClientDisconnectedCheckedException(reconnectFut, - "Operation has been cancelled (client node disconnected)."); + public void cancelClientFutures() { + cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping).")); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut); + cancelClientFutures(err); + } + + /** + * @param err Error. + */ + private void cancelClientFutures(IgniteCheckedException err) { for (Collection<GridCacheFuture<?>> futures : futs.values()) { for (GridCacheFuture<?> future : futures) - ((GridFutureAdapter)future).onDone(e); + ((GridFutureAdapter)future).onDone(err); } for (GridCacheAtomicFuture<?> future : atomicFuts.values()) - ((GridFutureAdapter)future).onDone(e); + ((GridFutureAdapter)future).onDone(err); + } + + /** + * @param reconnectFut Reconnect future. + * @return Client disconnected exceprion. + */ + private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) { + if (reconnectFut == null) + reconnectFut = cctx.kernalContext().cluster().clientReconnectFuture(); + + return new IgniteClientDisconnectedCheckedException(reconnectFut, + "Operation has been cancelled (client node disconnected)."); } /** @@ -344,6 +363,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; + + if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); } /** @@ -464,7 +486,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { fut.onNodeLeft(n.id()); } - // Just in case if future was complete before it was added. + if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); + + // Just in case if future was completed before it was added. if (fut.isDone()) removeFuture(fut); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bb69420..e091c67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -47,6 +47,7 @@ import java.util.concurrent.locks.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.GridKernalState.*; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*; @@ -323,14 +324,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel, boolean disconnected) { + @Override protected void onKernalStop0(boolean cancel) { cctx.gridEvents().removeLocalEventListener(discoLsnr); cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); - IgniteCheckedException err = disconnected ? + IgniteCheckedException err = cctx.kernalContext().gateway().getState() == DISCONNECTED ? new IgniteClientDisconnectedCheckedException(null, "Node disconnected: " + cctx.gridName()) : new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4fc02d5..b505e51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -911,7 +911,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { it.hasPrevious();) { GridCacheSharedManager<?, ?> mgr = it.previous(); - mgr.onKernalStop(cancel, false); + mgr.onKernalStop(cancel); } } @@ -926,20 +926,32 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : caches.values()) cache.context().gate().onDisconnected(reconnectFut); - sharedCtx.mvcc().cancelClientFutures(reconnectFut); - for (GridCacheAdapter cache : caches.values()) cache.disconnected(); - sharedCtx.onDisconnected(); + sharedCtx.onDisconnected(reconnectFut); } /** {@inheritDoc} */ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - cachesOnDisconnect = null; - for (GridCacheAdapter cache : caches.values()) { - boolean stopped = !registeredCaches.containsKey(maskNull(cache.name())); + String name = cache.name(); + + boolean stopped; + + boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); + + if (!sysCache) { + DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name)); + + assert oldDesc != null : "No descriptor for cache: " + name; + + DynamicCacheDescriptor newDesc = registeredCaches.get(maskNull(name)); + + stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()); + } + else + stopped = false; cache.context().gate().reconnected(stopped); @@ -962,6 +974,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) mgr.onKernalStart(); + + cachesOnDisconnect = null; } /** @@ -2937,7 +2951,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Cancel all user operations. */ public void cancelUserOperations() { - sharedCtx.mvcc().cancelClientFutures(null); + sharedCtx.mvcc().cancelClientFutures(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 5a898b1..3c8cb47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.jetbrains.annotations.*; @@ -110,13 +111,15 @@ public class GridCacheSharedContext<K, V> { /** * @throws IgniteCheckedException If failed. */ - void onDisconnected() throws IgniteCheckedException { + void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { GridCacheSharedManager<?, ?> mgr = it.previous(); + mgr.onDisconnected(reconnectFut); + if (mgr.restartOnDisconnect()) - mgr.onKernalStop(true, true); + mgr.onKernalStop(true); } for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java index 5d27657..9739175 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.lang.*; /** * Cache manager shared across all caches. @@ -46,7 +47,12 @@ public interface GridCacheSharedManager <K, V> { /** * @param cancel Cancel flag. */ - public void onKernalStop(boolean cancel, boolean disconnected); + public void onKernalStop(boolean cancel); + + /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut); /** * Prints memory statistics (sizes of internal data structures, etc.). @@ -56,7 +62,7 @@ public interface GridCacheSharedManager <K, V> { public void printMemoryStats(); /** - * + * @return {@code True} if manager is restarted when client disconnects. */ public boolean restartOnDisconnect(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java index 61dbc25..8029d49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import java.util.concurrent.atomic.*; @@ -101,14 +102,14 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** {@inheritDoc} */ - @Override public final void onKernalStop(boolean cancel, boolean disconnected) { + @Override public final void onKernalStop(boolean cancel) { if (!starting.get()) // Ignoring attempt to stop manager that has never been started. return; - onKernalStop0(cancel, disconnected); + onKernalStop0(cancel); - if (!disconnected && log != null && log.isDebugEnabled()) + if (log != null && log.isDebugEnabled()) log.debug(kernalStopInfo()); } @@ -121,9 +122,13 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag /** * @param cancel Cancel flag. - * @param disconnected Disconnected flag. */ - protected void onKernalStop0(boolean cancel, boolean disconnected) { + protected void onKernalStop0(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java index 2838838..0b351b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -81,6 +82,16 @@ public class GridCacheTxFinishSync<K, V> { } /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + for (ThreadFinishSync threadSync : threadMap.values()) + threadSync.onDisconnected(reconnectFut); + + threadMap.clear(); + } + + /** * Callback invoked when finish response is received from remote node. * * @param nodeId Node ID response was received from. @@ -139,6 +150,11 @@ public class GridCacheTxFinishSync<K, V> { nodeMap.remove(nodeId); } + else if (cctx.kernalContext().clientDisconnected()) { + sync.onDisconnected(cctx.kernalContext().cluster().clientReconnectFuture()); + + nodeMap.remove(nodeId); + } } sync.onSend(); @@ -160,6 +176,16 @@ public class GridCacheTxFinishSync<K, V> { } /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + for (TxFinishSync sync : nodeMap.values()) + sync.onDisconnected(reconnectFut); + + nodeMap.clear(); + } + + /** * @param nodeId Node ID response received from. */ public void onReceive(UUID nodeId) { @@ -288,5 +314,25 @@ public class GridCacheTxFinishSync<K, V> { } } } + + /** + * Client disconnected callback. + * + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + synchronized (this) { + nodeLeft = true; + + if (pendingFut != null) { + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( + reconnectFut, + "Failed to wait for transaction synchronizer, client node disconnected: " + nodeId); + pendingFut.onDone(err); + + pendingFut = null; + } + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 99907e4..7d9bcf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -140,32 +140,39 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { @SuppressWarnings("unchecked") private IgniteInternalTx txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, int txSize, @Nullable GridCacheContext sysCacheCtx) { - TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); - - if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) - throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + - "'txSerializableEnabled' configuration property)"); - - IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); - - if (tx != null) - throw new IllegalStateException("Failed to start new transaction " + - "(current thread already has a transaction): " + tx); - - tx = cctx.tm().newTx( - false, - false, - sysCacheCtx, - concurrency, - isolation, - timeout, - true, - txSize - ); - - assert tx != null; - - return tx; + cctx.kernalContext().gateway().readLock(); + + try { + TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); + + if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) + throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + + "'txSerializableEnabled' configuration property)"); + + IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); + + if (tx != null) + throw new IllegalStateException("Failed to start new transaction " + + "(current thread already has a transaction): " + tx); + + tx = cctx.tm().newTx( + false, + false, + sysCacheCtx, + concurrency, + isolation, + timeout, + true, + txSize + ); + + assert tx != null; + + return tx; + } + finally { + cctx.kernalContext().gateway().readUnlock(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index b6c77f6..1747de9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -149,6 +149,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { txHandler = new IgniteTxHandler(cctx); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) + rollbackTx(e.getValue()); + + txFinishSync.onDisconnected(reconnectFut); + } + /** * @return TX handler. */ @@ -764,11 +772,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); } - boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled(); + boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled(); // Clean up committed transactions queue. if (tx.pessimistic() && tx.local()) { - if (tx.enforceSerializable() && txSerializableEnabled) { + if (tx.enforceSerializable() && txSerEnabled) { for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) { IgniteInternalTx committedTx = it.next(); @@ -784,7 +792,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return; } - if (txSerializableEnabled && tx.optimistic() && tx.enforceSerializable()) { + if (txSerEnabled && tx.optimistic() && tx.enforceSerializable()) { Set<IgniteTxKey> readSet = tx.readSet(); Set<IgniteTxKey> writeSet = tx.writeSet(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 5099b42..9346e43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -89,7 +89,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza try { cctx.kernalContext().gateway().readLock(); } - catch (IllegalStateException e) { + catch (IllegalStateException | IgniteClientDisconnectedException e) { throw e; } catch (RuntimeException | Error e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 0ee00f1..1f5589f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cluster; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; /** * @@ -43,4 +45,13 @@ public class ClusterProcessor extends GridProcessorAdapter { public IgniteClusterImpl get() { return cluster; } + + /** + * @return Client reconnect future. + */ + public IgniteFuture<?> clientReconnectFuture() { + IgniteFuture<?> fut = cluster.clientReconnectFuture(); + + return fut != null ? fut : new IgniteFinishedFutureImpl<>(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11176a3d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index f9e2a9a..fdce8cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal; +import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; @@ -35,21 +36,30 @@ import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; import javax.cache.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * */ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest { /** */ - private final int SRV_CNT = 1; + private static final int SRV_CNT = 3; + + /** */ + private static final String STATIC_CACHE = "static-cache"; /** */ private UUID nodeId; @@ -77,6 +87,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac lsnrs = null; } + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(STATIC_CACHE); + + cfg.setCacheConfiguration(ccfg); + return cfg; } @@ -111,6 +127,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>()); + final IgniteCache<Object, Object> staticCache = client.cache(STATIC_CACHE); + + staticCache.put(1, 1); + + assertEquals(1, staticCache.get(1)); + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); ccfg.setName("nearCache"); @@ -182,7 +204,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac srvSpi.failNode(client.cluster().localNode().id(), null); - assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); IgniteInternalFuture putFut = blockPutRef.get(); @@ -196,12 +218,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac clientSpi.writeLatch.countDown(); - assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); checkCacheDiscoveryData(srv, client, null, true, true, false); checkCacheDiscoveryData(srv, client, "nearCache", true, true, true); + checkCacheDiscoveryData(srv, client, STATIC_CACHE, true, true, false); + assertEquals(1, cache.get(1)); putFut.get(); @@ -214,6 +238,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertNull(nearCache.localPeek(1)); + staticCache.put(10, 10); + + assertEquals(10, staticCache.get(10)); + this.clientMode = false; IgniteEx srv2 = startGrid(SRV_CNT + 1); @@ -227,6 +255,304 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac checkCacheDiscoveryData(srv2, client, null, true, true, false); checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true); + + checkCacheDiscoveryData(srv2, client, STATIC_CACHE, true, true, false); + + staticCache.put(20, 20); + + assertEquals(20, staticCache.get(20)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTransactions() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + Ignite srv = clientRouter(client); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); + + final IgniteTransactions txs = client.transactions(); + + final Transaction tx = txs.txStart(OPTIMISTIC, REPEATABLE_READ); + + cache.put(1, 1); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + try { + tx.commit(); + + fail(); + } catch (IgniteClientDisconnectedException e) { + log.info("Expected error: " + e); + + assertNotNull(e.reconnectFuture()); + } + + try { + txs.txStart(); + + fail(); + } catch (IgniteClientDisconnectedException e) { + log.info("Expected error: " + e); + + assertNotNull(e.reconnectFuture()); + } + } + }); + + assertNull(txs.tx()); + + try (Transaction tx0 = txs.txStart(OPTIMISTIC, REPEATABLE_READ)) { + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + tx0.commit(); + } + + try (Transaction tx0 = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(2, 2); + + assertEquals(2, cache.get(2)); + + tx0.commit(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTransactionInProgress1() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); + + reconnectTransactionInProgress1(client, OPTIMISTIC, cache); + + reconnectTransactionInProgress1(client, PESSIMISTIC, cache); + } + + /** + * @param client Client. + * @param txConcurrency Transaction concurrency mode. + * @param cache Cache. + * @throws Exception If failed. + */ + private void reconnectTransactionInProgress1(IgniteEx client, + final TransactionConcurrency txConcurrency, + final IgniteCache<Object, Object> cache) + throws Exception + { + Ignite srv = clientRouter(client); + + final TestTcpDiscoverySpi clientSpi = spi(client); + final TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + final IgniteTransactions txs = client.transactions(); + + final CountDownLatch afterPut1 = new CountDownLatch(1); + + final CountDownLatch afterPut2 = new CountDownLatch(1); + + final CountDownLatch putFailed = new CountDownLatch(1); + + IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try { + log.info("Start tx1: " + txConcurrency); + + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.put(1, 1); + + afterPut1.countDown(); + + afterPut2.await(); + + cache.put(2, 2); + + fail(); + } + catch (CacheException e) { + log.info("Expected exception: " + e); + + putFailed.countDown(); + + IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); + + e0.reconnectFuture().get(); + } + + log.info("Start tx2: " + txConcurrency); + + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.put(1, 1); + + cache.put(2, 2); + + tx.commit(); + } + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.put(3, 3); + + cache.put(4, 4); + + tx.commit(); + } + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + assertEquals(3, cache.get(3)); + assertEquals(4, cache.get(4)); + + cache.removeAll(); + + return true; + } + catch (AssertionFailedError e) { + throw e; + } + catch (Throwable e) { + log.error("Unexpected error", e); + + fail("Unexpected error: " + e); + + return false; + } + } + }); + + assertTrue(afterPut1.await(5000, MILLISECONDS)); + + assertNotDone(fut); + + srvSpi.failNode(client.localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + afterPut2.countDown(); + + assertTrue(putFailed.await(5000, MILLISECONDS)); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertTrue(fut.get()); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTransactionInProgress2() throws Exception { + clientMode = true; + + final IgniteEx client = startGrid(SRV_CNT); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + txInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, OPTIMISTIC, 1); + + txInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, PESSIMISTIC, 2); + + txInProgressFails(client, ccfg, GridNearTxFinishResponse.class, OPTIMISTIC, 3); + + txInProgressFails(client, ccfg, GridNearTxFinishResponse.class, PESSIMISTIC, 4); + + txInProgressFails(client, ccfg, GridNearLockResponse.class, PESSIMISTIC, 5); + } + + /** + * @param client Client. + * @param ccfg Cache configuration. + * @param msgToBlock Message to block. + * @param txConcurrency Transaction concurrency mode. + * @param key Key. + * @throws Exception If failed. + */ + private void txInProgressFails(final IgniteEx client, + final CacheConfiguration<Object, Object> ccfg, + Class<?> msgToBlock, + final TransactionConcurrency txConcurrency, + final Integer key) throws Exception { + log.info("Test tx failure [msg=" + msgToBlock + ", txMode=" + txConcurrency + ", key=" + key + ']'); + + checkOperationInProgressFails(client, ccfg, msgToBlock, + new CI1<IgniteCache<Object, Object>>() { + @Override public void apply(IgniteCache<Object, Object> cache) { + try (Transaction tx = client.transactions().txStart(txConcurrency, REPEATABLE_READ)) { + log.info("Put1: " + key); + + cache.put(key, key); + + Integer key2 = key + 1; + + log.info("Put2: " + key2); + + cache.put(key2, key2); + + log.info("Commit [key1=" + key + ", key2=" + key2 + ']'); + + tx.commit(); + } + } + } + ); + + IgniteCache<Object, Object> cache = client.cache(ccfg.getName()); + + assertEquals(key, cache.get(key)); } /** @@ -241,9 +567,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac TestTcpDiscoverySpi srvSpi = spi(srv); - TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi(); - srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id()); + coordCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id()); clientMode = false; @@ -265,10 +591,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac srvSpi.failNode(client.cluster().localNode().id(), null); - assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); try { - srvCommSpi.stopBlock(true); + coordCommSpi.stopBlock(true); fail(); } @@ -352,7 +678,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac TestTcpDiscoverySpi srvSpi = spi(srv); - assertTrue(joinLatch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(joinLatch.await(5000, MILLISECONDS)); U.sleep(1000); @@ -362,7 +688,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Wait reconnect."); - assertTrue(reconnectLatch.await(10 * 60_000, TimeUnit.MILLISECONDS)); + assertTrue(reconnectLatch.await(10 * 60_000, MILLISECONDS)); try { srvCommSpi.stopBlock(true); @@ -425,7 +751,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { CacheAtomicWriteOrderMode[] writeOrders = - atomicityMode == CacheAtomicityMode.ATOMIC ? CacheAtomicWriteOrderMode.values() : + atomicityMode == ATOMIC ? CacheAtomicWriteOrderMode.values() : new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK}; for (CacheAtomicWriteOrderMode writeOrder : writeOrders) { @@ -440,8 +766,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac ccfg.setWriteSynchronizationMode(syncMode); - if (syncMode != CacheWriteSynchronizationMode.FULL_ASYNC) { - Class<?> cls = (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC) ? + if (syncMode != FULL_ASYNC) { + Class<?> cls = (ccfg.getAtomicityMode() == ATOMIC) ? GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class; log.info("Test cache put [atomicity=" + atomicityMode + @@ -449,11 +775,15 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac ", syncMode=" + syncMode + ']'); checkOperationInProgressFails(client, ccfg, cls, putOp); + + client.destroyCache(ccfg.getName()); } log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']'); checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp); + + client.destroyCache(ccfg.getName()); } } } @@ -510,12 +840,18 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + assertEquals(ATOMIC, + clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + reconnectClientNode(client, srv, new Runnable() { - @Override - public void run() { + @Override public void run() { srv.destroyCache(null); - srv.getOrCreateCache(new CacheConfiguration<>()); + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + + srv.getOrCreateCache(ccfg); } }); @@ -524,6 +860,19 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac return clientCache.get(1); } }, IllegalStateException.class, null); + + checkCacheDiscoveryData(srv, client, null, true, false, false); + + IgniteCache<Object, Object> clientCache0 = client.cache(null); + + checkCacheDiscoveryData(srv, client, null, true, true, false); + + assertEquals(TRANSACTIONAL, + clientCache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + clientCache0.put(1, 1); + + assertEquals(1, clientCache0.get(1)); } /** @@ -545,14 +894,25 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); - TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + for (int i = 0; i < SRV_CNT; i++) { + TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi(); - srvCommSpi.blockMessages(msgToBlock, client.localNode().id()); + srvCommSpi.blockMessages(msgToBlock, client.localNode().id()); + } IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { + IgniteClientDisconnectedException e0 = null; + try { c.apply(cache); + + fail(); + } + catch (IgniteClientDisconnectedException e) { + log.info("Expected exception: " + e); + + e0 = e; } catch (CacheException e) { log.info("Expected exception: " + e); @@ -560,11 +920,16 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException); - IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); - - e0.reconnectFuture().get(); + e0 = (IgniteClientDisconnectedException)e.getCause(); } + assertNotNull(e0); + assertNotNull(e0.reconnectFuture()); + + e0.reconnectFuture().get(); + + c.apply(cache); + return null; } }); @@ -579,13 +944,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac fut.get(); - srvCommSpi.stopBlock(false); + for (int i = 0; i < SRV_CNT; i++) + ((TestCommunicationSpi)grid(i).configuration().getCommunicationSpi()).stopBlock(false); cache.put(1, 1); assertEquals(1, cache.get(1)); - - client.destroyCache(cache.getName()); } /** @@ -627,8 +991,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName)); if (cacheExists) { - assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); - assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + if (clientCache || clientNear) { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } + else { + assertFalse(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertFalse(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } } else { assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty());