Repository: incubator-ignite Updated Branches: refs/heads/ignite-621 415264e3e -> 3787a9d33
IGNITE-621 - Added automatic retries. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3787a9d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3787a9d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3787a9d3 Branch: refs/heads/ignite-621 Commit: 3787a9d3353c0c146141a79e3e87e1bbc5128031 Parents: 415264e Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri Jun 19 17:15:02 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri Jun 19 17:15:02 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 5 + .../processors/cache/CacheOperationContext.java | 44 +++++- .../processors/cache/GridCacheAdapter.java | 91 +++++++------ .../processors/cache/GridCacheProxyImpl.java | 10 +- .../processors/cache/IgniteCacheProxy.java | 36 ++++- .../dht/atomic/GridDhtAtomicCache.java | 18 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 87 ++++++++++-- .../IgniteCachePutRetryAbstractSelfTest.java | 134 +++++++++++++++++++ .../dht/IgniteCachePutRetryAtomicSelfTest.java | 34 +++++ ...gniteCachePutRetryTransactionalSelfTest.java | 35 +++++ 10 files changed, 422 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 2b97e55..c8d6d7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -106,6 +106,11 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public IgniteCache<K, V> withSkipStore(); /** + * @return Cache with no-retries behavior enabled. + */ + public IgniteCache<K, V> withNoRetries(); + + /** * Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes. * * @param p Optional predicate (may be {@code null}). If provided, will be used to http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index 34d2bf4..343a2f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -36,6 +36,10 @@ public class CacheOperationContext implements Serializable { @GridToStringInclude private final boolean skipStore; + /** No retries flag. */ + @GridToStringInclude + private final boolean noRetries; + /** Client ID which operates over this projection. */ private final UUID subjId; @@ -56,6 +60,8 @@ public class CacheOperationContext implements Serializable { keepPortable = false; expiryPlc = null; + + noRetries = false; } /** @@ -68,7 +74,8 @@ public class CacheOperationContext implements Serializable { boolean skipStore, @Nullable UUID subjId, boolean keepPortable, - @Nullable ExpiryPolicy expiryPlc) { + @Nullable ExpiryPolicy expiryPlc, + boolean noRetries) { this.skipStore = skipStore; this.subjId = subjId; @@ -76,6 +83,8 @@ public class CacheOperationContext implements Serializable { this.keepPortable = keepPortable; this.expiryPlc = expiryPlc; + + this.noRetries = noRetries; } /** @@ -95,7 +104,8 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, true, - expiryPlc); + expiryPlc, + noRetries); } /** @@ -118,7 +128,8 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, keepPortable, - expiryPlc); + expiryPlc, + noRetries); } /** @@ -139,7 +150,8 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, keepPortable, - expiryPlc); + expiryPlc, + noRetries); } /** @@ -160,7 +172,29 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, true, - plc); + plc, + noRetries); + } + + /** + * @param noRetries No retries flag. + * @return Operation context. + */ + public CacheOperationContext setNoRetries(boolean noRetries) { + return new CacheOperationContext( + skipStore, + subjId, + keepPortable, + expiryPlc, + noRetries + ); + } + + /** + * @return No retries flag. + */ + public boolean noRetries() { + return noRetries; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7335d72..f993527 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** clearLocally() split threshold. */ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; + /** Maximum number of retries when topology changes. */ + public static final int MAX_RETRIES = 100; + /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -363,7 +366,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { - CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null); + CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false); return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@ -375,14 +378,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) { - CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null); + CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepPortable() { - CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null); + CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx); } @@ -399,7 +402,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert !CU.isAtomicsCache(ctx.name()); assert !CU.isMarshallerCache(ctx.name()); - CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc); + CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false); return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@ -2301,7 +2304,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Putx operation future. */ public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val, - @Nullable final CacheEntryPredicate... filter) { + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -3930,51 +3933,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (tx == null || tx.implicit()) { TransactionConfiguration tCfg = ctx.gridConfig().getTransactionConfiguration(); - tx = ctx.tm().newTx( - true, - op.single(), - ctx.systemTx() ? ctx : null, - OPTIMISTIC, - READ_COMMITTED, - tCfg.getDefaultTxTimeout(), - !ctx.skipStore(), - 0 - ); + CacheOperationContext opCtx = ctx.operationContextPerCall(); - assert tx != null; + int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES; - try { - T t = op.op(tx); + for (int i = 0; i < retries; i++) { + tx = ctx.tm().newTx( + true, + op.single(), + ctx.systemTx() ? ctx : null, + OPTIMISTIC, + READ_COMMITTED, + tCfg.getDefaultTxTimeout(), + !ctx.skipStore(), + 0 + ); - assert tx.done() : "Transaction is not done: " + tx; + assert tx != null; - return t; - } - catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | - IgniteTxRollbackCheckedException e) { - throw e; - } - catch (IgniteCheckedException e) { try { - tx.rollback(); + T t = op.op(tx); - e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " + - tx.xid(), e); + assert tx.done() : "Transaction is not done: " + tx; + + return t; + } + catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | + IgniteTxRollbackCheckedException e) { + throw e; } - catch (IgniteCheckedException | AssertionError | RuntimeException e1) { - U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1); + catch (IgniteCheckedException e) { + try { + tx.rollback(); + + e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " + + tx.xid(), e); + } + catch (IgniteCheckedException | AssertionError | RuntimeException e1) { + U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1); + + U.addLastCause(e, e1, log); + } + + if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) + continue; - U.addLastCause(e, e1, log); + throw e; } + finally { + ctx.tm().resetContext(); - throw e; + if (ctx.isNear()) + ctx.near().dht().context().tm().resetContext(); + } } - finally { - ctx.tm().resetContext(); - if (ctx.isNear()) - ctx.near().dht().context().tm().resetContext(); - } + // Should not happen. + throw new IgniteCheckedException("Failed to perform cache operation (maximum number of retries exceeded)."); } else return op.op(tx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 63ba242..cec8c53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -118,7 +118,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte CacheOperationContext prev = gate.enter(opCtx); try { - return opCtx != null ? opCtx.skipStore() : false; + return opCtx != null && opCtx.skipStore(); } finally { gate.leave(prev); @@ -198,7 +198,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte /** {@inheritDoc} */ @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null)); + opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false)); } /** {@inheritDoc} */ @@ -210,7 +210,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte return this; return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null)); + opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false)); } finally { gate.leave(prev); @@ -224,7 +224,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)delegate, - opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null)); + opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null, false)); } /** {@inheritDoc} */ @@ -1515,7 +1515,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte try { return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc)); + opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false)); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 48fd259..0ad2a9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -246,7 +246,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) : - new CacheOperationContext(false, null, false, plc); + new CacheOperationContext(false, null, false, plc, false); return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock); } @@ -261,6 +261,30 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withNoRetries() { + CacheOperationContext prev = onEnter(opCtx); + + try { + boolean noRetries = opCtx != null && opCtx.noRetries(); + + if (noRetries) + return this; + + CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) : + new CacheOperationContext(false, null, false, null, true); + + return new IgniteCacheProxy<>(ctx, + delegate, + opCtx0, + isAsync(), + lock); + } + finally { + onLeave(prev); + } + } + + /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { CacheOperationContext prev = onEnter(opCtx); @@ -1498,10 +1522,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { CacheOperationContext opCtx0 = new CacheOperationContext( - opCtx != null ? opCtx.skipStore() : false, + opCtx != null && opCtx.skipStore(), opCtx != null ? opCtx.subjectId() : null, true, - opCtx != null ? opCtx.expiry() : null); + opCtx != null ? opCtx.expiry() : null, + opCtx != null && opCtx.noRetries()); return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)delegate, @@ -1529,8 +1554,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V CacheOperationContext opCtx0 = new CacheOperationContext(true, opCtx != null ? opCtx.subjectId() : null, - opCtx != null ? opCtx.isKeepPortable() : false, - opCtx != null ? opCtx.expiry() : null); + opCtx != null && opCtx.isKeepPortable(), + opCtx != null ? opCtx.expiry() : null, + opCtx != null && opCtx.noRetries()); return new IgniteCacheProxy<>(ctx, delegate, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8630421..2863ae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -767,11 +767,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filter, subjId, taskNameHash, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(waitTopFut); + updateFut.map(); return updateFut; } @@ -830,14 +832,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filter, subjId, taskNameHash, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + true); if (statsEnabled) updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(true); + updateFut.map(); return updateFut; } @@ -2273,9 +2277,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.filter(), req.subjectId(), req.taskNameHash(), - req.skipStore()); + req.skipStore(), + MAX_RETRIES, + true); - updateFut.map(true); + updateFut.map(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 07f5ecf..53150cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Mappings. */ @GridToStringInclude - private final ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings; + private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -123,7 +123,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private GridNearAtomicUpdateRequest singleReq; /** Raw return value flag. */ - private boolean rawRetval; + private final boolean rawRetval; /** Fast map flag. */ private final boolean fastMap; @@ -149,6 +149,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Skip store flag. */ private final boolean skipStore; + /** Wait for topology future flag. */ + private final boolean waitTopFut; + + /** Remap count. */ + private AtomicInteger remapCnt; + /** * @param cctx Cache context. * @param cache Cache instance. @@ -183,7 +189,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> final CacheEntryPredicate[] filter, UUID subjId, int taskNameHash, - boolean skipStore + boolean skipStore, + int remapCnt, + boolean waitTopFut ) { this.rawRetval = rawRetval; @@ -207,6 +215,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.waitTopFut = waitTopFut; if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); @@ -218,6 +227,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> !(cctx.writeThrough() && cctx.config().getInterceptor() != null); nearEnabled = CU.isNearEnabled(cctx); + + this.remapCnt = new AtomicInteger(remapCnt); } /** {@inheritDoc} */ @@ -295,10 +306,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * Performs future mapping. - * - * @param waitTopFut Whether to wait for topology future. */ - public void map(boolean waitTopFut) { + public void map() { AffinityTopologyVersion topVer = null; IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); @@ -310,14 +319,62 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); if (topVer == null) - mapOnTopology(null, false, null, waitTopFut); + mapOnTopology(null, false, null); else { topLocked = true; + // Cannot remap. + remapCnt.set(1); + map0(topVer, null, false, null); } } + /** + * @param failed Keys to remap. + */ + private void remap(Collection<?> failed) { + if (futVer != null) + cctx.mvcc().removeAtomicFuture(version()); + + Collection<Object> remapKeys = new ArrayList<>(failed.size()); + Collection<Object> remapVals = new ArrayList<>(failed.size()); + + Iterator<?> keyIt = keys.iterator(); + Iterator<?> valsIt = vals.iterator(); + + for (Object key : failed) { + while (keyIt.hasNext()) { + Object nextKey = keyIt.next(); + Object nextVal = valsIt.next(); + + if (F.eq(key, nextKey)) { + remapKeys.add(nextKey); + remapVals.add(nextVal); + + break; + } + } + } + + keys = remapKeys; + vals = remapVals; + + mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); + single = null; + futVer = null; + err = null; + opRes = null; + topVer = AffinityTopologyVersion.ZERO; + singleNodeId = null; + singleReq = null; + fastMapRemap = false; + updVer = null; + topLocked = false; + + map(); + } + /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { @@ -331,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op == TRANSFORM && retval == null) retval = Collections.emptyMap(); + if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) && remapCnt.decrementAndGet() > 0) { + remap(X.cause(err, CachePartialUpdateCheckedException.class).failedKeys()); + + return false; + } + if (super.onDone(retval, err)) { if (futVer != null) cctx.mvcc().removeAtomicFuture(version()); @@ -353,7 +416,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys(); - mapOnTopology(remapKeys, true, nodeId, true); + mapOnTopology(remapKeys, true, nodeId); return; } @@ -431,10 +494,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param keys Keys to map. * @param remap Boolean flag indicating if this is partial future remap. * @param oldNodeId Old node ID if remap. - * @param waitTopFut Whether to wait for topology future. */ - private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId, - final boolean waitTopFut) { + private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -465,7 +526,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(keys, remap, oldNodeId, waitTopFut); + mapOnTopology(keys, remap, oldNodeId); } }); } @@ -509,7 +570,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } if (remap) - mapOnTopology(null, true, null, true); + mapOnTopology(null, true, null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java new file mode 100644 index 0000000..89d1040 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** + * @return Keys count for the test. + */ + protected abstract int keysCount(); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setBackups(1); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + int keysCnt = keysCount(); + + for (int i = 0; i < keysCnt; i++) + ignite(0).cache(null).put(i, i); + + finished.set(true); + fut.get(); + + for (int i = 0; i < keysCnt; i++) + assertEquals(i, ignite(0).cache(null).get(i)); + } + + /** + * @throws Exception If failed. + */ + public void testFailWithNoRetries() throws Exception { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + int keysCnt = keysCount(); + + boolean exceptionThrown = false; + + for (int i = 0; i < keysCnt; i++) { + try { + ignite(0).cache(null).withNoRetries().put(i, i); + } + catch (Exception e) { + assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class)); + + exceptionThrown = true; + + break; + } + } + + assertTrue(exceptionThrown); + + finished.set(true); + fut.get(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java new file mode 100644 index 0000000..e76663a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int keysCount() { + return 60_000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java new file mode 100644 index 0000000..e65459a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected int keysCount() { + return 20_000; + } +}