# Properly handle ClusterTopologyServerNotFoundException for retries
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2903a29e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2903a29e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2903a29e Branch: refs/heads/ignite-gg-9615-1 Commit: 2903a29e7a50802617872bfd0fcc3497c4c7785e Parents: 122a9db Author: sboikov <sboi...@gridgain.com> Authored: Fri Aug 14 16:22:25 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Aug 14 16:22:25 2015 +0300 ---------------------------------------------------------------------- .../CachePartialUpdateCheckedException.java | 29 +++++++++++- .../processors/cache/GridCacheAdapter.java | 50 ++++++++++++-------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 48 +++++++++++-------- .../near/GridNearOptimisticTxPrepareFuture.java | 2 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 1 - 5 files changed, 86 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java index 0272b7c..f430d12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; import java.util.*; @@ -32,6 +33,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { /** Failed keys. */ private final Collection<Object> failedKeys = new ArrayList<>(); + /** */ + private AffinityTopologyVersion topVer; + /** * @param msg Error message. */ @@ -50,13 +54,36 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { /** * @param failedKeys Failed keys. * @param err Error. + * @param topVer Topology version for failed update. */ - public void add(Collection<?> failedKeys, Throwable err) { + public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) { + if (topVer != null) { + AffinityTopologyVersion topVer0 = this.topVer; + + if (topVer0 == null || topVer.compareTo(topVer0) > 0) + this.topVer = topVer; + } + this.failedKeys.addAll(failedKeys); addSuppressed(err); } + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @param failedKeys Failed keys. + * @param err Error. + */ + public void add(Collection<?> failedKeys, Throwable err) { + add(failedKeys, err, null); + } + /** {@inheritDoc} */ @Override public String getMessage() { return super.getMessage() + ": " + failedKeys; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 91af352..992edd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3975,13 +3975,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) { - AffinityTopologyVersion topVer = tx.topologyVersion(); + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - assert topVer != null && topVer.topologyVersion() > 0 : tx; + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer != null && topVer.topologyVersion() > 0 : tx; - ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get(); + ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get(); - continue; + continue; + } } throw e; @@ -4702,31 +4706,35 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } catch (IgniteCheckedException e) { if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) { - IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx; + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - assert tx != null; + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx; - AffinityTopologyVersion topVer = tx.topologyVersion(); + assert tx != null; - assert topVer != null && topVer.topologyVersion() > 0 : tx; + AffinityTopologyVersion topVer = tx.topologyVersion(); - IgniteInternalFuture<?> topFut = - ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1); + assert topVer != null && topVer.topologyVersion() > 0 : tx; - topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> topFut) { - try { - topFut.get(); + IgniteInternalFuture<?> topFut = + ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1); - execute(); - } - catch (IgniteCheckedException e) { - onDone(e); + topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> topFut) { + try { + topFut.get(); + + execute(); + } + catch (IgniteCheckedException e) { + onDone(e); + } } - } - }); + }); - return; + return; + } } onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 0498839..5dc5494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -275,6 +275,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (singleNodeId.equals(nodeId)) { onDone(addFailedKeys( singleReq.keys(), + singleReq.topologyVersion(), new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId))); return true; @@ -286,8 +287,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridNearAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { - addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " + - "received: " + nodeId)); + addFailedKeys(req.keys(), + req.topologyVersion(), + new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); mappings.remove(nodeId); @@ -356,8 +358,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * @param failed Keys to remap. + * @param errTopVer Topology version for failed update. */ - private void remap(Collection<?> failed) { + private void remap(Collection<?> failed, AffinityTopologyVersion errTopVer) { + assert errTopVer != null; + GridCacheVersion futVer0 = futVer; if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null) @@ -409,15 +414,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridFutureAdapter<Void> fut0; - long nextTopVer; - synchronized (this) { mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); assert topVer != null && topVer.topologyVersion() > 0 : this; - nextTopVer = topVer.topologyVersion() + 1; - topVer = AffinityTopologyVersion.ZERO; fut0 = topCompleteFut; @@ -434,7 +435,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> updVer = null; topLocked = false; - IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(nextTopVer); + IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(final IgniteInternalFuture<?> fut) { @@ -471,15 +472,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> X.hasCause(err, ClusterTopologyCheckedException.class) && storeFuture() && remapCnt.decrementAndGet() > 0) { + ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class); - CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class); + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class); - if (F.isEmpty(cause.failedKeys())) - cause.printStackTrace(); + assert cause != null && cause.topologyVersion() != null : err; - remap(cause.failedKeys()); + remap(cause.failedKeys(), cause.topologyVersion()); - return false; + return false; + } } if (super.onDone(retval, err)) { @@ -528,8 +531,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> updateNear(singleReq, res); - if (res.error() != null) - onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error()); + if (res.error() != null) { + onDone(res.failedKeys() != null ? + addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error()); + } else { if (op == TRANSFORM) { if (ret != null) @@ -551,7 +556,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> updateNear(req, res); if (res.error() != null) - addFailedKeys(req.keys(), res.error()); + addFailedKeys(req.keys(), req.topologyVersion(), res.error()); else { if (op == TRANSFORM) { assert !req.fastMap(); @@ -1048,7 +1053,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> onDone(new GridCacheReturn(cctx, true, null, true)); } catch (IgniteCheckedException e) { - onDone(addFailedKeys(req.keys(), e)); + onDone(addFailedKeys(req.keys(), req.topologyVersion(), e)); } } } @@ -1079,7 +1084,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } catch (IgniteCheckedException e) { - addFailedKeys(req.keys(), e); + addFailedKeys(req.keys(), req.topologyVersion(), e); removeMapping(req.nodeId()); } @@ -1135,10 +1140,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. * @param err Error cause. * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}. */ - private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys, Throwable err) { + private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { CachePartialUpdateCheckedException err0 = this.err; if (err0 == null) @@ -1149,7 +1157,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> for (KeyCacheObject key : failedKeys) keys.add(key.value(cctx.cacheObjectContext(), false)); - err0.add(keys, err); + err0.add(keys, err, topVer); return err0; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 44b7997..2b86672 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -416,7 +416,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + "partition nodes left the grid): " + cacheCtx.name())); return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 0a8f87c..ff948a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -5010,7 +5010,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - IgniteFuture fut = cache.future().chain(new IgniteClosure<IgniteFuture, Object>() { @Override public Object apply(IgniteFuture o) { return o.get();