Repository: incubator-ignite Updated Branches: refs/heads/ignite-621 3787a9d33 -> 5505b4d3d
IGNITE-621 - Added automatic retries for atomics. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5505b4d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5505b4d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5505b4d3 Branch: refs/heads/ignite-621 Commit: 5505b4d3d50caf41246d1194c52298a6df47a239 Parents: 3787a9d Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Sun Jun 21 13:09:24 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Sun Jun 21 13:09:24 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheUtils.java | 42 ++++++++++++++++++++ .../datastructures/GridCacheAtomicLongImpl.java | 25 ++++++------ .../GridCacheAtomicSequenceImpl.java | 11 ++--- .../GridCacheAtomicStampedImpl.java | 21 +++++----- .../GridCacheCountDownLatchImpl.java | 16 +++----- .../IgniteCachePutRetryAbstractSelfTest.java | 13 ++++++ ...gniteCachePutRetryTransactionalSelfTest.java | 39 ++++++++++++++++++ .../IgniteCacheFailoverTestSuite.java | 3 ++ 8 files changed, 131 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 8c26046..f88e288 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.T2; @@ -1699,4 +1700,45 @@ public class GridCacheUtils { ctx.resource().cleanupGeneric(lsnr); } } + + /** + * @param c Closure to retry. + * @param <S> Closure type. + * @return Wrapped closure. + */ + public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) { + return new Callable<S>() { + @Override public S call() throws Exception { + int retries = GridCacheAdapter.MAX_RETRIES; + + IgniteCheckedException err = null; + + for (int i = 0; i < retries; i++) { + try { + return c.call(); + } + catch (IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class) || + X.hasCause(e, IgniteTxRollbackCheckedException.class) || + X.hasCause(e, CachePartialUpdateCheckedException.class)) { + if (i < retries - 1) { + err = e; + + U.sleep(1); + + continue; + } + + throw e; + } + else + throw e; + } + } + + // Should never happen. + throw err; + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index b18d35a..5e9245d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; +import static org.apache.ignite.internal.util.typedef.internal.CU.*; /** * Cache atomic long implementation. @@ -78,7 +79,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext }; /** Callable for {@link #incrementAndGet()}. */ - private final Callable<Long> incAndGetCall = new Callable<Long>() { + private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); @@ -102,10 +103,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext throw e; } } - }; + }); /** Callable for {@link #getAndIncrement()}. */ - private final Callable<Long> getAndIncCall = new Callable<Long>() { + private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); @@ -129,10 +130,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext throw e; } } - }; + }); /** Callable for {@link #decrementAndGet()}. */ - private final Callable<Long> decAndGetCall = new Callable<Long>() { + private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); @@ -156,10 +157,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext throw e; } } - }; + }); /** Callable for {@link #getAndDecrement()}. */ - private final Callable<Long> getAndDecCall = new Callable<Long>() { + private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); @@ -183,7 +184,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext throw e; } } - }; + }); /** * Empty constructor required by {@link Externalizable}. @@ -378,7 +379,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext * @return Callable for execution in async and sync mode. */ private Callable<Long> internalAddAndGet(final long l) { - return new Callable<Long>() { + return retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); @@ -402,7 +403,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext throw e; } } - }; + }); } /** @@ -412,7 +413,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext * @return Callable for execution in async and sync mode. */ private Callable<Long> internalGetAndAdd(final long l) { - return new Callable<Long>() { + return retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); @@ -436,7 +437,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext throw e; } } - }; + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index e66c11e..2400a7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.locks.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; +import static org.apache.ignite.internal.util.typedef.internal.CU.*; /** * Cache sequence implementation. @@ -435,11 +436,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc */ @SuppressWarnings("TooBroadScope") private Callable<Long> internalUpdate(final long l, final boolean updated) { - return new Callable<Long>() { + return retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seq = seqView.get(key); checkRemoved(); @@ -506,11 +505,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc U.error(log, "Failed to get and add: " + this, e); throw e; - } finally { - tx.close(); } } - }; + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index a898e58..76ea7ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; +import static org.apache.ignite.internal.util.typedef.internal.CU.*; /** * Cache atomic stamped implementation. @@ -68,7 +69,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt private GridCacheContext ctx; /** Callable for {@link #get()} operation */ - private final Callable<IgniteBiTuple<T, S>> getCall = new Callable<IgniteBiTuple<T, S>>() { + private final Callable<IgniteBiTuple<T, S>> getCall = retryTopologySafe(new Callable<IgniteBiTuple<T, S>>() { @Override public IgniteBiTuple<T, S> call() throws Exception { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); @@ -77,10 +78,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt return stmp.get(); } - }; + }); /** Callable for {@link #value()} operation */ - private final Callable<T> valCall = new Callable<T>() { + private final Callable<T> valCall = retryTopologySafe(new Callable<T>() { @Override public T call() throws Exception { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); @@ -89,10 +90,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt return stmp.value(); } - }; + }); /** Callable for {@link #stamp()} operation */ - private final Callable<S> stampCall = new Callable<S>() { + private final Callable<S> stampCall = retryTopologySafe(new Callable<S>() { @Override public S call() throws Exception { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); @@ -101,7 +102,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt return stmp.stamp(); } - }; + }); /** * Empty constructor required by {@link Externalizable}. @@ -254,7 +255,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt * @return Callable for execution in async and sync mode. */ private Callable<Boolean> internalSet(final T val, final S stamp) { - return new Callable<Boolean>() { + return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); @@ -276,7 +277,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt throw e; } } - }; + }); } /** @@ -292,7 +293,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred, final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred, final IgniteClosure<S, S> newStampClos) { - return new Callable<Boolean>() { + return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); @@ -323,7 +324,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt throw e; } } - }; + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 33547d9..ea7924f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; +import static org.apache.ignite.internal.util.typedef.internal.CU.*; /** * Cache count down latch implementation. @@ -179,12 +180,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public int countDown() { - try { - return CU.outTx(new CountDownCallable(1), ctx); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } + return countDown(1); } /** {@inheritDoc} */ @@ -192,7 +188,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc A.ensure(val > 0, "val should be positive"); try { - return CU.outTx(new CountDownCallable(val), ctx); + return CU.outTx(retryTopologySafe(new CountDownCallable(val)), ctx); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -202,7 +198,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc}*/ @Override public void countDownAll() { try { - CU.outTx(new CountDownCallable(0), ctx); + CU.outTx(retryTopologySafe(new CountDownCallable(0)), ctx); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -248,7 +244,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc if (initGuard.compareAndSet(false, true)) { try { internalLatch = CU.outTx( - new Callable<CountDownLatch>() { + retryTopologySafe(new Callable<CountDownLatch>() { @Override public CountDownLatch call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue val = latchView.get(key); @@ -267,7 +263,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc return new CountDownLatch(val.get()); } } - }, + }), ctx ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 89d1040..bfddbe7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -52,6 +52,19 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr return cfg; } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + AtomicConfiguration acfg = new AtomicConfiguration(); + + acfg.setBackups(1); + + cfg.setAtomicConfiguration(acfg); + + return cfg; + } + /** * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index e65459a..91c454a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -17,7 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * @@ -32,4 +39,36 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr @Override protected int keysCount() { return 20_000; } + + /** + * @throws Exception If failed. + */ + public void testAtomicLongRetries() throws Exception { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteAtomicLong atomic = ignite(0).atomicLong("TestAtomic", 0, true); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override + public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + int keysCnt = keysCount(); + + for (int i = 0; i < keysCnt; i++) + atomic.incrementAndGet(); + + finished.set(true); + fut.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index dda86c1..80bfbf2 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -72,6 +72,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheTxNearDisabledPutGetRestartTest.class); suite.addTestSuite(IgniteCacheTxNearDisabledFairAffinityPutGetRestartTest.class); + suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class); + suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class); + return suite; } }