Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 74447a2ec -> 39547027e
IGNITE-45 - Fixed assertions from example. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/be6cfe3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/be6cfe3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/be6cfe3e Branch: refs/heads/ignite-45 Commit: be6cfe3ec6560ef2fe6c6c6af40c3015689388df Parents: 58f0ec9 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Mar 19 12:34:03 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Mar 19 12:34:03 2015 -0700 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 6 +- .../discovery/GridDiscoveryManager.java | 114 ++++++++++++------ .../dht/GridClientPartitionTopology.java | 7 +- .../dht/GridDhtPartitionTopology.java | 5 + .../dht/GridDhtPartitionTopologyImpl.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 12 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 9 ++ .../colocated/GridDhtColocatedLockFuture.java | 7 ++ .../distributed/near/GridNearLockFuture.java | 7 ++ .../near/GridNearTxPrepareFuture.java | 13 ++ .../ignite/internal/util/nio/GridNioServer.java | 120 +++++++++---------- 11 files changed, 204 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index db6e535..c553f0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2279,8 +2279,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { ctx.cache().dynamicStartCache(cacheCfg, null).get(); } - catch (IgniteCacheExistsException ignore) { - // Ignore the error if cache already exists. + catch (IgniteCheckedException e) { + // Ignore error if cache exists. + if (!e.hasCause(IgniteCacheExistsException.class)) + throw e; } return ctx.cache().publicJCache(cacheCfg.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b818e34..7585008 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; @@ -126,9 +127,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Last segment check result. */ private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true); - /** Discovery cache. */ - private final AtomicReference<DiscoCache> discoCache = new AtomicReference<>(); - /** Topology cache history. */ private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist = new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE, DISCOVERY_HISTORY_SIZE, 0.7f, 1); @@ -137,7 +135,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>(); /** Topology version. */ - private final AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.ZERO); + private final AtomicReference<Snapshot> topSnap = + new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null)); /** Minor topology version. */ private int minorTopVer; @@ -384,7 +383,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(nextTopVer, cache); - discoCache.set(cache); + + boolean set = updateTopologyVersionIfGreater(nextTopVer, cache); + + assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" + + topSnap + ", topVer=" + topVer + ", node=" + node + + ", evt=" + U.gridEventName(type) + ']'; } // If this is a local join event, just save it and do not notify listeners. @@ -407,15 +411,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } - if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT || - type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) { - boolean set = updateTopologyVersionIfGreater(nextTopVer); - - assert set : "Topology version has not been updated [this.topVer=" + - GridDiscoveryManager.this.topVer + ", topVer=" + topVer + ", node=" + node + - ", evt=" + U.gridEventName(type) + ']'; - } - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data); } }); @@ -470,11 +465,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { segChkThread.start(); } - checkAttributes(discoCache().remoteNodes()); - locNode = spi.getLocalNode(); - updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order())); + updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), new DiscoCache(localNode(), + getSpi().getRemoteNodes())); + + checkAttributes(discoCache().remoteNodes()); // Start discovery worker. new IgniteThread(discoWrk).start(); @@ -785,7 +781,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * Prints the latest topology info into log taking into account logging/verbosity settings. */ public void ackTopology() { - ackTopology(topVer.get().topologyVersion(), false); + ackTopology(topSnap.get().topVer.topologyVersion(), false); } /** @@ -871,7 +867,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { */ private String topologySnapshotMessage(int rmtNodesNum, int totalCpus, double heap) { return PREFIX + " [" + - (discoOrdered ? "ver=" + topVer.get().topologyVersion() + ", " : "") + + (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") + "nodes=" + (rmtNodesNum + 1) + ", CPUs=" + totalCpus + ", heap=" + heap + "GB" + @@ -937,10 +933,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Refresh disco cache if some node died. if (!alive) { while (true) { - DiscoCache c = discoCache(); + Snapshot snap = topSnap.get(); + DiscoCache c = snap.discoCache; + + if (c == null) + return false; if (c.node(nodeId) != null) { - if (discoCache.compareAndSet(c, null)) + if (topSnap.compareAndSet(snap, null)) break; } else @@ -1049,14 +1049,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Discovery collection cache. */ public DiscoCache discoCache() { - DiscoCache cur; + Snapshot cur; - while ((cur = discoCache.get()) == null) + while ((cur = topSnap.get()) == null) { // Wrap the SPI collection to avoid possible floating collection. - if (discoCache.compareAndSet(null, cur = new DiscoCache(localNode(), getSpi().getRemoteNodes()))) - return cur; + if (topSnap.compareAndSet(null, cur = new Snapshot( + AffinityTopologyVersion.ZERO, + new DiscoCache(localNode(), getSpi().getRemoteNodes())))) { + return cur.discoCache; + } + } + + return cur.discoCache; + } - return cur; + /** + * Gets discovery collection cache from SPI safely guarding against "floating" collections. + * + * @return Discovery collection cache. + */ + public DiscoCache discoCache(AffinityTopologyVersion topVer) { + return discoCacheHist.get(topVer); } /** @return All non-daemon remote nodes in topology. */ @@ -1248,8 +1261,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Discovery cache. */ private DiscoCache resolveDiscoCache(@Nullable String cacheName, AffinityTopologyVersion topVer) { - DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(this.topVer.get()) ? - discoCache() : discoCacheHist.get(topVer); + Snapshot snap = topSnap.get(); + + DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ? + snap.discoCache : discoCacheHist.get(topVer); if (cache == null) { // Find the eldest acceptable discovery cache. @@ -1296,14 +1311,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @return Topology version. */ public long topologyVersion() { - return topVer.get().topologyVersion(); + return topSnap.get().topVer.topologyVersion(); } /** * @return Topology version. */ public AffinityTopologyVersion topologyVersionEx() { - return topVer.get(); + return topSnap.get().topVer; } /** @return Event that represents a local node joined to topology. */ @@ -1338,12 +1353,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param updated Updated topology version. * @return {@code True} if topology was updated. */ - private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated) { + private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated, DiscoCache discoCache) { while (true) { - AffinityTopologyVersion cur = topVer.get(); + Snapshot cur = topSnap.get(); - if (updated.compareTo(cur) > 0) { - if (topVer.compareAndSet(cur, updated)) + if (updated.compareTo(cur.topVer) >= 0) { + if (topSnap.compareAndSet(cur, new Snapshot(updated, discoCache))) return true; } else @@ -1887,6 +1902,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + /** + * + */ + private static class Snapshot { + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final DiscoCache discoCache; + + /** + * @param topVer Topology version. + * @param discoCache Disco cache. + */ + private Snapshot(AffinityTopologyVersion topVer, + DiscoCache discoCache) { + this.topVer = topVer; + this.discoCache = discoCache; + } + } + /** Cache for discovery collections. */ private class DiscoCache { /** Remote nodes. */ @@ -1896,21 +1932,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final List<ClusterNode> allNodes; /** All nodes with at least one cache configured. */ + @GridToStringInclude private final Collection<ClusterNode> allNodesWithCaches; /** All nodes with at least one cache configured. */ + @GridToStringInclude private final Collection<ClusterNode> rmtNodesWithCaches; /** Cache nodes by cache name. */ + @GridToStringInclude private final Map<String, Collection<ClusterNode>> allCacheNodes; /** Remote cache nodes by cache name. */ + @GridToStringInclude private final Map<String, Collection<ClusterNode>> rmtCacheNodes; /** Cache nodes by cache name. */ + @GridToStringInclude private final Map<String, Collection<ClusterNode>> affCacheNodes; /** Caches where at least one node has near cache enabled. */ + @GridToStringInclude private final Set<String> nearEnabledCaches; /** Nodes grouped by version. */ @@ -1977,8 +2019,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { new HashMap<>(allNodes.size(), 1.0f); Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f); - Collection<ClusterNode> nodesWithCaches = new ArrayList<>(allNodes.size()); - Collection<ClusterNode> rmtNodesWithCaches = new ArrayList<>(allNodes.size()); + Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); + Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index d39d880..d248f5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -66,7 +66,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** */ - private boolean stopping; + private volatile boolean stopping; /** A future that will be completed when topology with version topVer will be ready to use. */ private GridDhtTopologyFuture topReadyFut; @@ -187,6 +187,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public boolean stopping() { + return stopping; + } + + /** {@inheritDoc} */ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) { ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 21d46b3..ac4b36a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -70,6 +70,11 @@ public interface GridDhtPartitionTopology { public GridDhtTopologyFuture topologyVersionFuture(); /** + * @return {@code True} if cache is being stopped. + */ + public boolean stopping(); + + /** * Pre-initializes this topology. * * @param exchFut Exchange future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 60e9b70..9eb5cc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -69,7 +69,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** */ - private boolean stopping; + private volatile boolean stopping; /** A future that will be completed when topology with version topVer will be ready to use. */ private GridDhtTopologyFuture topReadyFut; @@ -210,6 +210,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public boolean stopping() { + return stopping; + } + + /** {@inheritDoc} */ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { waitForRent(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index dadb3a5..93754a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1069,6 +1069,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { topology().readLock(); try { + if (topology().stopping()) { + res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " + + "(cache is stopped): " + name())); + + completionCb.apply(req, res); + + return; + } + // Do not check topology version for CLOCK versioning since // partition exchange will wait for near update future. if (topology().topologyVersion().equals(req.topologyVersion()) || @@ -2351,7 +2360,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer); // We are on primary node for some key. - assert !nodes.isEmpty(); + assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer + + ctx.kernalContext().discovery().discoCache(topVer) + ']'; if (nodes.size() == 1) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 77e07a4..ec75448 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -414,6 +414,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> AffinityTopologyVersion topVer = null; try { + if (cache.topology().stopping()) { + futVer = cctx.versions().next(cctx.affinity().affinityTopologyVersion()); + + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); + + return; + } + GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 08045bc..c756998 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -544,6 +544,13 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity cctx.topology().readLock(); try { + if (cctx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cctx.name())); + + return; + } + GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 1dfeb45..3354e81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -676,6 +676,13 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B cctx.topology().readLock(); try { + if (cctx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cctx.name())); + + return; + } + GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 5d4daf4..e9f25ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -311,6 +311,12 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut GridDhtTopologyFuture topFut = topologyReadLock(); try { + if (topFut == null) { + assert isDone(); + + return; + } + if (topFut.isDone()) { try { if (!tx.state(PREPARING)) { @@ -386,6 +392,13 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut nonLocCtx.topology().readLock(); + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + return nonLocCtx.topology().topologyVersionFuture(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be6cfe3e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 57a3aae..2c309fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1025,97 +1025,93 @@ public class GridNioServer<T> { if (writer == null) ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer()); - List<NioOperationFuture<?>> doneFuts = null; - - while (true) { - if (req == null) { - req = (NioOperationFuture<?>)ses.pollFuture(); + if (req == null) { + req = (NioOperationFuture<?>)ses.pollFuture(); - if (req == null && buf.position() == 0) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + if (req == null && buf.position() == 0) { + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - break; - } + return; } + } - Message msg; - boolean finished = false; + Message msg; + boolean finished = false; - if (req != null) { - msg = req.directMessage(); + if (req != null) { + msg = req.directMessage(); - assert msg != null; + assert msg != null; - finished = msg.writeTo(buf, writer); + finished = msg.writeTo(buf, writer); - if (finished) - writer.reset(); - } + if (finished) + writer.reset(); + } - // Fill up as many messages as possible to write buffer. - while (finished) { - if (doneFuts == null) - doneFuts = new ArrayList<>(); + // Fill up as many messages as possible to write buffer. + List<NioOperationFuture<?>> doneFuts = null; - doneFuts.add(req); + while (finished) { + if (doneFuts == null) + doneFuts = new ArrayList<>(); - req = (NioOperationFuture<?>)ses.pollFuture(); + doneFuts.add(req); - if (req == null) - break; + req = (NioOperationFuture<?>)ses.pollFuture(); - msg = req.directMessage(); + if (req == null) + break; - assert msg != null; + msg = req.directMessage(); - finished = msg.writeTo(buf, writer); + assert msg != null; - if (finished) - writer.reset(); - } + finished = msg.writeTo(buf, writer); - buf.flip(); + if (finished) + writer.reset(); + } - assert buf.hasRemaining(); + buf.flip(); - if (!skipWrite) { - int cnt = sockCh.write(buf); + assert buf.hasRemaining(); - if (!F.isEmpty(doneFuts)) { - for (int i = 0; i < doneFuts.size(); i++) - doneFuts.get(i).onDone(); + if (!skipWrite) { + int cnt = sockCh.write(buf); - doneFuts.clear(); - } + if (!F.isEmpty(doneFuts)) { + for (int i = 0; i < doneFuts.size(); i++) + doneFuts.get(i).onDone(); - if (log.isTraceEnabled()) - log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); + doneFuts.clear(); + } - if (metricsLsnr != null) - metricsLsnr.onBytesSent(cnt); + if (log.isTraceEnabled()) + log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); - ses.bytesSent(cnt); + if (metricsLsnr != null) + metricsLsnr.onBytesSent(cnt); + + ses.bytesSent(cnt); + } + else { + // For test purposes only (skipWrite is set to true in tests only). + try { + U.sleep(50); } - else { - // For test purposes only (skipWrite is set to true in tests only). - try { - U.sleep(50); - } - catch (IgniteInterruptedCheckedException e) { - throw new IOException("Thread has been interrupted.", e); - } + catch (IgniteInterruptedCheckedException e) { + throw new IOException("Thread has been interrupted.", e); } + } - if (buf.hasRemaining()) { - buf.compact(); - - ses.addMeta(NIO_OPERATION.ordinal(), req); + if (buf.hasRemaining() || !finished) { + buf.compact(); - break; - } - else - buf.clear(); + ses.addMeta(NIO_OPERATION.ordinal(), req); } + else + buf.clear(); } }