Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 74447a2ec -> 39547027e


IGNITE-45 - Fixed assertions from example.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/be6cfe3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/be6cfe3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/be6cfe3e

Branch: refs/heads/ignite-45
Commit: be6cfe3ec6560ef2fe6c6c6af40c3015689388df
Parents: 58f0ec9
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Thu Mar 19 12:34:03 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Thu Mar 19 12:34:03 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   6 +-
 .../discovery/GridDiscoveryManager.java         | 114 ++++++++++++------
 .../dht/GridClientPartitionTopology.java        |   7 +-
 .../dht/GridDhtPartitionTopology.java           |   5 +
 .../dht/GridDhtPartitionTopologyImpl.java       |   7 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  12 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   9 ++
 .../colocated/GridDhtColocatedLockFuture.java   |   7 ++
 .../distributed/near/GridNearLockFuture.java    |   7 ++
 .../near/GridNearTxPrepareFuture.java           |  13 ++
 .../ignite/internal/util/nio/GridNioServer.java | 120 +++++++++----------
 11 files changed, 204 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index db6e535..c553f0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2279,8 +2279,10 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
             try {
                 ctx.cache().dynamicStartCache(cacheCfg, null).get();
             }
-            catch (IgniteCacheExistsException ignore) {
-                // Ignore the error if cache already exists.
+            catch (IgniteCheckedException e) {
+                // Ignore error if cache exists.
+                if (!e.hasCause(IgniteCacheExistsException.class))
+                    throw e;
             }
 
             return ctx.cache().publicJCache(cacheCfg.getName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b818e34..7585008 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
@@ -126,9 +127,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Last segment check result. */
     private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
 
-    /** Discovery cache. */
-    private final AtomicReference<DiscoCache> discoCache = new 
AtomicReference<>();
-
     /** Topology cache history. */
     private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
         new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE, 
DISCOVERY_HISTORY_SIZE, 0.7f, 1);
@@ -137,7 +135,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     private volatile Map<Long, Collection<ClusterNode>> topHist = new 
HashMap<>();
 
     /** Topology version. */
-    private final AtomicReference<AffinityTopologyVersion> topVer = new 
AtomicReference<>(AffinityTopologyVersion.ZERO);
+    private final AtomicReference<Snapshot> topSnap =
+        new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, 
null));
 
     /** Minor topology version. */
     private int minorTopVer;
@@ -384,7 +383,12 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     DiscoCache cache = new DiscoCache(locNode, 
F.view(topSnapshot, F.remoteNodes(locNode.id())));
 
                     discoCacheHist.put(nextTopVer, cache);
-                    discoCache.set(cache);
+
+                    boolean set = updateTopologyVersionIfGreater(nextTopVer, 
cache);
+
+                    assert set || topVer == 0 : "Topology version has not been 
updated [this.topVer=" +
+                        topSnap + ", topVer=" + topVer + ", node=" + node +
+                        ", evt=" + U.gridEventName(type) + ']';
                 }
 
                 // If this is a local join event, just save it and do not 
notify listeners.
@@ -407,15 +411,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     return;
                 }
 
-                if (topVer > 0 && (type == EVT_NODE_JOINED || type == 
EVT_NODE_FAILED || type == EVT_NODE_LEFT ||
-                    type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
-                    boolean set = updateTopologyVersionIfGreater(nextTopVer);
-
-                    assert set : "Topology version has not been updated 
[this.topVer=" +
-                        GridDiscoveryManager.this.topVer + ", topVer=" + 
topVer + ", node=" + node +
-                        ", evt=" + U.gridEventName(type) + ']';
-                }
-
                 discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data);
             }
         });
@@ -470,11 +465,12 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             segChkThread.start();
         }
 
-        checkAttributes(discoCache().remoteNodes());
-
         locNode = spi.getLocalNode();
 
-        updateTopologyVersionIfGreater(new 
AffinityTopologyVersion(locNode.order()));
+        updateTopologyVersionIfGreater(new 
AffinityTopologyVersion(locNode.order()), new DiscoCache(localNode(),
+            getSpi().getRemoteNodes()));
+
+        checkAttributes(discoCache().remoteNodes());
 
         // Start discovery worker.
         new IgniteThread(discoWrk).start();
@@ -785,7 +781,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * Prints the latest topology info into log taking into account 
logging/verbosity settings.
      */
     public void ackTopology() {
-        ackTopology(topVer.get().topologyVersion(), false);
+        ackTopology(topSnap.get().topVer.topologyVersion(), false);
     }
 
     /**
@@ -871,7 +867,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      */
     private String topologySnapshotMessage(int rmtNodesNum, int totalCpus, 
double heap) {
         return PREFIX + " [" +
-            (discoOrdered ? "ver=" + topVer.get().topologyVersion() + ", " : 
"") +
+            (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + 
", " : "") +
             "nodes=" + (rmtNodesNum + 1) +
             ", CPUs=" + totalCpus +
             ", heap=" + heap + "GB" +
@@ -937,10 +933,14 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         // Refresh disco cache if some node died.
         if (!alive) {
             while (true) {
-                DiscoCache c = discoCache();
+                Snapshot snap = topSnap.get();
+                DiscoCache c = snap.discoCache;
+
+                if (c == null)
+                    return false;
 
                 if (c.node(nodeId) != null) {
-                    if (discoCache.compareAndSet(c, null))
+                    if (topSnap.compareAndSet(snap, null))
                         break;
                 }
                 else
@@ -1049,14 +1049,27 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * @return Discovery collection cache.
      */
     public DiscoCache discoCache() {
-        DiscoCache cur;
+        Snapshot cur;
 
-        while ((cur = discoCache.get()) == null)
+        while ((cur = topSnap.get()) == null) {
             // Wrap the SPI collection to avoid possible floating collection.
-            if (discoCache.compareAndSet(null, cur = new 
DiscoCache(localNode(), getSpi().getRemoteNodes())))
-                return cur;
+            if (topSnap.compareAndSet(null, cur = new Snapshot(
+                AffinityTopologyVersion.ZERO,
+                new DiscoCache(localNode(), getSpi().getRemoteNodes())))) {
+                return cur.discoCache;
+            }
+        }
+
+        return cur.discoCache;
+    }
 
-        return cur;
+    /**
+     * Gets discovery collection cache from SPI safely guarding against 
"floating" collections.
+     *
+     * @return Discovery collection cache.
+     */
+    public DiscoCache discoCache(AffinityTopologyVersion topVer) {
+        return discoCacheHist.get(topVer);
     }
 
     /** @return All non-daemon remote nodes in topology. */
@@ -1248,8 +1261,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * @return Discovery cache.
      */
     private DiscoCache resolveDiscoCache(@Nullable String cacheName, 
AffinityTopologyVersion topVer) {
-        DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || 
topVer.equals(this.topVer.get()) ?
-            discoCache() : discoCacheHist.get(topVer);
+        Snapshot snap = topSnap.get();
+
+        DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || 
topVer.equals(snap.topVer) ?
+            snap.discoCache : discoCacheHist.get(topVer);
 
         if (cache == null) {
             // Find the eldest acceptable discovery cache.
@@ -1296,14 +1311,14 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
     /** @return Topology version. */
     public long topologyVersion() {
-        return topVer.get().topologyVersion();
+        return topSnap.get().topVer.topologyVersion();
     }
 
     /**
      * @return Topology version.
      */
     public AffinityTopologyVersion topologyVersionEx() {
-        return topVer.get();
+        return topSnap.get().topVer;
     }
 
     /** @return Event that represents a local node joined to topology. */
@@ -1338,12 +1353,12 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * @param updated Updated topology version.
      * @return {@code True} if topology was updated.
      */
-    private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion 
updated) {
+    private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion 
updated, DiscoCache discoCache) {
         while (true) {
-            AffinityTopologyVersion cur = topVer.get();
+            Snapshot cur = topSnap.get();
 
-            if (updated.compareTo(cur) > 0) {
-                if (topVer.compareAndSet(cur, updated))
+            if (updated.compareTo(cur.topVer) >= 0) {
+                if (topSnap.compareAndSet(cur, new Snapshot(updated, 
discoCache)))
                     return true;
             }
             else
@@ -1887,6 +1902,27 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         }
     }
 
+    /**
+     *
+     */
+    private static class Snapshot {
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
+        private final DiscoCache discoCache;
+
+        /**
+         * @param topVer Topology version.
+         * @param discoCache Disco cache.
+         */
+        private Snapshot(AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            this.topVer = topVer;
+            this.discoCache = discoCache;
+        }
+    }
+
     /** Cache for discovery collections. */
     private class DiscoCache {
         /** Remote nodes. */
@@ -1896,21 +1932,27 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         private final List<ClusterNode> allNodes;
 
         /** All nodes with at least one cache configured. */
+        @GridToStringInclude
         private final Collection<ClusterNode> allNodesWithCaches;
 
         /** All nodes with at least one cache configured. */
+        @GridToStringInclude
         private final Collection<ClusterNode> rmtNodesWithCaches;
 
         /** Cache nodes by cache name. */
+        @GridToStringInclude
         private final Map<String, Collection<ClusterNode>> allCacheNodes;
 
         /** Remote cache nodes by cache name. */
+        @GridToStringInclude
         private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
 
         /** Cache nodes by cache name. */
+        @GridToStringInclude
         private final Map<String, Collection<ClusterNode>> affCacheNodes;
 
         /** Caches where at least one node has near cache enabled. */
+        @GridToStringInclude
         private final Set<String> nearEnabledCaches;
 
         /** Nodes grouped by version. */
@@ -1977,8 +2019,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 new HashMap<>(allNodes.size(), 1.0f);
             Map<String, Collection<ClusterNode>> dhtNodesMap =
                 new HashMap<>(allNodes.size(), 1.0f);
-            Collection<ClusterNode> nodesWithCaches = new 
ArrayList<>(allNodes.size());
-            Collection<ClusterNode> rmtNodesWithCaches = new 
ArrayList<>(allNodes.size());
+            Collection<ClusterNode> nodesWithCaches = new 
HashSet<>(allNodes.size());
+            Collection<ClusterNode> rmtNodesWithCaches = new 
HashSet<>(allNodes.size());
 
             aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 
1.0f);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index d39d880..d248f5f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -66,7 +66,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** */
-    private boolean stopping;
+    private volatile boolean stopping;
 
     /** A future that will be completed when topology with version topVer will 
be ready to use. */
     private GridDhtTopologyFuture topReadyFut;
@@ -187,6 +187,11 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopping() {
+        return stopping;
+    }
+
+    /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture 
exchFut) {
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 21d46b3..ac4b36a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -70,6 +70,11 @@ public interface GridDhtPartitionTopology {
     public GridDhtTopologyFuture topologyVersionFuture();
 
     /**
+     * @return {@code True} if cache is being stopped.
+     */
+    public boolean stopping();
+
+    /**
      * Pre-initializes this topology.
      *
      * @param exchFut Exchange future.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 60e9b70..9eb5cc4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -69,7 +69,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements 
GridDhtPartitionTopology {
     private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** */
-    private boolean stopping;
+    private volatile boolean stopping;
 
     /** A future that will be completed when topology with version topVer will 
be ready to use. */
     private GridDhtTopologyFuture topReadyFut;
@@ -210,6 +210,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopping() {
+        return stopping;
+    }
+
+    /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture 
exchFut) throws IgniteCheckedException {
         waitForRent();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dadb3a5..93754a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1069,6 +1069,15 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 topology().readLock();
 
                 try {
+                    if (topology().stopping()) {
+                        res.addFailedKeys(keys, new 
IgniteCheckedException("Failed to perform cache operation " +
+                            "(cache is stopped): " + name()));
+
+                        completionCb.apply(req, res);
+
+                        return;
+                    }
+
                     // Do not check topology version for CLOCK versioning since
                     // partition exchange will wait for near update future.
                     if 
(topology().topologyVersion().equals(req.topologyVersion()) ||
@@ -2351,7 +2360,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             Collection<ClusterNode> nodes = 
ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
 
             // We are on primary node for some key.
-            assert !nodes.isEmpty();
+            assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + 
name() + ", topVer=" + topVer +
+                ctx.kernalContext().discovery().discoCache(topVer) + ']';
 
             if (nodes.size() == 1) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 77e07a4..ec75448 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -414,6 +414,15 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         AffinityTopologyVersion topVer = null;
 
         try {
+            if (cache.topology().stopping()) {
+                futVer = 
cctx.versions().next(cctx.affinity().affinityTopologyVersion());
+
+                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                    cache.name()));
+
+                return;
+            }
+
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
             if (fut.isDone()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 08045bc..c756998 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -544,6 +544,13 @@ public final class GridDhtColocatedLockFuture<K, V> 
extends GridCompoundIdentity
         cctx.topology().readLock();
 
         try {
+            if (cctx.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                    cctx.name()));
+
+                return;
+            }
+
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
             if (fut.isDone()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 1dfeb45..3354e81 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -676,6 +676,13 @@ public final class GridNearLockFuture<K, V> extends 
GridCompoundIdentityFuture<B
         cctx.topology().readLock();
 
         try {
+            if (cctx.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                    cctx.name()));
+
+                return;
+            }
+
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
             if (fut.isDone()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 5d4daf4..e9f25ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -311,6 +311,12 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
             GridDhtTopologyFuture topFut = topologyReadLock();
 
             try {
+                if (topFut == null) {
+                    assert isDone();
+
+                    return;
+                }
+
                 if (topFut.isDone()) {
                     try {
                         if (!tx.state(PREPARING)) {
@@ -386,6 +392,13 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
 
         nonLocCtx.topology().readLock();
 
+        if (nonLocCtx.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                nonLocCtx.name()));
+
+            return null;
+        }
+
         return nonLocCtx.topology().topologyVersionFuture();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 57a3aae..2c309fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1025,97 +1025,93 @@ public class GridNioServer<T> {
             if (writer == null)
                 ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
 
-            List<NioOperationFuture<?>> doneFuts = null;
-
-            while (true) {
-                if (req == null) {
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+            if (req == null) {
+                req = (NioOperationFuture<?>)ses.pollFuture();
 
-                    if (req == null && buf.position() == 0) {
-                        key.interestOps(key.interestOps() & 
(~SelectionKey.OP_WRITE));
+                if (req == null && buf.position() == 0) {
+                    key.interestOps(key.interestOps() & 
(~SelectionKey.OP_WRITE));
 
-                        break;
-                    }
+                    return;
                 }
+            }
 
-                Message msg;
-                boolean finished = false;
+            Message msg;
+            boolean finished = false;
 
-                if (req != null) {
-                    msg = req.directMessage();
+            if (req != null) {
+                msg = req.directMessage();
 
-                    assert msg != null;
+                assert msg != null;
 
-                    finished = msg.writeTo(buf, writer);
+                finished = msg.writeTo(buf, writer);
 
-                    if (finished)
-                        writer.reset();
-                }
+                if (finished)
+                    writer.reset();
+            }
 
-                // Fill up as many messages as possible to write buffer.
-                while (finished) {
-                    if (doneFuts == null)
-                        doneFuts = new ArrayList<>();
+            // Fill up as many messages as possible to write buffer.
+            List<NioOperationFuture<?>> doneFuts = null;
 
-                    doneFuts.add(req);
+            while (finished) {
+                if (doneFuts == null)
+                    doneFuts = new ArrayList<>();
 
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                doneFuts.add(req);
 
-                    if (req == null)
-                        break;
+                req = (NioOperationFuture<?>)ses.pollFuture();
 
-                    msg = req.directMessage();
+                if (req == null)
+                    break;
 
-                    assert msg != null;
+                msg = req.directMessage();
 
-                    finished = msg.writeTo(buf, writer);
+                assert msg != null;
 
-                    if (finished)
-                        writer.reset();
-                }
+                finished = msg.writeTo(buf, writer);
 
-                buf.flip();
+                if (finished)
+                    writer.reset();
+            }
 
-                assert buf.hasRemaining();
+            buf.flip();
 
-                if (!skipWrite) {
-                    int cnt = sockCh.write(buf);
+            assert buf.hasRemaining();
 
-                    if (!F.isEmpty(doneFuts)) {
-                        for (int i = 0; i < doneFuts.size(); i++)
-                            doneFuts.get(i).onDone();
+            if (!skipWrite) {
+                int cnt = sockCh.write(buf);
 
-                        doneFuts.clear();
-                    }
+                if (!F.isEmpty(doneFuts)) {
+                    for (int i = 0; i < doneFuts.size(); i++)
+                        doneFuts.get(i).onDone();
 
-                    if (log.isTraceEnabled())
-                        log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + 
cnt + ']');
+                    doneFuts.clear();
+                }
 
-                    if (metricsLsnr != null)
-                        metricsLsnr.onBytesSent(cnt);
+                if (log.isTraceEnabled())
+                    log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt 
+ ']');
 
-                    ses.bytesSent(cnt);
+                if (metricsLsnr != null)
+                    metricsLsnr.onBytesSent(cnt);
+
+                ses.bytesSent(cnt);
+            }
+            else {
+                // For test purposes only (skipWrite is set to true in tests 
only).
+                try {
+                    U.sleep(50);
                 }
-                else {
-                    // For test purposes only (skipWrite is set to true in 
tests only).
-                    try {
-                        U.sleep(50);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        throw new IOException("Thread has been interrupted.", 
e);
-                    }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IOException("Thread has been interrupted.", e);
                 }
+            }
 
-                if (buf.hasRemaining()) {
-                    buf.compact();
-
-                    ses.addMeta(NIO_OPERATION.ordinal(), req);
+            if (buf.hasRemaining() || !finished) {
+                buf.compact();
 
-                    break;
-                }
-                else
-                    buf.clear();
+                ses.addMeta(NIO_OPERATION.ordinal(), req);
             }
+            else
+                buf.clear();
         }
     }
 

Reply via email to