# Wait for next topology version before retry, retries for async tx operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/122a9dbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/122a9dbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/122a9dbf Branch: refs/heads/ignite-843 Commit: 122a9dbf337d5c1128be32d4efee1e0f1dc683f5 Parents: 47895da Author: sboikov <sboi...@gridgain.com> Authored: Fri Aug 14 12:50:06 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Aug 14 13:57:19 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 143 +++++++++++++++++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 25 +++- .../IgniteCachePutRetryAbstractSelfTest.java | 94 ++++++++++-- 3 files changed, 237 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 47ede5b..91af352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2283,8 +2283,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Filter. * @return Put future. */ - public IgniteInternalFuture<Boolean> putAsync(K key, V val, - @Nullable CacheEntryPredicate... filter) { + public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -3975,8 +3974,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V U.addLastCause(e, e1, log); } - if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) + if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer != null && topVer.topologyVersion() > 0 : tx; + + ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get(); + continue; + } throw e; } @@ -4014,18 +4020,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx); - if (tx == null || tx.implicit()) - tx = ctx.tm().newTx( - true, - op.single(), - ctx.systemTx() ? ctx : null, - OPTIMISTIC, - READ_COMMITTED, - ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(), - !ctx.skipStore(), - 0); + if (tx == null || tx.implicit()) { + boolean skipStore = ctx.skipStore(); // Save value of thread-local flag. + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES; - return asyncOp(tx, op); + if (retries == 1) { + tx = ctx.tm().newTx( + true, + op.single(), + ctx.systemTx() ? ctx : null, + OPTIMISTIC, + READ_COMMITTED, + ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(), + !skipStore, + 0); + + return asyncOp(tx, op); + } + else { + AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, skipStore, retries); + + fut.execute(); + + return fut; + } + } + else + return asyncOp(tx, op); } /** @@ -4624,6 +4648,97 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * */ + private class AsyncOpRetryFuture<T> extends GridFutureAdapter<T> { + /** */ + private AsyncOp<T> op; + + /** */ + private boolean skipStore; + + /** */ + private int retries; + + /** */ + private IgniteTxLocalAdapter tx; + + /** + * @param op Operation. + * @param skipStore Skip store flag. + * @param retries Number of retries. + */ + public AsyncOpRetryFuture(AsyncOp<T> op, + boolean skipStore, + int retries) { + assert retries > 1 : retries; + + this.op = op; + this.tx = null; + this.skipStore = skipStore; + this.retries = retries; + } + + /** + * @param tx Transaction. + */ + public void execute() { + tx = ctx.tm().newTx( + true, + op.single(), + ctx.systemTx() ? ctx : null, + OPTIMISTIC, + READ_COMMITTED, + ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(), + !skipStore, + 0); + + IgniteInternalFuture<T> fut = asyncOp(tx, op); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() { + @Override public void apply(IgniteInternalFuture<T> fut) { + try { + T res = fut.get(); + + onDone(res); + } + catch (IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) { + IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx; + + assert tx != null; + + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer != null && topVer.topologyVersion() > 0 : tx; + + IgniteInternalFuture<?> topFut = + ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1); + + topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> topFut) { + try { + topFut.get(); + + execute(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + + return; + } + + onDone(e); + } + } + }); + } + } + + /** + * + */ private static class PeekModes { /** */ private boolean near; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 751c9ba..0498839 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -409,9 +409,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridFutureAdapter<Void> fut0; + long nextTopVer; + synchronized (this) { mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); + assert topVer != null && topVer.topologyVersion() > 0 : this; + + nextTopVer = topVer.topologyVersion() + 1; + topVer = AffinityTopologyVersion.ZERO; fut0 = topCompleteFut; @@ -428,7 +434,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> updVer = null; topLocked = false; - map(); + IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(nextTopVer); + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(final IgniteInternalFuture<?> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + fut.get(); + + map(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index dcba325..9abc5c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -76,11 +77,25 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr protected CacheAtomicWriteOrderMode writeOrderMode() { return CLOCK; } - /** * @throws Exception If failed. */ public void testPut() throws Exception { + checkPut(false); + } + + /** + * @throws Exception If failed. + */ + public void testPutAsync() throws Exception { + checkPut(true); + } + + /** + * @param async If {@code true} tests asynchronous put. + * @throws Exception If failed. + */ + private void checkPut(boolean async) throws Exception { final AtomicBoolean finished = new AtomicBoolean(); IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @@ -104,20 +119,67 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr if (atomicityMode() == ATOMIC) assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); - for (int i = 0; i < keysCnt; i++) - cache.put(i, i); + int iter = 0; + + long stopTime = System.currentTimeMillis() + 60_000; + + if (async) { + IgniteCache<Object, Object> cache0 = cache.withAsync(); + + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + for (int i = 0; i < keysCnt; i++) { + cache0.put(i, val); + + cache0.future().get(); + } + + for (int i = 0; i < keysCnt; i++) { + cache0.get(i); + + assertEquals(val, cache0.future().get()); + } + } + } + else { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + for (int i = 0; i < keysCnt; i++) + cache.put(i, val); + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } + } finished.set(true); fut.get(); for (int i = 0; i < keysCnt; i++) - assertEquals(i, ignite(0).cache(null).get(i)); + assertEquals(iter, cache.get(i)); + } + + /** + * @throws Exception If failed. + */ + public void testFailsWithNoRetries() throws Exception { + checkFailsWithNoRetries(false); + } + + /** + * @throws Exception If failed. + */ + public void testFailsWithNoRetriesAsync() throws Exception { + checkFailsWithNoRetries(true); } /** + * @param async If {@code true} tests asynchronous put. * @throws Exception If failed. */ - public void testFailWithNoRetries() throws Exception { + private void checkFailsWithNoRetries(boolean async) throws Exception { final AtomicBoolean finished = new AtomicBoolean(); IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @@ -136,22 +198,34 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr int keysCnt = keysCount(); - boolean exceptionThrown = false; + boolean eThrown = false; + + IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries(); + + if (async) + cache = cache.withAsync(); for (int i = 0; i < keysCnt; i++) { try { - ignite(0).cache(null).withNoRetries().put(i, i); + if (async) { + cache.put(i, i); + + cache.future().get(); + } + else + cache.put(i, i); } catch (Exception e) { - assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class)); + assertTrue("Invalid exception: " + e, + X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class)); - exceptionThrown = true; + eThrown = true; break; } } - assertTrue(exceptionThrown); + assertTrue(eThrown); finished.set(true); fut.get();